ClickHouse Kafka Connect Sink
サポートが必要な場合は、リポジトリで issue を作成するか、ClickHouse public Slack で質問してください。
ClickHouse Kafka Connect Sink は、Kafka トピックから ClickHouse テーブルへデータを配信する Kafka コネクタです。
ライセンス
Kafka Connector Sink は Apache 2.0 License の下で配布されています。
環境要件
Kafka Connect フレームワーク v2.7 以降が環境にインストールされている必要があります。
バージョン互換性マトリクス
| ClickHouse Kafka Connect version | ClickHouse version | Kafka Connect | Confluent platform |
|---|---|---|---|
| 1.0.0 | > 23.3 | > 2.7 | > 6.1 |
主な機能
- 標準で厳密な exactly-once セマンティクスを提供します。これは、新しい ClickHouse コア機能である KeeperMap(コネクタのステートストアとして使用)によって実現されており、ミニマルなアーキテクチャを可能にします。
- サードパーティ製ステートストアのサポート: 現在はデフォルトでインメモリストアを使用しますが、KeeperMap も利用可能です(Redis は今後追加予定)。
- コア統合コンポーネント: ClickHouse によってビルド・保守・サポートされています。
- ClickHouse Cloud に対して継続的にテストされています。
- 宣言されたスキーマあり/スキーマレスのどちらの場合でもデータ挿入をサポートします。
- ClickHouse のすべてのデータ型をサポートします。
インストール手順
接続情報を取得する
HTTP(S) で ClickHouse に接続するには、次の情報が必要です。
| Parameter(s) | Description |
|---|---|
HOST and PORT | 通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。 |
DATABASE NAME | 既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。 |
USERNAME and PASSWORD | 既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。 |
ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

