Kafkaテーブルエンジンの使用
Kafkaテーブルエンジンは、Apache Kafkaおよび他のKafka API互換ブローカー(例:Redpanda、Amazon MSK)からデータを読み取るおよびデータを書き込むために使用できます。
KafkaからClickHouse
ClickHouse Cloudをご利用の場合は、代わりにClickPipesの使用をお勧めします。ClickPipesは、プライベートネットワーク接続をネイティブにサポートし、データの取り込みやクラスタリソースを独立してスケーリングし、ClickHouseにストリーミングするKafkaデータの包括的な監視を提供します。
Kafkaテーブルエンジンを使用するには、ClickHouseのマテリアライズドビューについて広く理解している必要があります。
概要
最初に最も一般的なユースケースに焦点を当てます:KafkaからClickHouseにデータを挿入するためにKafkaテーブルエンジンを使用します。
Kafkaテーブルエンジンは、ClickHouseがKafkaトピックから直接読み取ることを可能にします。これはトピック上のメッセージを表示するのに便利ですが、エンジンは設計上、一度きりの取得のみを許可します。つまり、テーブルにクエリを発行すると、キューからデータを消費し、結果を呼び出し元に返す前に消費者オフセットを増加させます。実際には、これらのオフセットをリセットせずに再読されることはありません。
テーブルエンジンからのデータを永続化するためには、データをキャプチャして別のテーブルに挿入する手段が必要です。トリガーに基づくマテリアライズドビューがこの機能をネイティブに提供します。マテリアライズドビューは、テーブルエンジン上で読み取りを開始し、バッチのドキュメントを受信します。TO句はデータの宛先を決定します - 通常はMerge Treeファミリーのテーブルです。このプロセスは以下のように視覚化されます:

