Kafkaテーブルエンジンの使用
KafkaテーブルエンジンはClickHouse Cloudではサポートされていません。ClickPipesまたはKafka Connectをご検討ください。
KafkaからClickHouse
Kafkaテーブルエンジンを使用するには、ClickHouseのマテリアライズドビューに広く精通している必要があります。
概要
最初に、最も一般的なユースケース、すなわちKafkaテーブルエンジンを使用してKafkaからClickHouseにデータを挿入することに焦点を当てます。
Kafkaテーブルエンジンは、ClickHouseがKafkaトピックから直接読み取ることを可能にします。このエンジンは、トピックのメッセージを表示するのに役立ちますが、設計上、一度限りの取得しか許可していません。つまり、テーブルに対してクエリが発行されると、キューからデータを消費し、結果を呼び出し元に返す前にコンシューマオフセットを増加させます。実際には、これらのオフセットをリセットしない限り、データを再読み込みすることはできません。
このテーブルエンジンから読み取ったデータを保持するためには、データをキャプチャして別のテーブルに挿入する手段が必要です。トリガーベースのマテリアライズドビューは、この機能をネイティブに提供します。マテリアライズドビューは、テーブルエンジンでの読み取りを開始し、一連のドキュメントを受け取ります。TO句はデータの宛先を決定します - 通常はMerge Treeファミリーのテーブルです。このプロセスは以下に視覚化されています。