自己管理型の ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。
一般的なインストール手順
このコネクタは、プラグインの実行に必要なすべてのクラスファイルを含む単一の JAR ファイルとして配布されています。
プラグインをインストールするには、次の手順に従ってください。
- ClickHouse Kafka Connect Sink リポジトリの Releases ページから、Connector JAR ファイルを含む zip アーカイブをダウンロードします。
- ZIP ファイルの内容を展開し、任意の場所にコピーします。
- Confluent Platform がプラグインを検出できるように、プラグインディレクトリのパスを Connect プロパティファイル内の plugin.path 設定に追加します。
- 設定で、トピック名、ClickHouse インスタンスのホスト名、およびパスワードを指定します。
- Confluent Platform を再起動します。
- Confluent Platform を使用している場合は、Confluent Control Center UI にログインし、利用可能なコネクタ一覧に ClickHouse Sink が表示されていることを確認します。
設定オプション
ClickHouse Sink を ClickHouse サーバーに接続するには、次の情報を指定する必要があります。
- 接続情報: ホスト名(必須)とポート(任意)
- ユーザー認証情報: パスワード(必須)とユーザー名(任意)
- コネクタクラス:
com.clickhouse.kafka.connect.ClickHouseSinkConnector(必須) - topics または topics.regex: ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要があります(必須)
- キーおよび値コンバーター: トピック上のデータ種別に基づいて設定します。ワーカー設定で既に定義されていない場合は必須です。
設定オプションの完全な一覧表:
| Property Name | Description | Default Value |
|---|---|---|
hostname (Required) | サーバーのホスト名または IP アドレス | N/A |
port | ClickHouse のポート。デフォルトは 8443(クラウドでの HTTPS 用)ですが、HTTP(セルフホスト時のデフォルト)の場合は 8123 を指定する必要がある | 8443 |
ssl | ClickHouse への SSL 接続を有効にするかどうか | true |
jdbcConnectionProperties | ClickHouse に接続する際の接続プロパティ。? で開始し、param=value を & で連結する必要がある | "" |
username | ClickHouse データベースのユーザー名 | default |
password (Required) | ClickHouse データベースのパスワード | N/A |
database | ClickHouse データベース名 | default |
connector.class (Required) | Connector クラス(明示的に設定し、デフォルト値のままにしておくこと) | "com.clickhouse.kafka.connect.ClickHouseSinkConnector" |
tasks.max | Connector Task の最大数 | "1" |
errors.retry.timeout | ClickHouse JDBC のリトライタイムアウト | "60" |
exactlyOnce | Exactly Once(正確に 1 回)処理の有効化フラグ | "false" |
topics (Required) | ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要がある | "" |
key.converter (Required* - See Description) | キーの型に応じて設定する。キーを渡す場合(かつ worker 設定で定義されていない場合)に必須。 | "org.apache.kafka.connect.storage.StringConverter" |
value.converter (Required* - See Description) | トピック上のデータ型に基づいて設定する。サポートされる形式: JSON、String、Avro、Protobuf。worker 設定で定義されていない場合はここで必須。 | "org.apache.kafka.connect.json.JsonConverter" |
value.converter.schemas.enable | Connector の Value Converter によるスキーマサポートを有効にするかどうか | "false" |
errors.tolerance | Connector のエラー許容度。サポートされる値: none, all | "none" |
errors.deadletterqueue.topic.name | 設定されている場合(かつ errors.tolerance=all のとき)、失敗したバッチに対して DLQ が使用される(Troubleshooting を参照) | "" |
errors.deadletterqueue.context.headers.enable | DLQ に追加のヘッダーを付与する | "" |
clickhouseSettings | ClickHouse の設定をカンマ区切りで指定(例: "insert_quorum=2, etc...") | "" |
topic2TableMap | トピック名をテーブル名にマッピングするリストをカンマ区切りで指定(例: "topic1=table1, topic2=table2, etc...") | "" |
tableRefreshInterval | テーブル定義キャッシュをリフレッシュする間隔(秒) | 0 |
keeperOnCluster | セルフホスト環境向けに、exactly-once 用 connect_state テーブルに対する ON CLUSTER パラメータを設定可能にする(例: ON CLUSTER clusterNameInConfigFileDefinition。 Distributed DDL Queries を参照) | "" |
bypassRowBinary | スキーマベースのデータ(Avro、Protobuf など)に対して RowBinary および RowBinaryWithDefaults の使用を無効化できる。データに欠損カラムがあり、Nullable/Default が許容できない場合にのみ使用すること | "false" |
dateTimeFormats | DateTime64 スキーマフィールドをパースするための日時フォーマット。; 区切りで指定する(例: someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。 | "" |
tolerateStateMismatch | AFTER_PROCESSING に保存されている現在のオフセットよりも「前」のレコードを connector が破棄することを許可する(例: オフセット 250 が最後に記録されたオフセットである状態で、オフセット 5 が送信された場合など) | "false" |
ignorePartitionsWhenBatching | insert 用にメッセージを収集する際にパーティションを無視する(ただし exactlyOnce が false の場合のみ)。パフォーマンス上の注意: Connector Task が多いほど、1 Task あたりに割り当てられる Kafka パーティションは少なくなり、効果が逓減しうる。 | "false" |
対象テーブル
ClickHouse Connect Sink は Kafka のトピックからメッセージを読み取り、適切なテーブルに書き込みます。ClickHouse Connect Sink が書き込むのは既存のテーブルのみです。データの挿入を開始する前に、対象テーブルが ClickHouse 上に適切なスキーマで作成済みであることを必ず確認してください。
各トピックごとに、ClickHouse 上に専用の対象テーブルが必要です。対象テーブル名は、元のトピック名と一致している必要があります。
前処理
ClickHouse Kafka Connect Sink に送信される前に送信メッセージを変換する必要がある場合は、Kafka Connect Transformations を使用してください。
サポートされるデータ型
スキーマを宣言している場合:
| Kafka Connect Type | ClickHouse Type | Supported | Primitive |
|---|---|---|---|
| STRING | String | ✅ | Yes |
| STRING | JSON. See below (1) | ✅ | Yes |
| INT8 | Int8 | ✅ | Yes |
| INT16 | Int16 | ✅ | Yes |
| INT32 | Int32 | ✅ | Yes |
| INT64 | Int64 | ✅ | Yes |
| FLOAT32 | Float32 | ✅ | Yes |
| FLOAT64 | Float64 | ✅ | Yes |
| BOOLEAN | Boolean | ✅ | Yes |
| ARRAY | Array(T) | ✅ | No |
| MAP | Map(Primitive, T) | ✅ | No |
| STRUCT | Variant(T1, T2, ...) | ✅ | No |
| STRUCT | Tuple(a T1, b T2, ...) | ✅ | No |
| STRUCT | Nested(a T1, b T2, ...) | ✅ | No |
| STRUCT | JSON. See below (1), (2) | ✅ | No |
| BYTES | String | ✅ | No |
| org.apache.kafka.connect.data.Time | Int64 / DateTime64 | ✅ | No |
| org.apache.kafka.connect.data.Timestamp | Int32 / Date32 | ✅ | No |
| org.apache.kafka.connect.data.Decimal | Decimal | ✅ | No |
-
(1) - JSON がサポートされるのは、ClickHouse の設定で
input_format_binary_read_json_as_string=1が有効になっている場合のみです。これは RowBinary フォーマットファミリーでのみ動作し、この設定は挿入リクエスト内のすべてのカラムに影響するため、すべて文字列型である必要があります。この場合、コネクタは STRUCT を JSON 文字列に変換します。 -
(2) - 構造体に
oneofのような union が含まれている場合、コンバータはフィールド名にプレフィックス/サフィックスを追加しないように設定する必要があります。ProtobufConverterにはgenerate.index.for.unions=falseという 設定 があります。
スキーマを宣言していない場合:
レコードは JSON に変換され、JSONEachRow フォーマットの値として ClickHouse に送信されます。
設定レシピ
すぐに使い始めるための、一般的な設定レシピをいくつか示します。
基本設定
最も基本的な設定です。Kafka Connect を分散モードで実行しており、localhost:8443 で SSL 有効な ClickHouse サーバーが稼働していて、データはスキーマレスな JSON であることを前提としています。
複数のトピックを対象とした基本構成
コネクタは複数のトピックからデータを読み取ることができます
DLQ を使用した基本構成
異なるデータ形式での利用
Avro スキーマのサポート
Protobuf スキーマのサポート
注意:クラスが見つからないといった問題が発生する場合、一部の環境には protobuf コンバーターが含まれていないため、依存関係を同梱した別のバージョンの jar リリースが必要になる場合があります。
JSON スキーマのサポート
文字列のサポート
このコネクタは、さまざまな ClickHouse フォーマットにおける String コンバーターをサポートしています(JSON、CSV、TSV)。
ログ記録
ログ記録は Kafka Connect Platform によって自動的に行われます。 ログの出力先や形式は、Kafka Connect の設定ファイルで設定できます。
Confluent Platform を使用している場合は、CLI コマンドを実行することでログを確認できます。
詳細については、公式のチュートリアルを参照してください。
モニタリング
ClickHouse Kafka Connect は、Java Management Extensions (JMX) を通じて実行時メトリクスを公開します。JMX は Kafka Connector でデフォルトで有効になっています。
ClickHouse 固有のメトリクス
コネクタは、次の MBean 名でカスタムメトリクスを公開します。
| Metric Name | Type | Description |
|---|---|---|
receivedRecords | long | 受信したレコードの総数。 |
recordProcessingTime | long | レコードをグループ化し、統一された構造に変換するのに要した合計時間(ナノ秒)。 |
taskProcessingTime | long | データを処理して ClickHouse に挿入するのに要した合計時間(ナノ秒)。 |
Kafka Producer/Consumer Metrics
このコネクタは、データフロー、スループット、およびパフォーマンスの把握に役立つ、標準的な Kafka producer/consumer のメトリクスを公開しています。
トピックレベルのメトリクス:
records-sent-total: トピックに送信されたレコードの総数bytes-sent-total: トピックに送信されたバイト数の合計record-send-rate: 1 秒あたりに送信されたレコードの平均レートbyte-rate: 1 秒あたりに送信されたバイト数の平均レートcompression-rate: 達成された圧縮率
パーティションレベルのメトリクス:
records-sent-total: パーティションに送信されたレコードの総数bytes-sent-total: パーティションに送信されたバイト数の総量records-lag: パーティションの現在のラグrecords-lead: パーティションの現在のリードreplica-fetch-lag: レプリカに関するラグ情報
ノードレベルの接続メトリクス:
connection-creation-total: Kafka ノードに対して作成された接続の総数connection-close-total: クローズされた接続の総数request-total: ノードに送信されたリクエストの総数response-total: ノードから受信したレスポンスの総数request-rate: 1 秒あたりの平均リクエストレートresponse-rate: 1 秒あたりの平均レスポンスレート
これらのメトリクスは次の監視に役立ちます:
- スループット: データのインジェストレートを追跡
- ラグ: ボトルネックと処理遅延の特定
- 圧縮: データ圧縮効率の測定
- 接続状態: ネットワーク接続性と安定性の監視
Kafka Connect フレームワークのメトリクス
コネクタは Kafka Connect フレームワークと統合されており、タスクのライフサイクルおよびエラー追跡のためのメトリクスを公開します。
タスクステータスメトリクス:
task-count: コネクタ内のタスクの総数running-task-count: 現在実行中のタスク数paused-task-count: 現在一時停止中のタスク数failed-task-count: 失敗したタスクの数destroyed-task-count: 破棄されたタスクの数unassigned-task-count: 未割り当てタスクの数
タスクステータスの値には次が含まれます: running, paused, failed, destroyed, unassigned
エラーメトリクス:
deadletterqueue-produce-failures: 失敗したデッドレターキュー (DLQ) への書き込みの数deadletterqueue-produce-requests: デッドレターキューへの書き込み試行の総数last-error-timestamp: 直近のエラーのタイムスタンプrecords-skip-total: エラーによりスキップされたレコードの総数records-retry-total: リトライされたレコードの総数errors-total: 発生したエラーの総数
パフォーマンスメトリクス:
offset-commit-failures: 失敗したオフセットコミットの数offset-commit-avg-time-ms: オフセットコミットに要する平均時間offset-commit-max-time-ms: オフセットコミットに要する最大時間put-batch-avg-time-ms: バッチ処理に要する平均時間put-batch-max-time-ms: バッチ処理に要する最大時間source-record-poll-total: 取得されたレコードの総数
監視のベストプラクティス
- コンシューマラグを監視する: パーティションごとに
records-lagを追跡して処理ボトルネックを特定します - エラーレートを追跡する:
errors-totalとrecords-skip-totalを監視してデータ品質の問題を検出します - タスクの健全性を確認する: タスクステータスメトリクスを監視してタスクが正しく実行されていることを確認します
- スループットを計測する:
records-send-rateとbyte-rateを使用してインジェスト性能を追跡します - 接続状態を監視する: ノードレベルの接続メトリクスを確認してネットワークの問題を検出します
- 圧縮効率を追跡する:
compression-rateを使用してデータ転送を最適化します
JMX メトリクスの詳細な定義および Prometheus との統合については、jmx-export-connector.yml 設定ファイルを参照してください。
制限事項
- 削除はサポートされていません。
- バッチサイズは Kafka Consumer のプロパティから継承されます。
- exactly-once のために KeeperMap を使用していて、オフセットが変更または巻き戻された場合、その特定のトピックの KeeperMap の内容を削除する必要があります(詳細については、以下のトラブルシューティングガイドを参照してください)。
パフォーマンスチューニングとスループット最適化
このセクションでは、ClickHouse Kafka Connect Sink のパフォーマンスチューニング手法について説明します。大規模なスループットが必要なユースケースを扱う場合や、リソース使用率を最適化しラグを最小化する必要がある場合、パフォーマンスチューニングは重要です。
いつパフォーマンスチューニングが必要になるか
パフォーマンスチューニングが一般的に必要となるのは、次のようなシナリオです:
- 高スループットワークロード: Kafka トピックから毎秒数百万件のイベントを処理する場合
- コンシューマラグ: コネクタがデータ生成レートに追いつかず、ラグが増加している場合
- リソース制約: CPU、メモリ、またはネットワーク使用量を最適化する必要がある場合
- 複数トピック: 複数の高ボリュームトピックを同時に消費している場合
- メッセージサイズが小さい場合: 多数の小さなメッセージを扱い、サーバーサイドでのバッチ処理の恩恵を受けられる場合
次のような場合には、パフォーマンスチューニングは通常必要ありません:
- 低〜中程度のボリューム(< 10,000 メッセージ/秒)を処理している場合
- コンシューマラグが安定しており、ユースケース上許容可能な場合
- デフォルトのコネクタ設定で既にスループット要件を満たしている場合
- ClickHouse クラスターが受信負荷を容易に処理できている場合
データフローの理解
チューニングを行う前に、コネクタ内でデータがどのように流れるかを理解しておくことが重要です。
- Kafka Connect フレームワーク がバックグラウンドで Kafka のトピックからメッセージを取得する
- コネクタがポーリング してフレームワークの内部バッファからメッセージを取得する
- コネクタがバッチ化 し、ポーリングサイズに基づいてメッセージをまとめる
- ClickHouse が受信 し、バッチ化された
INSERTを HTTP/S 経由で受け取る - ClickHouse が処理 し、
INSERTを同期または非同期で処理する
これら各段階でパフォーマンスを最適化できます。
Kafka Connect のバッチサイズ調整
最初の最適化ポイントは、Kafka からコネクタが 1 バッチあたりに受け取るデータ量を制御することです。
フェッチ設定
Kafka Connect(フレームワーク)は、コネクタとは独立してバックグラウンドで Kafka のトピックからメッセージを取得します。
fetch.min.bytes: フレームワークが値をコネクタに渡す前に必要となる最小データ量(デフォルト: 1 バイト)fetch.max.bytes: 1 回のリクエストで取得する最大データ量(デフォルト: 52428800 / 50 MB)fetch.max.wait.ms:fetch.min.bytesに満たない場合にデータを返すまで待機する最大時間(デフォルト: 500 ms)
ポーリング設定
コネクタはフレームワークのバッファからメッセージをポーリングします。
max.poll.records: 1 回のポーリングで返される最大レコード数(デフォルト: 500)max.partition.fetch.bytes: パーティションごとの最大データ量(デフォルト: 1048576 / 1 MB)
高スループット向けの推奨設定
ClickHouse のパフォーマンスを最適化するには、より大きなバッチサイズを目標としてください。
パーティションのフェッチサイズを増やす (5 MB)
consumer.max.partition.fetch.bytes=5242880
任意: より多くのデータが揃うまで待つように最小フェッチサイズを増やす (1 MB)
consumer.fetch.min.bytes=1048576
オプション: レイテンシがクリティカルな場合の待機時間を短縮する
consumer.fetch.max.wait.ms=300