Kafka と ClickHouse で Vector を利用する
Kafka と ClickHouse で Vector を使用する
Vector はベンダー非依存のデータパイプラインであり、Kafka からデータを読み取り、ClickHouse にイベントを送信できます。
ClickHouse と組み合わせた Vector の入門ガイドでは、ログのユースケースとファイルからのイベント読み取りに焦点を当てています。ここでは、Kafka トピックに格納されたイベントを含む GitHub サンプルデータセットを利用します。
Vector は、プッシュまたはプルモデルでデータを取得するために sources を利用します。一方で sinks はイベントの送信先を提供します。したがって、Kafka source と ClickHouse sink を使用します。なお、Kafka は sink としてサポートされていますが、ClickHouse source は利用できません。その結果、Vector は ClickHouse から Kafka へデータを転送したいユーザーには適していません。
Vector はデータの変換にも対応していますが、これは本ガイドの範囲外です。この機能が必要な場合は、Vector のドキュメントを参照してください。
現在の ClickHouse sink の実装では HTTP インターフェースを利用している点に注意してください。ClickHouse sink は現時点では JSON スキーマの利用をサポートしていません。データはプレーンな JSON 形式、もしくは文字列として Kafka に送信される必要があります。
ライセンス
Vector は MPL-2.0 License の下で配布されています。
接続情報を収集する
To connect to ClickHouse with HTTP(S) you need this information:
| Parameter(s) | Description |
|---|---|
HOST and PORT | Typically, the port is 8443 when using TLS or 8123 when not using TLS. |
DATABASE NAME | Out of the box, there is a database named default, use the name of the database that you want to connect to. |
USERNAME and PASSWORD | Out of the box, the username is default. Use the username appropriate for your use case. |
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

Choose HTTPS. Connection details are displayed in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
手順
- Kafka に
githubトピックを作成し、GitHub データセットを投入します。
このデータセットは、ClickHouse/ClickHouse リポジトリに焦点を当てた 200,000 行で構成されています。
- 対象テーブルが作成されていることを確認します。ここではデフォルトデータベースを使用します。
CREATE TABLE github ( file_time DateTime, event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22), actor_login LowCardinality(String), repo_name LowCardinality(String), created_at DateTime, updated_at DateTime, action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20), comment_id UInt64, path String, ref LowCardinality(String), ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4), creator_user_login LowCardinality(String), number UInt32, title String, labels Array(LowCardinality(String)), state Enum('none' = 0, 'open' = 1, 'closed' = 2), assignee LowCardinality(String), assignees Array(LowCardinality(String)), closed_at DateTime, merged_at DateTime, merge_commit_sha String, requested_reviewers Array(LowCardinality(String)), merged_by LowCardinality(String), review_comments UInt32, member_login LowCardinality(String) ) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);
この設定および Vector の動作について、いくつか重要な注意点があります。
- この例は Confluent Cloud に対してテストされています。そのため、
sasl.*およびssl.enabledセキュリティオプションは、セルフマネージドなケースでは適切でない可能性があります。 - 設定パラメータ
bootstrap_serversにはプロトコルのプレフィックスは不要です(例:pkc-2396y.us-east-1.aws.confluent.cloud:9092)。 - ソースパラメータ
decoding.codec = "json"は、メッセージが単一の JSON オブジェクトとして ClickHouse sink に渡されることを保証します。メッセージを文字列として扱い、デフォルト値のbytesを使用する場合、メッセージの内容はフィールドmessageに格納されます。多くの場合、これは Vector getting started ガイドで説明しているように、ClickHouse 側での処理が必要になります。 - Vector はメッセージに対して多数のフィールドを追加します。この例では、ClickHouse sink の設定パラメータ
skip_unknown_fields = trueによって、これらのフィールドを無視しています。これは、ターゲットテーブルのスキーマに含まれないフィールドを無視する設定です。offsetのようなこれらのメタフィールドが追加されるように、スキーマを調整してもかまいません。 inputsパラメータによって、sink がイベントのソースを参照している点に注目してください。- ClickHouse sink の動作についてはこちらを参照してください。スループットを最適化するため、
buffer.max_events、batch.timeout_secs、batch.max_bytesパラメータのチューニングを検討してください。ClickHouse の推奨事項に従うと、1 バッチあたりのイベント数については、1000 を最小値として考慮する必要があります。スループットが一様に高いユースケースでは、buffer.max_eventsパラメータを増やすことができます。スループットにばらつきがある場合は、batch.timeout_secsパラメータの調整が必要になることがあります。 - パラメータ
auto_offset_reset = "smallest"は、Kafka ソースがトピックの先頭から読み取りを開始することを強制し、これによりステップ (1) で公開されたメッセージを確実に消費できるようにします。ユーザーによっては、異なる動作が必要になることがあります。詳細はこちらを参照してください。
- Vector を起動する
デフォルトでは、ClickHouse への挿入処理が開始される前に、health check が必要です。これにより、接続が確立できることと、スキーマが読み取れることが保証されます。問題が発生した場合に役立つ、より詳細なログを取得するには、コマンドの前に VECTOR_LOG=debug を付けて実行してください。
- データが挿入されたことを確認します。
| 件数 |
|---|
| 200000 |