手順
1. 準備する
ターゲットトピックにデータが存在する場合は、次の内容をあなたのデータセットで使用するように適応できます。あるいは、サンプルのGitHubデータセットがここで提供されています。このデータセットは以下の例で使用され、簡略化されたスキーマと行のサブセットを使用します(具体的には、ClickHouseリポジトリに関するGitHubイベントに制限しています)。これは、こちらで利用可能な完全なデータセットとの対比において、ほとんどのクエリが機能するためには十分です。
2. ClickHouseを構成する
これは、セキュアなKafkaに接続する場合に必要です。これらの設定はSQL DDLコマンドを通じて渡すことはできず、ClickHouseのconfig.xmlで構成する必要があります。SASLで保護されたインスタンスに接続すると仮定します。これはConfluent Cloudを操作する際に最も簡単な方法です。
上記のスニペットを新しいファイルにconf.d/ディレクトリ内に配置するか、既存の設定ファイルに統合します。設定できる項目についてはこちらを参照してください。
このチュートリアルで使用するために、KafkaEngine
というデータベースも作成します:
データベースを作成したら、そこに切り替える必要があります:
3. 宛先テーブルを作成する
宛先テーブルを準備します。以下の例では、簡潔さのために縮小されたGitHubスキーマを使用しています。MergeTreeテーブルエンジンを使用していますが、この例はMergeTreeファミリーのいずれかのメンバーに容易に適応できます。
4. トピックを作成し、データを投入する
次に、トピックを作成します。これを行うために使用できるツールはいくつかあります。ローカルで機械上またはDockerコンテナ内でKafkaを実行している場合は、RPKがうまく機能します。次のコマンドを実行して、5つのパーティションを持つgithub
というトピックを作成できます:
Confluent CloudでKafkaを実行している場合は、Confluent CLIを使用することを好むかもしれません:
次に、このトピックにいくつかのデータを投入する必要があります。kcatを使用してこれを行います。認証が無効にされているローカルのKafkaを実行している場合は、次のようなコマンドを実行できます:
または、KafkaクラスタがSASLを使用して認証する場合は、次のようにします:
データセットには200,000行含まれているため、数秒で取り込まれるはずです。より大きなデータセットで作業したい場合は、ClickHouse/kafka-samplesの大規模データセットセクションを参照してください。
5. Kafkaテーブルエンジンを作成する
以下の例では、Merge Treeテーブルと同じスキーマを持つテーブルエンジンを作成します。これは厳密には必要ありませんが、ターゲットテーブルにエイリアスまたは一時的なカラムを持つこともできます。ただし、設定は重要です。KafkaトピックからJSONを消費するためのデータ型としてJSONEachRow
の使用に注意してください。値github
とclickhouse
は、それぞれトピック名と消費者グループ名を表します。トピックは実際には値のリストであることができます。
エンジン設定とパフォーマンス調整については以下で説明します。この時点で、テーブルgithub_queue
に対して単純な選択を行うことで、いくつかの行が読み込まれるはずです。この操作は消費者オフセットを前進させるため、これらの行をリセットなしに再読はできません。resetに注意してください。制限および必要なパラメーターstream_like_engine_allow_direct_select
にも注意してください。
6. マテリアライズドビューを作成する
マテリアライズドビューは、以前に作成した2つのテーブルを接続し、Kafkaテーブルエンジンからデータを読み取り、ターゲットのマージツリー表に挿入します。データ変換をいくつか実行できます。単純な読み取りと挿入を行います。* の使用は、カラム名が同じであることを仮定しています(大文字と小文字を区別)。
作成時に、マテリアライズドビューはKafkaエンジンに接続し、読み取りを開始します:ターゲットテーブルに行を挿入します。このプロセスは無限に続き、その後のメッセージの挿入がKafkaから消費され続けます。さらなるメッセージをKafkaに挿入するために挿入スクリプトを再実行しても問題ありません。
7. 行が挿入されたことを確認する
ターゲットテーブルにデータが存在することを確認します:
200,000行が表示されるはずです:
一般的な操作
メッセージ消費の停止と再開
メッセージの消費を停止するには、Kafkaエンジンテーブルをデタッチできます:
これにより、消費者グループのオフセットには影響しません。消費を再開し、前のオフセットから続けるには、テーブルを再接続します。
Kafkaメタデータの追加
ClickHouseに取り込まれた後も、元のKafkaメッセージのメタデータを追跡することは有用です。たとえば、特定のトピックやパーティションをどれだけ消費したかを知りたい場合があります。この目的のために、Kafkaテーブルエンジンは複数の仮想カラムを公開します。これらは、スキーマおよびマテリアライズドビューの選択文を修正することによって、ターゲットテーブルのカラムとして永続化できます。
まず、ターゲットテーブルにカラムを追加する前に、上記の停止操作を実行します。
以下に、行の起源となるソーストピックとパーティションを識別するための情報カラムを追加します。
次に、仮想カラムが必要に応じてマッピングされていることを確認する必要があります。仮想カラムは_
で始まります。仮想カラムの完全なリストはこちらで見つけることができます。
仮想カラムを持つテーブルを更新するには、マテリアライズドビューを削除し、Kafkaエンジンテーブルを再接続し、マテリアライズドビューを再作成する必要があります。
新しく消費された行にはメタデータが追加されるはずです。
結果は次のようになります:
actor_login | event_type | created_at | topic | partition |
---|---|---|---|---|
IgorMinar | CommitCommentEvent | 2011-02-12 02:22:00 | github | 0 |
queeup | CommitCommentEvent | 2011-02-12 02:23:23 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:23:24 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:24:50 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:25:20 | github | 0 |
dapi | CommitCommentEvent | 2011-02-12 06:18:36 | github | 0 |
sourcerebels | CommitCommentEvent | 2011-02-12 06:34:10 | github | 0 |
jamierumbelow | CommitCommentEvent | 2011-02-12 12:21:40 | github | 0 |
jpn | CommitCommentEvent | 2011-02-12 12:24:31 | github | 0 |
Oxonium | CommitCommentEvent | 2011-02-12 12:31:28 | github | 0 |
Kafkaエンジン設定の修正
Kafkaエンジンテーブルを削除し、新しい設定で再作成することをお勧めします。このプロセス中にマテリアライズドビューを修正する必要はありません - Kafkaエンジンテーブルが再作成されると、メッセージの消費が再開されます。
問題のデバッグ
認証問題などのエラーは、KafkaエンジンDDLへの応答に報告されません。問題を診断するためには、メインのClickHouseログファイルclickhouse-server.err.logを使用することをお勧めします。基礎となるKafkaクライアントライブラリlibrdkafka用のさらなるトレースログは、設定を通じて有効にできます。
不正なメッセージの処理
Kafkaはしばしばデータの「ダンプ場」として使用されます。これにより、トピックに混合メッセージ形式や不一致なフィールド名が含まれることがあります。これを避け、Kafka StreamsやksqlDBなどのKafka機能を利用して、Kafkaへの挿入前にメッセージが適切に形成され、一貫していることを確認してください。これらのオプションが不可能な場合、ClickHouseには役立つ機能がいくつかあります。
- メッセージフィールドを文字列として扱います。必要に応じて、マテリアライズドビューのステートメントでクレンジングおよびキャスティングを行うための関数を使用できます。これは本番レベルのソリューションを表すべきではありませんが、単発の取り込みには助けになるかもしれません。
- トピックからJSONを消費する場合、JSONEachRowフォーマットを使用し、設定
input_format_skip_unknown_fields
を使用します。データを書き込む際、デフォルトでは、ClickHouseは入力データにターゲットテーブルに存在しないカラムが含まれていると例外をスローします。ただし、このオプションが有効になっている場合、これらの余分なカラムは無視されます。これもまた、本番レベルのソリューションではなく、他の人を混乱させるかもしれません。 - 設定
kafka_skip_broken_messages
を検討してください。これは、不正なメッセージに対してブロックごとに許容されるレベルを指定する必要があり、kafka_max_block_sizeの文脈で考慮されます。この許容レベルを超えた場合(絶対メッセージ数で測定されます)、通常の例外動作が元に戻り、他のメッセージがスキップされます。
配信セマンティクスと重複の課題
Kafkaテーブルエンジンは少なくとも一度のセマンティクスを持ちます。重複は既知の稀な状況で発生する可能性があります。たとえば、メッセージがKafkaから読み取られ、ClickHouseに正常に挿入される可能性があります。新しいオフセットをコミットする前に、Kafkaへの接続が失われます。この状況ではブロックを再試行する必要があります。このブロックはレプリケーションを使用して、分散テーブルまたはReplicatedMergeTreeをターゲットテーブルとして重複を除去することができます。この方法は重複した行の可能性を減少させますが、同一のブロックに依存します。Kafkaのリバランスなどの事象はこの仮定を無効にし、稀な状況で重複を引き起こす可能性があります。
クォーラムベースの挿入
ClickHouseでより高い配信保証が必要な場合は、クォーラムベースの挿入が必要です。これはマテリアライズドビューやターゲットテーブルで設定することはできません。ただし、ユーザープロファイルに対して設定することは可能です。
ClickHouseからKafka
稀なユースケースですが、ClickHouseデータもKafkaに永続化できます。たとえば、Kafkaテーブルエンジンに手動で行を挿入します。このデータは、同じKafkaエンジンによって読み取られ、マテリアライズドビューがMerge Treeテーブルにデータを配置します。最後に、既存のソーステーブルからテーブルを読み取るための挿入に関連するマテリアライズドビューの適用を示します。
手順
私たちの初期目的は次のように最もよく示されています:

