Using Vector with Kafka and ClickHouse
Kafka と ClickHouse を使用した Vector の利用
Vector は、Kafka からデータを読み取り、ClickHouse にイベントを送信する能力を持つ、ベンダーに依存しないデータパイプラインです。
ClickHouse と連携するための はじめに ガイドは、ログのユースケースとファイルからのイベントの読み取りに焦点を当てています。私たちは、Kafka トピックに保持されているイベントを含む Github サンプルデータセット を利用します。
Vector は、プッシュまたはプルモデルを通じてデータを取得するために sources を利用します。一方、sinks はイベントの宛先を提供します。したがって、私は Kafka ソースと ClickHouse シンクを利用します。Kafka はシンクとしてサポートされていますが、ClickHouse ソースは利用できないため、ClickHouse から Kafka にデータを転送したいユーザーには Vector は適していません。
Vector はデータの 変換 もサポートしています。これは本ガイドの範囲を超えています。この機能が必要な場合は、ユーザーは自身のデータセットについて Vector ドキュメントを参照してください。
現在の ClickHouse シンクの実装は HTTP インターフェースを使用しています。現時点では ClickHouse シンクは JSON スキーマの使用をサポートしていません。データはプレーン JSON 形式または文字列として Kafka に公開する必要があります。
ライセンス
Vector は MPL-2.0 ライセンスのもとで配布されています。
接続詳細をまとめる
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default
, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default
. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl
command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
ClickHouseにHTTP(S)で接続するには、次の情報が必要です:
-
HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用してください。 -
USERNAMEとPASSWORD: デフォルトでは、ユーザー名は
default
です。ご利用のケースに適したユーザー名を使用してください。
ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurl
コマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。
手順
- Kafka の
github
トピックを作成し、Github データセット を挿入します。
このデータセットは、ClickHouse/ClickHouse
リポジトリに焦点を当てた 200,000 行から構成されています。
- 目標のテーブルが作成されていることを確認します。以下ではデフォルトのデータベースを使用します。
- Vector をダウンロードしてインストールします。
kafka.toml
構成ファイルを作成し、Kafka と ClickHouse インスタンスの値を変更します。
この構成および Vector の動作に関する重要な注意点がいくつかあります:
- この例は Confluent Cloud に対してテストされています。したがって、
sasl.*
およびssl.enabled
のセキュリティオプションは、セルフマネージドのケースには適さない可能性があります。 - 構成パラメータ
bootstrap_servers
にはプロトコルプレフィックスは必要ありません。例:pkc-2396y.us-east-1.aws.confluent.cloud:9092
- ソースパラメータ
decoding.codec = "json"
は、メッセージが ClickHouse シンクに単一の JSON オブジェクトとして渡されることを保証します。メッセージを文字列として扱い、デフォルトのbytes
値を使用した場合、メッセージの内容はフィールドmessage
に追加されます。このほとんどの場合、Vector のはじめにガイドに記載されたように ClickHouse で処理が必要です。 - Vector はメッセージにいくつかのフィールドを 追加します。私たちの例では、設定パラメータ
skip_unknown_fields = true
を通じて ClickHouse シンクでこれらのフィールドを無視します。これは、ターゲットテーブルスキーマの一部でないフィールドを無視します。これらのメタフィールド(例:offset
)を追加するようにスキーマを調整してください。 - シンクがイベントのソースを
inputs
パラメータを通じて参照していることに注意してください。 - ClickHouse シンクの動作については こちらを参照してください。最適なスループットを得るために、ユーザーは
buffer.max_events
、batch.timeout_secs
、およびbatch.max_bytes
パラメータを調整したいと考えるかもしれません。ClickHouse の 推奨事項 によれば、単一のバッチに含まれるイベント数の最小値として 1000 を考慮すべきです。均一な高スループットユースケースでは、ユーザーはパラメータbuffer.max_events
を増やすことを検討します。可変スループットの場合は、パラメータbatch.timeout_secs
の変更が必要です。 - パラメータ
auto_offset_reset = "smallest"
は Kafka ソースにトピックの最初から始めさせます。これにより、ステップ (1) で公開されたメッセージを消費できるようになります。異なる動作が必要なユーザーは、こちらを参照してください。
- Vector を起動します。
デフォルトでは、ヘルスチェックが ClickHouse への挿入開始前に必要です。これにより、接続が確立され、スキーマが読み取れることが確認されます。問題が発生した場合は、VECTOR_LOG=debug
を前に置くことで、さらに詳細なログを取得できます。
- データの挿入を確認します。
count |
---|
200000 |