手順
1. 準備
ターゲットトピックにデータが格納されている場合は、次の内容をデータセットで使用するように適応できます。あるいは、サンプルのGithubデータセットがこちらで提供されています。このデータセットは、以下の例で使用され、簡略化のためにカラムのスキーマや行のサブセット(特に、ClickHouseリポジトリに関連するGithubイベントに制限されています)を使用しています。このデータセットは、ここで入手可能なフルデータセットと比較しても、ほとんどのクエリが正常に機能するためには十分です。
2. ClickHouseを構成する
このステップは、セキュアなKafkaに接続する場合に必要です。これらの設定は、SQLのDDLコマンドを介して渡すことはできず、ClickHouseのconfig.xmlに設定する必要があります。SASLで保護されたインスタンスに接続することを前提としています。これは、Confluent Cloudとやり取りする際の最も簡単な方法です。
上記のスニペットを新しいファイルオにconf.d/ディレクトリに配置するか、既存の設定ファイルにマージしてください。設定可能な項目については、こちらをご覧ください。
このチュートリアルに使用するために、KafkaEngine
というデータベースを作成します:
データベースを作成したら、切り替える必要があります:
3. 宛先テーブルを作成する
宛先テーブルを準備します。以下の例では、簡略化のためにGitHubの縮小されたスキーマを使用しています。MergeTreeテーブルエンジンを使用していますが、この例はMerge Treeファミリーのいかなるメンバーにも容易に適応できます。
4. トピックを作成し、データを格納する
次に、トピックを作成します。これを行うために使用できるいくつかのツールがあります。ローカルマシンまたはDockerコンテナ内でKafkaを実行している場合、RPKが便利です。次のコマンドを実行して、5つのパーティションを持つgithub
というトピックを作成できます。
Confluent CloudでKafkaを実行している場合、Confluent CLIを使用する方が好ましいかもしれません:
次に、このトピックにデータを格納する必要があります。これを行うためにkcatを使用します。認証が無効のローカルKafkaを実行している場合、次のようなコマンドを実行できます:
または、KafkaクラスターがSASLを使用して認証する場合は、次のようにします:
データセットには200,000行が含まれているため、数秒内に取り込まれるはずです。より大きなデータセットを扱いたい場合は、ClickHouse/kafka-samples GitHubリポジトリの大きなデータセットセクションを参照してください。
5. Kafkaテーブルエンジンを作成する
以下の例では、マージツリーテーブルと同じスキーマを持つテーブルエンジンを作成します。これは厳密には必要ではなく、ターゲットテーブルにエイリアスや一時カラムを持たせることができます。ただし、設定は重要です。トピックからJSONを消費するデータ型としてJSONEachRow
を使用することに注意してください。値github
およびclickhouse
は、それぞれトピックとコンシューマグループの名前を表します。トピックは実際には値のリストで構成できます。
エンジン設定とパフォーマンスの調整については、以下で説明します。この時点で、テーブルgithub_queue
に対して単純な選択を行うことで、いくつかの行が読み取られるはずです。この操作によりコンシューマオフセットが進むことに注意してください。これにより、これらの行はリセットなしで再読み込みされません。制限および必要なパラメーターstream_like_engine_allow_direct_select
にも注意してください。
6. マテリアライズドビューを作成する
マテリアライズドビューは、先に作成した2つのテーブルを接続し、Kafkaテーブルエンジンからデータを読み取り、ターゲットマージツリーテーブルに挿入します。いくつかのデータ変換を行うことができます。ここでは簡単な読み取りと挿入を行います。*を使用することで、カラム名が同じであること(大文字と小文字を区別すること)を前提とします。
作成時点で、このマテリアライズドビューはKafkaエンジンに接続し、読み取りを開始し、ターゲットテーブルに行を挿入します。このプロセスは無限に続き、Kafkaに新しいメッセージが挿入されると、それらを消費します。さらにメッセージをKafkaに挿入するために挿入スクリプトを再実行してください。
7. 行が挿入されたことを確認する
ターゲットテーブルにデータが存在することを確認します:
200,000行が表示されるはずです:
共通操作
メッセージ消費の停止と再開
メッセージの消費を停止するには、Kafkaエンジンタブルを切り離すことができます:
これにより、コンシューマグループのオフセットには影響しません。消費を再開し、前のオフセットから続けるには、テーブルを再接続します。
Kafkaメタデータの追加
ClickHouseに取り込まれた後、元のKafkaメッセージからのメタデータを追跡することは有用です。たとえば、特定のトピックやパーティションをどの程度消費したかを知りたい場合があります。この目的のために、Kafkaテーブルエンジンはいくつかの仮想カラムを公開しています。これらは、スキーマおよびマテリアライズドビューのSELECTステートメントを変更することで、ターゲットテーブルのカラムとして永続化できます。
最初に、ターゲットテーブルへのカラム追加の前に上記で説明した停止操作を行います。
以下に、行の元となるトピックとパーティションを識別する情報カラムを追加します。
次に、仮想カラムが必要に応じてマッピングされていることを確認する必要があります。仮想カラムは_
で始まります。仮想カラムの詳細なリストはこちらにあります。
仮想カラムを持つテーブルを更新するには、マテリアライズドビューを削除し、Kafkaエンジンタブルを再接続し、再度マテリアライズドビューを作成する必要があります。
新たに取り込まれた行にはメタデータが含まれるはずです。
結果は以下のようになります。
actor_login | event_type | created_at | topic | partition |
---|---|---|---|---|
IgorMinar | CommitCommentEvent | 2011-02-12 02:22:00 | github | 0 |
queeup | CommitCommentEvent | 2011-02-12 02:23:23 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:23:24 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:24:50 | github | 0 |
IgorMinar | CommitCommentEvent | 2011-02-12 02:25:20 | github | 0 |
dapi | CommitCommentEvent | 2011-02-12 06:18:36 | github | 0 |
sourcerebels | CommitCommentEvent | 2011-02-12 06:34:10 | github | 0 |
jamierumbelow | CommitCommentEvent | 2011-02-12 12:21:40 | github | 0 |
jpn | CommitCommentEvent | 2011-02-12 12:24:31 | github | 0 |
Oxonium | CommitCommentEvent | 2011-02-12 12:31:28 | github | 0 |
Kafkaエンジン設定の変更
Kafkaエンジンタブルを削除して新しい設定で再作成することをお勧めします。このプロセスでは、マテリアライズドビューを変更する必要はありません。Kafkaエンジンタブルが再作成されると、メッセージ消費が再開されます。
問題のデバッグ
認証の問題のようなエラーは、KafkaエンジンDDLへの応答では報告されません。問題を診断するために、主要なClickHouseログファイルclickhouse-server.err.logを使用することをお勧めします。基礎となるKafkaクライアントライブラリlibrdkafkaに対するさらなるトレースログを構成を通じて有効にできます。
不正なメッセージの処理
Kafkaはしばしばデータの「ダンピンググラウンド」として使用されます。これにより、トピックには混合メッセージフォーマットや不整合なフィールド名が含まれることになります。これを避け、Kafka StreamsやksqlDBなどのKafka機能を利用して、メッセージが適切で一貫性のある形式でKafkaに挿入される前に確保してください。これらのオプションが不可能な場合、ClickHouseにはいくつかの機能があります。
- メッセージフィールドを文字列として扱います。必要であれば、マテリアライズドビューステートメントでクレンジングやキャスティングを実行するために関数を使用できます。これは本番環境でのソリューションではありませんが、一時的な取り込みには役立つかもしれません。
- トピックからJSONを消費する場合、JSONEachRowフォーマットを使用し、設定
input_format_skip_unknown_fields
を利用します。データを書き込む際、デフォルトでClickHouseはターゲットテーブルに存在しないカラムが入力データに含まれていると、例外をスローします。ただし、このオプションが有効な場合、これらの余分なカラムは無視されます。これも本番レベルのソリューションではなく、他の人を混乱させる可能性があることに注意してください。 kafka_skip_broken_messages
設定を検討してください。これは、不正なメッセージに対するブロックごとの許容レベルを指定する必要があります。これはkafka_max_block_sizeの文脈で考慮されます。この許容度を超えた場合(絶対的なメッセージで測定される)、通常の例外の振る舞いが元に戻り、他のメッセージがスキップされます。
配信のセマンティクスと重複の課題
Kafkaテーブルエンジンは、少なくとも1回のセマンティクスを持っています。重複する可能性は、いくつかの既知の稀な状況で発生します。たとえば、メッセージがKafkaから読み取られ、ClickHouseに正常に挿入された場合、新しいオフセットをコミットする前にKafkaとの接続が失われる可能性があります。この場合、ブロックを再試行する必要があります。ブロックは、ターゲットテーブルとして分散テーブルまたはReplicatedMergeTreeを使用して重複排除できます。このアプローチは、重複行の可能性を減少させますが、同一のブロックを前提としています。Kafkaのリバランスのようなイベントは、この仮定を無効にし、稀な状況で重複を引き起こす可能性があります。
クオーラムベースの挿入
ClickHouseでより高い配信保証が必要な場合、クオーラムベースの挿入が必要な場合があります。これはマテリアライズドビューやターゲットテーブルに設定できませんが、ユーザープロファイルに対して設定できます。:
ClickHouseからKafka
稀なユースケースですが、ClickHouseのデータもKafkaに保持できます。たとえば、選択的に行をKafkaテーブルエンジンに手動で挿入します。このデータは同じKafkaエンジンによって読み取られ、そのマテリアライズドビューはデータをマージツリーテーブルに配置します。最終的に、既存のソーステーブルからテーブルを読み取るためにKafkaに挿入する際にマテリアライズドビューを適用する方法を示します。
手順
最初の目標は次のように最も良く示されます:

