Kafka と ClickHouse を使用した Vector
Using Vector with Kafka and ClickHouse
Vectorは、Kafkaから読み取り、ClickHouseにイベントを送信する能力を持つベンダーに依存しないデータパイプラインです。
ClickHouseと連携したVectorのはじめにガイドは、ログのユースケースとファイルからのイベント読み取りに焦点を当てています。私たちは、Kafkaトピックで保持されているイベントがあるGithubサンプルデータセットを利用しています。
Vectorは、プッシュまたはプルモデルを通じてデータを取得するためにソースを利用します。一方、シンクはイベントの宛先を提供します。したがって、私たちはKafkaソースとClickHouseシンクを利用します。Kafkaはシンクとしてサポートされていますが、ClickHouseソースは利用できないことに注意してください。そのため、ClickHouseからKafkaにデータを転送したいユーザーにはVectorは適していません。
Vectorはまた、データの変換もサポートしています。これについては、このガイドの範囲を超えています。この件に関してはユーザーはVectorのドキュメントを参照することをお勧めします。
現在のClickHouseシンクの実装はHTTPインターフェースを利用していることに注意してください。現在、ClickHouseシンクはJSONスキーマの使用をサポートしていません。データはプレーンJSON形式または文字列としてKafkaに公開されなければなりません。
License
Vectorは、MPL-2.0 Licenseの下で配布されています。
Gather your connection details
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default
, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default
. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl
command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:
-
HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用します。 -
USERNAMEとPASSWORD: デフォルトでは、ユーザー名は
default
です。使用ケースに適したユーザー名を使用します。
ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

HTTPSを選択すると、詳細はexample curl
コマンドで確認できます。

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。
Steps
- Kafkaの
github
トピックを作成し、Githubデータセットを挿入します。
このデータセットは、ClickHouse/ClickHouse
リポジトリに焦点を当てた200,000行で構成されています。
- 対象テーブルが作成されていることを確認します。以下では、デフォルトのデータベースを使用します。
- Vectorをダウンロードしてインストールします。
kafka.toml
構成ファイルを作成し、あなたのKafkaおよびClickHouseインスタンスの値を修正します。
この構成とVectorの動作に関するいくつかの重要な注意事項:
- この例はConfluent Cloudに対してテストされています。したがって、
sasl.*
およびssl.enabled
セキュリティオプションはセルフマネージドの場合には適切でないかもしれません。 - 構成パラメータ
bootstrap_servers
にプロトコルプレフィックスは必要ありません。例:pkc-2396y.us-east-1.aws.confluent.cloud:9092
- ソースパラメータ
decoding.codec = "json"
はメッセージがClickHouseシンクに単一のJSONオブジェクトとして渡されることを保証します。メッセージを文字列として処理し、デフォルトのbytes
値を使用している場合、メッセージの内容はフィールドmessage
に追加されます。ほとんどの場合、これはVectorはじめにガイドに記載されているようにClickHouseで処理が必要になります。 - Vectorはメッセージにいくつかのフィールドを追加します。私たちの例では、構成パラメータ
skip_unknown_fields = true
を通じてClickHouseシンクでこれらのフィールドを無視します。これは、ターゲットテーブルスキーマの一部でないフィールドを無視します。offset
などのメタフィールドが追加されるようにスキーマを調整することを自由に行ってください。 - シンクがイベントのソースをパラメータ
inputs
を介して参照する様子に注意してください。 - ClickHouseシンクの動作についてはこちらを参照してください。最適なスループットのために、ユーザーは
buffer.max_events
、batch.timeout_secs
、およびbatch.max_bytes
パラメータを調整することが望ましいかもしれません。ClickHouseの推奨事項に従って、単一のバッチ内のイベント数(少なくとも1000)は考慮すべき最低値です。均一な高スループットのユースケースでは、ユーザーはパラメータbuffer.max_events
を増加させることがあります。より変動的なスループットには、パラメータbatch.timeout_secs
の変更が必要です。 - パラメータ
auto_offset_reset = "smallest"
はKafkaソースがトピックの開始から開始することを強制し、ステップ(1)で公開されたメッセージを消費することを保証します。ユーザーは異なる動作を必要とすることがあります。こちらを参照して詳細をご確認ください。
- Vectorを開始します。
デフォルトでは、ヘルスチェックがClickHouseへの挿入が始まる前に必要です。これは、接続が確立でき、スキーマが読み取れることを確認します。問題が発生した場合に役立つ追加のログを取得するために、VECTOR_LOG=debug
を前置きしてください。
- データの挿入を確認します。
count |
---|
200000 |