KafkaからClickHouseへの手順の下で、テーブルやビューを作成しており、トピックが完全に消費されたと仮定します。
1. 行を直接挿入する
まず、ターゲットテーブルのカウントを確認します。
200,000行あるはずです:
次に、GitHubターゲットテーブルからKafkaテーブルエンジンgithub_queue
に行を挿入します。JSONEachRowフォーマットを使用し、選択を100に制限していることに注意してください。
GitHubでの行を再カウントし、100増加したことを確認してください。上記のダイアグラムで示されているように、行はKafkaテーブルエンジンを介してKafkaに挿入され、その後同じエンジンによって再読され、GitHubターゲットテーブルにマテリアライズドビューによって挿入されます!
100行の追加を確認する必要があります:
2. マテリアライズドビューを使用する
マテリアライズドビューを利用して、文書がテーブルに挿入されたときにメッセージをKafkaエンジン(およびトピック)にプッシュできます。GitHubテーブルに行が挿入されると、マテリアライズドビューがトリガーされ、その結果、行が新しいトピックへのKafkaエンジンに再び挿入されます。これもまた最もよく示されています。

新しいKafkaトピックgithub_out
またはそれに相当するものを作成します。このトピックを指すKafkaテーブルエンジンgithub_out_queue
を確保します。
次に、github_out_mv
という新しいマテリアライズドビューを作成し、GitHubテーブルを指し、そのトリガー時に上記のエンジンに行を挿入します。GitHubテーブルへの追加は、その結果、新しいKafkaトピックにプッシュされます。
元のgithubトピック(KafkaからClickHouseの一部として作成)に挿入すると、ドキュメントが「github_clickhouse」トピックに魔法のように現れます。これはネイティブのKafkaツールで確認してください。たとえば、以下のように、Confluent Cloudがホストするトピックに100行をgithubトピックに挿入します:
github_out
トピックでの読み込みがメッセージの配信を確認するはずです。
これは複雑な例ですが、Kafkaエンジンと共に使用する場合のマテリアライズドビューの力を示しています。
クラスタとパフォーマンス
ClickHouseクラスタで作業する
Kafkaの消費者グループを通じて、複数のClickHouseインスタンスが同じトピックから消費する可能性があります。それぞれの消費者は、1:1のマッピングでトピックパーティションに割り当てられます。Kafkaテーブルエンジンを使用してClickHouseの消費をスケーリングする際には、クラスタ内の消費者の合計数がトピックのパーティション数を超えないことを考慮してください。そのため、トピックのためにパーティションが適切に設定されていることを事前に確認してください。
複数のClickHouseインスタンスは、同じ消費者グループIDを使用してトピックから読み取るように構成できます - これはKafkaテーブルエンジン作成時に指定されます。したがって、各インスタンスは1つ以上のパーティションから読み取り、自分のローカルターゲットテーブルにセグメントを挿入します。ターゲットテーブルは、データの重複を扱うためにReplicatedMergeTreeを使用するように構成することもできます。このアプローチにより、十分なKafkaパーティションがあればKafkaの読み取りをClickHouseクラスタとともにスケーリングすることができます。