KafkaからClickHouseの手順の下でテーブルとビューを作成済みであり、トピックが完全に消費されていると仮定します。
1. 行を直接挿入する
まず、ターゲットテーブルのカウントを確認します。
200,000行が存在するはずです:
次に、GitHubターゲットテーブルからKafkaテーブルエンジンのgithub_queueに行を挿入します。JSONEachRow形式を利用し、SELECTを100に制限することに注意してください。
再度、GitHubでの行数を確認し、その数が100増えていることを確認してください。上記の図に示すように、行はKafkaテーブルエンジンを介してKafkaに挿入された後、同じエンジンによって再読み取りされ、GitHubターゲットテーブルにマテリアライズドビューによって挿入されます!
100行の追加を表示するはずです:
2. マテリアライズドビューを使用する
テーブルにドキュメントが挿入されると、マテリアライズドビューを利用してKafkaエンジン(およびトピック)にメッセージをプッシュすることができます。GitHubテーブルに行が挿入されると、マテリアライズドビューがトリガーされ、行がKafkaエンジンに再び挿入され、新しいトピックに流れることになります。これも以下のように示されます:

新しいKafkaトピックgithub_out
を作成します。これに対してKafkaテーブルエンジンgithub_out_queue
が指すことを確認します。
次に、新しいマテリアライズドビューgithub_out_mv
をGitHubテーブルにポイントさせ、このエンジンに行を挿入するようにトリガーします。したがって、GitHubテーブルへの追加は、新しいKafkaトピックへプッシュされることになります。
もしKafkaからClickHouseの一部として作成された元のgithubトピックに挿入すれば、ドキュメントは「github_clickhouse」トピックに自動的に表示されます。これを確認するために、ネイティブKafkaツールを利用します。たとえば、下記に示すように、kcatを利用してKafkaトピックに100行を挿入します:
github_out
トピックを読み取ると、メッセージの配信が確認できます。
これは複雑な例ですが、Kafkaエンジンと一緒に使用されるマテリアライズドビューの力を示しています。
クラスターとパフォーマンス
ClickHouseクラスターの使用
Kafkaのコンシューマグループを通じて、複数のClickHouseインスタンスが同じトピックから読み取ることができます。各コンシューマは、トピックパーティションに1:1のマッピングで割り当てられます。Kafkaテーブルエンジンを使用してClickHouseの消費をスケールする際は、クラスター内のコンシューマの総数がトピックのパーティション数を超えることはできないことを考慮してください。従って、トピックのパーティショニングが事前に適切に構成されていることを確認する必要があります。
複数のClickHouseインスタンスは、同じコンシューマグループIDを使用するように構成され、Kafkaテーブルエンジンの作成時に指定されます。そのため、各インスタンスは1つ以上のパーティションから読み取り、ローカルターゲットテーブルにセグメントを挿入します。ターゲットテーブルは、データの重複を処理するためにReplicatedMergeTreeを使用するように構成することもできます。このアプローチは、Kafka読み取りをClickHouseクラスターでスケールさせることが可能ですが、十分なKafkaパーティションが必要です。

