kafka-vector
KafkaとClickHouseを使用したVector
Vectorは、Kafkaから読み込み、ClickHouseにイベントを送信する能力を持つベンダーに依存しないデータパイプラインです。
はじめにのガイドでは、ClickHouseを使用したVectorについて、ログのユースケースとファイルからのイベントの読み取りに焦点を当てています。私たちは、Kafkaトピックに保管されているイベントを持つGithubのサンプルデータセットを利用します。
Vectorは、プッシュまたはプルモデルを通じてデータを取得するためにソースを利用します。一方、シンクは、イベントの出力先を提供します。したがって、KafkaソースとClickHouseシンクを利用します。Kafkaはシンクとしてサポートされていますが、ClickHouseソースは利用できません。そのため、ClickHouseからKafkaにデータを転送したいユーザーにはVectorは適していません。
また、Vectorはデータの変換をサポートしていますが、これはこのガイドの範囲を超えています。そのため, 必要な場合はVectorドキュメントを参照してください。
現在のClickHouseシンクの実装はHTTPインターフェースを利用しています。現時点では、ClickHouseシンクはJSONスキーマの使用をサポートしていません。データはプレーンJSON形式または文字列としてKafkaに公開する必要があります。
ライセンス
VectorはMPL-2.0ライセンスの下で配布されています。
接続の詳細を集める
ClickHouseにHTTP(S)で接続するには、次の情報が必要です:
-
HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用してください。 -
USERNAME と PASSWORD: デフォルトでは、ユーザー名は
default
です。あなたのユースケースに適したユーザー名を使用してください。
あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

HTTPSを選択すると、詳細は例のcurl
コマンドに表示されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。
手順
- Kafkaの
github
トピックを作成し、Githubデータセットを挿入します。
このデータセットは、ClickHouse/ClickHouse
リポジトリに焦点を当てた200,000行で構成されています。
- 対象テーブルが作成されていることを確認します。以下ではデフォルトデータベースを使用します。
- Vectorをダウンロードしてインストールします。
kafka.toml
という構成ファイルを作成し、KafkaとClickHouseインスタンスの値を変更します。
この構成およびVectorの動作についてのいくつかの重要な注意点:
- この例はConfluent Cloudに対してテストされています。そのため、
sasl.*
およびssl.enabled
セキュリティオプションはセルフマネージドの場合には適切でないかもしれません。 bootstrap_servers
の構成パラメータにプロトコルプレフィックスは必要ありません。例:pkc-2396y.us-east-1.aws.confluent.cloud:9092
- ソースパラメータ
decoding.codec = "json"
は、メッセージがClickHouseシンクに単一のJSONオブジェクトとして渡されることを保証します。メッセージを文字列として処理し、デフォルトのbytes
値を使用している場合、メッセージの内容はmessage
フィールドに追加されます。ほとんどの場合、これはVectorのはじめにガイドで説明されているようにClickHouseで処理する必要があります。 - Vectorはメッセージにいくつかのフィールドを追加します。この例では、ClickHouseシンクでは
skip_unknown_fields = true
の構成パラメータを介してこれらのフィールドを無視します。これは、ターゲットテーブルスキーマの一部でないフィールドを無視します。必要に応じて、これらのメタフィールド(例えばoffset
)が追加されるようにスキーマを調整してください。 - シンクがイベントのソースをパラメータ
inputs
を介して参照していることに注意してください。 - ClickHouseシンクの動作はこちらに説明されています。最適なスループットを得るために、ユーザーは
buffer.max_events
、batch.timeout_secs
、およびbatch.max_bytes
パラメータの調整を希望するかもしれません。ClickHouseの推奨では、単一バッチ内のイベント数の最小値として1000を考慮すべきです。均一な高スループットのユースケースでは、ユーザーはbuffer.max_events
パラメータを増加させることができます。変動が大きいスループットでは、batch.timeout_secs
パラメータの変更が必要になるかもしれません。 auto_offset_reset = "smallest"
パラメータは、Kafkaソースがトピックの先頭から開始することを強制します。これにより、ステップ(1)で発行されたメッセージを消費します。ユーザーには異なる動作が必要になる場合があります。詳細についてはこちらを参照してください。
- Vectorを起動します。
デフォルトでは、ヘルスチェックがClickHouseへの挿入が始まる前に必要です。これにより、接続が確立できていることとスキーマが読み取れることが確認されます。問題が発生した場合に役立つ追加のログを得るには、VECTOR_LOG=debug
を前に付けてください。
- データの挿入を確認します。
count |
---|
200000 |