パフォーマンスの調整
Kafka Engineテーブルのスループットパフォーマンスを向上させる際に考慮すべき事項を以下に示します:
- パフォーマンスはメッセージのサイズ、フォーマット、ターゲットテーブルタイプによって異なります。単一のテーブルエンジンで100k行/秒を達成することは obtainable(達成可能)と見なすべきです。デフォルトでは、メッセージはkafka_max_block_sizeパラメータによって制御されるブロックで読み取られます。デフォルトでは、これはdefault_insert_block_sizeに設定され、デフォルトは1,048,576です。メッセージが非常に大きくない限り、通常これは増やすべきです。500kから1Mの間の値は珍しくありません。テストしてスループットパフォーマンスへの影響を評価してください。
- テーブルエンジンの消費者数はkafka_num_consumersを使用して増加させることができます。ただし、デフォルトでは、挿入は単一スレッドで直線化されるため、kafka_thread_per_consumerをデフォルト値の1から変更する必要があります。これを1に設定することで、フラッシュが並行して実行されることを保証します。N消費者でKafkaエンジンテーブルを作成すること(およびkafka_thread_per_consumer=1)は、N個のKafkaエンジンを作成するのと論理的に同等であり、それぞれにマテリアライズドビューとkafka_thread_per_consumer=0があります。
- 消費者を増やすことは自由な操作ではありません。各消費者は独自のバッファとスレッドを維持し、サーバーのオーバーヘッドを増加させます。消費者のオーバーヘッドに注意し、最初にクラスタ全体で線形にスケーリングし、可能であれば。
- Kafkaメッセージのスループットが変動し、遅延が許容できる場合、より大きなブロックがフラッシュされるようにstream_flush_interval_msを増加させることを検討してください。
- background_message_broker_schedule_pool_sizeは、バックグラウンドタスクを実行するスレッドの数を設定します。これらのスレッドはKafkaストリーミングに使用されます。この設定はClickHouseサーバーの起動時に適用され、ユーザーセッション内では変更できず、デフォルトは16です。ログにタイムアウトが見られる場合、これを増加させることが適切かもしれません。
- Kafkaとの通信には、librdkafkaライブラリが使用され、それ自体がスレッドを作成します。大量のKafkaテーブルや消費者が存在する場合、大量のコンテキストスイッチが発生する可能性があります。この負荷をクラスタに分散させるか、可能であればターゲットテーブルを複製することを検討するか、複数のトピックから読み取るテーブルエンジンの使用を検討します - 値のリストがサポートされています。単一のテーブルから複数のマテリアライズドビューを読み取り、それぞれが特定のトピックのデータをフィルタリングできます。
設定の変更はテストする必要があります。適切にスケーラブルであることを確認するために、Kafka消費者のラグを監視することをお勧めします。
追加設定
上記で議論された設定に加えて、以下が関心を持たれる可能性があります:
- Kafka_max_wait_ms - 再試行前にKafkaからメッセージを読み取るための待機時間(ミリ秒単位)。ユーザープロファイルレベルで設定され、デフォルトは5000です。
基礎となるlibrdkafkaからのすべての設定も、ClickHouseの設定ファイル内の_kafka_要素に配置できます - 設定名はXML要素であり、ドットをアンダースコアに置き換えます。
これらは専門的な設定であり、詳細な説明についてはKafkaのドキュメントを参照することをお勧めします。