パフォーマンスの調整
Kafkaエンジンタブルのスループットパフォーマンスを向上させたい場合は、以下の点を考慮してください。
- パフォーマンスはメッセージのサイズ、形式、ターゲットテーブルの種類によって異なります。単一のテーブルエンジンで秒間100k行の取得が可能であると考えられます。デフォルトでは、メッセージはブロックで読み取られ、パラメータkafka_max_block_sizeによって制御されます。デフォルトでは、これはmax_insert_block_sizeに設定されており、初期値は1,048,576です。メッセージが非常に大きくない限り、これを常に増加させるべきです。500kから1Mの間の値が一般的です。テストしてスループットパフォーマンスへの影響を評価してください。
- テーブルエンジンのコンシューマ数は、kafka_num_consumersを使用して増やすことができます。ただし、デフォルトでは、挿入は単一スレッドで直線化されます。これを防ぐために、kafka_thread_per_consumerのデフォルト値を1から変更してください。1に設定することで、フラッシュを並行して実行できます。複数のコンシューマを持つKafkaエンジンタブルの作成は、マテリアライズドビューとkafka_thread_per_consumer=0を持つN個のKafkaエンジンを作成することと論理的に等しいです。
- コンシューマを増やすことは、無償の操作ではありません。各コンシューマは自身のバッファとスレッドを維持し、サーバーへのオーバーヘッドが増加します。消費者のオーバーヘッドを意識し、各クラスター全体で線形にスケールアップします。
- Kafkaメッセージのスループットが変動する場合、および遅延が許容される場合、stream_flush_interval_msを増加させて、大きなブロックがフラッシュされることを検討してください。
- background_message_broker_schedule_pool_sizeは、バックグラウンドタスクを実行するスレッド数を設定します。これらのスレッドはKafkaストリーミングに使用されます。この設定はClickHouseサーバーの起動時に適用され、ユーザーセッション内で変更することはできず、デフォルトでは16に設定されています。ログにタイムアウトが表示された場合、これを増やすことが適切かもしれません。
- Kafkaとの通信にはlibrdkafkaライブラリが使用され、それ自体がスレッドを作成します。多数のKafkaテーブルやコンシューマが存在すると、文脈の切り替えの数が増大する可能性があります。可能であれば、クラスター全体にこの負荷を分散させ、ターゲットテーブルの複製を行い、または複数のトピックから読み込むためにテーブルエンジンを使用することを検討してください。値のリストがサポートされています。単一のテーブルから複数のマテリアライズドビューを読み取ることができ、それぞれ特定のトピックからのデータをフィルタリングします。
設定の変更はテストするべきです。Kafkaコンシューマの遅れを監視して、適切にスケールされていることを確認してください。
追加設定
上記で検討した設定の他にも、興味を引く可能性がある設定は以下の通りです。
- Kafka_max_wait_ms - メッセージをKafkaから読み取る際の待機時間(ミリ秒)。ユーザープロファイルレベルで設定され、デフォルトは5000です。
基礎となるlibrdkafkaからのすべての設定は、ClickHouseの設定ファイル内の_kafka_要素に配置できます - 設定名はXML要素になり、ピリオドがアンダースコアに置き換えられます。たとえば:
これらは専門的な設定であり、Kafkaのドキュメントを参照して詳細な説明を読むことをお勧めします。