メインコンテンツまでスキップ
メインコンテンツまでスキップ

kafka-vector

KafkaとClickHouseを使用したVector

Vectorは、Kafkaから読み込み、ClickHouseにイベントを送信する能力を持つベンダーに依存しないデータパイプラインです。

はじめにのガイドでは、ClickHouseを使用したVectorについて、ログのユースケースとファイルからのイベントの読み取りに焦点を当てています。私たちは、Kafkaトピックに保管されているイベントを持つGithubのサンプルデータセットを利用します。

Vectorは、プッシュまたはプルモデルを通じてデータを取得するためにソースを利用します。一方、シンクは、イベントの出力先を提供します。したがって、KafkaソースとClickHouseシンクを利用します。Kafkaはシンクとしてサポートされていますが、ClickHouseソースは利用できません。そのため、ClickHouseからKafkaにデータを転送したいユーザーにはVectorは適していません。

また、Vectorはデータの変換をサポートしていますが、これはこのガイドの範囲を超えています。そのため, 必要な場合はVectorドキュメントを参照してください。

現在のClickHouseシンクの実装はHTTPインターフェースを利用しています。現時点では、ClickHouseシンクはJSONスキーマの使用をサポートしていません。データはプレーンJSON形式または文字列としてKafkaに公開する必要があります。

ライセンス

VectorはMPL-2.0ライセンスの下で配布されています。

接続の詳細を集める

ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAME と PASSWORD: デフォルトでは、ユーザー名はdefaultです。あなたのユースケースに適したユーザー名を使用してください。

あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細は例のcurlコマンドに表示されます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。

手順

  1. Kafkaのgithubトピックを作成し、Githubデータセットを挿入します。

このデータセットは、ClickHouse/ClickHouseリポジトリに焦点を当てた200,000行で構成されています。

  1. 対象テーブルが作成されていることを確認します。以下ではデフォルトデータベースを使用します。
  1. Vectorをダウンロードしてインストールします。kafka.tomlという構成ファイルを作成し、KafkaとClickHouseインスタンスの値を変更します。

この構成およびVectorの動作についてのいくつかの重要な注意点:

  • この例はConfluent Cloudに対してテストされています。そのため、sasl.*およびssl.enabledセキュリティオプションはセルフマネージドの場合には適切でないかもしれません。
  • bootstrap_serversの構成パラメータにプロトコルプレフィックスは必要ありません。例:pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • ソースパラメータdecoding.codec = "json"は、メッセージがClickHouseシンクに単一のJSONオブジェクトとして渡されることを保証します。メッセージを文字列として処理し、デフォルトのbytes値を使用している場合、メッセージの内容はmessageフィールドに追加されます。ほとんどの場合、これはVectorのはじめにガイドで説明されているようにClickHouseで処理する必要があります。
  • Vectorはメッセージにいくつかのフィールドを追加します。この例では、ClickHouseシンクではskip_unknown_fields = trueの構成パラメータを介してこれらのフィールドを無視します。これは、ターゲットテーブルスキーマの一部でないフィールドを無視します。必要に応じて、これらのメタフィールド(例えばoffset)が追加されるようにスキーマを調整してください。
  • シンクがイベントのソースをパラメータinputsを介して参照していることに注意してください。
  • ClickHouseシンクの動作はこちらに説明されています。最適なスループットを得るために、ユーザーはbuffer.max_eventsbatch.timeout_secs、およびbatch.max_bytesパラメータの調整を希望するかもしれません。ClickHouseの推奨では、単一バッチ内のイベント数の最小値として1000を考慮すべきです。均一な高スループットのユースケースでは、ユーザーはbuffer.max_eventsパラメータを増加させることができます。変動が大きいスループットでは、batch.timeout_secsパラメータの変更が必要になるかもしれません。
  • auto_offset_reset = "smallest"パラメータは、Kafkaソースがトピックの先頭から開始することを強制します。これにより、ステップ(1)で発行されたメッセージを消費します。ユーザーには異なる動作が必要になる場合があります。詳細についてはこちらを参照してください。
  1. Vectorを起動します。

デフォルトでは、ヘルスチェックがClickHouseへの挿入が始まる前に必要です。これにより、接続が確立できていることとスキーマが読み取れることが確認されます。問題が発生した場合に役立つ追加のログを得るには、VECTOR_LOG=debugを前に付けてください。

  1. データの挿入を確認します。
count
200000