メインコンテンツまでスキップ
メインコンテンツまでスキップ

Using Vector with Kafka and ClickHouse

Kafka と ClickHouse を使用した Vector の利用

Vector は、Kafka からデータを読み取り、ClickHouse にイベントを送信する能力を持つ、ベンダーに依存しないデータパイプラインです。

ClickHouse と連携するための はじめに ガイドは、ログのユースケースとファイルからのイベントの読み取りに焦点を当てています。私たちは、Kafka トピックに保持されているイベントを含む Github サンプルデータセット を利用します。

Vector は、プッシュまたはプルモデルを通じてデータを取得するために sources を利用します。一方、sinks はイベントの宛先を提供します。したがって、私は Kafka ソースと ClickHouse シンクを利用します。Kafka はシンクとしてサポートされていますが、ClickHouse ソースは利用できないため、ClickHouse から Kafka にデータを転送したいユーザーには Vector は適していません。

Vector はデータの 変換 もサポートしています。これは本ガイドの範囲を超えています。この機能が必要な場合は、ユーザーは自身のデータセットについて Vector ドキュメントを参照してください。

現在の ClickHouse シンクの実装は HTTP インターフェースを使用しています。現時点では ClickHouse シンクは JSON スキーマの使用をサポートしていません。データはプレーン JSON 形式または文字列として Kafka に公開する必要があります。

ライセンス

Vector は MPL-2.0 ライセンスのもとで配布されています。

接続詳細をまとめる

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

手順

  1. Kafka の github トピックを作成し、Github データセット を挿入します。

このデータセットは、ClickHouse/ClickHouse リポジトリに焦点を当てた 200,000 行から構成されています。

  1. 目標のテーブルが作成されていることを確認します。以下ではデフォルトのデータベースを使用します。
  1. Vector をダウンロードしてインストールします。kafka.toml 構成ファイルを作成し、Kafka と ClickHouse インスタンスの値を変更します。

この構成および Vector の動作に関する重要な注意点がいくつかあります:

  • この例は Confluent Cloud に対してテストされています。したがって、sasl.* および ssl.enabled のセキュリティオプションは、セルフマネージドのケースには適さない可能性があります。
  • 構成パラメータ bootstrap_servers にはプロトコルプレフィックスは必要ありません。例: pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • ソースパラメータ decoding.codec = "json" は、メッセージが ClickHouse シンクに単一の JSON オブジェクトとして渡されることを保証します。メッセージを文字列として扱い、デフォルトの bytes 値を使用した場合、メッセージの内容はフィールド message に追加されます。このほとんどの場合、Vector のはじめにガイドに記載されたように ClickHouse で処理が必要です。
  • Vector はメッセージにいくつかのフィールドを 追加します。私たちの例では、設定パラメータ skip_unknown_fields = true を通じて ClickHouse シンクでこれらのフィールドを無視します。これは、ターゲットテーブルスキーマの一部でないフィールドを無視します。これらのメタフィールド(例: offset)を追加するようにスキーマを調整してください。
  • シンクがイベントのソースを inputs パラメータを通じて参照していることに注意してください。
  • ClickHouse シンクの動作については こちらを参照してください。最適なスループットを得るために、ユーザーは buffer.max_eventsbatch.timeout_secs、および batch.max_bytes パラメータを調整したいと考えるかもしれません。ClickHouse の 推奨事項 によれば、単一のバッチに含まれるイベント数の最小値として 1000 を考慮すべきです。均一な高スループットユースケースでは、ユーザーはパラメータ buffer.max_events を増やすことを検討します。可変スループットの場合は、パラメータ batch.timeout_secs の変更が必要です。
  • パラメータ auto_offset_reset = "smallest" は Kafka ソースにトピックの最初から始めさせます。これにより、ステップ (1) で公開されたメッセージを消費できるようになります。異なる動作が必要なユーザーは、こちらを参照してください。
  1. Vector を起動します。

デフォルトでは、ヘルスチェックが ClickHouse への挿入開始前に必要です。これにより、接続が確立され、スキーマが読み取れることが確認されます。問題が発生した場合は、VECTOR_LOG=debug を前に置くことで、さらに詳細なログを取得できます。

  1. データの挿入を確認します。
count
200000