ClickHouse Kafka Connect Sink
サポートが必要な場合は、リポジトリで issue を作成するか、ClickHouse public Slack で質問してください。
ClickHouse Kafka Connect Sink は、Kafka トピックから ClickHouse テーブルへデータを配信する Kafka コネクタです。
ライセンス
Kafka Connector Sink は Apache 2.0 License の下で配布されています。
環境要件
Kafka Connect フレームワーク v2.7 以降が環境にインストールされている必要があります。
バージョン互換性マトリクス
| ClickHouse Kafka Connect version | ClickHouse version | Kafka Connect | Confluent platform |
|---|---|---|---|
| 1.0.0 | > 23.3 | > 2.7 | > 6.1 |
主な機能
- 標準で厳密な exactly-once セマンティクスを提供します。これは、新しい ClickHouse コア機能である KeeperMap(コネクタのステートストアとして使用)によって実現されており、ミニマルなアーキテクチャを可能にします。
- サードパーティ製ステートストアのサポート: 現在はデフォルトでインメモリストアを使用しますが、KeeperMap も利用可能です(Redis は今後追加予定)。
- コア統合コンポーネント: ClickHouse によってビルド・保守・サポートされています。
- ClickHouse Cloud に対して継続的にテストされています。
- 宣言されたスキーマあり/スキーマレスのどちらの場合でもデータ挿入をサポートします。
- ClickHouse のすべてのデータ型をサポートします。
インストール手順
接続情報を取得する
To connect to ClickHouse with HTTP(S) you need this information:
| Parameter(s) | Description |
|---|---|
HOST and PORT | Typically, the port is 8443 when using TLS or 8123 when not using TLS. |
DATABASE NAME | Out of the box, there is a database named default, use the name of the database that you want to connect to. |
USERNAME and PASSWORD | Out of the box, the username is default. Use the username appropriate for your use case. |
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

Choose HTTPS. Connection details are displayed in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
一般的なインストール手順
このコネクタは、プラグインの実行に必要なすべてのクラスファイルを含む単一の JAR ファイルとして配布されています。
プラグインをインストールするには、次の手順に従ってください。
- ClickHouse Kafka Connect Sink リポジトリの Releases ページから、Connector JAR ファイルを含む zip アーカイブをダウンロードします。
- ZIP ファイルの内容を展開し、任意の場所にコピーします。
- Confluent Platform がプラグインを検出できるように、プラグインディレクトリのパスを Connect プロパティファイル内の plugin.path 設定に追加します。
- 設定で、トピック名、ClickHouse インスタンスのホスト名、およびパスワードを指定します。
- Confluent Platform を再起動します。
- Confluent Platform を使用している場合は、Confluent Control Center UI にログインし、利用可能なコネクタ一覧に ClickHouse Sink が表示されていることを確認します。
設定オプション
ClickHouse Sink を ClickHouse サーバーに接続するには、次の情報を指定する必要があります。
- 接続情報: ホスト名(必須)とポート(任意)
- ユーザー認証情報: パスワード(必須)とユーザー名(任意)
- コネクタクラス:
com.clickhouse.kafka.connect.ClickHouseSinkConnector(必須) - topics または topics.regex: ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要があります(必須)
- キーおよび値コンバーター: トピック上のデータ種別に基づいて設定します。ワーカー設定で既に定義されていない場合は必須です。
設定オプションの完全な一覧表:
| Property Name | Description | Default Value |
|---|---|---|
hostname (Required) | サーバーのホスト名または IP アドレス | N/A |
port | ClickHouse のポート。デフォルトは 8443(クラウドでの HTTPS 用)ですが、HTTP(セルフホスト時のデフォルト)の場合は 8123 を指定する必要がある | 8443 |
ssl | ClickHouse への SSL 接続を有効にするかどうか | true |
jdbcConnectionProperties | ClickHouse に接続する際の接続プロパティ。? で開始し、param=value を & で連結する必要がある | "" |
username | ClickHouse データベースのユーザー名 | default |
password (Required) | ClickHouse データベースのパスワード | N/A |
database | ClickHouse データベース名 | default |
connector.class (Required) | Connector クラス(明示的に設定し、デフォルト値のままにしておくこと) | "com.clickhouse.kafka.connect.ClickHouseSinkConnector" |
tasks.max | Connector Task の最大数 | "1" |
errors.retry.timeout | ClickHouse JDBC のリトライタイムアウト | "60" |
exactlyOnce | Exactly Once(正確に 1 回)処理の有効化フラグ | "false" |
topics (Required) | ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要がある | "" |
key.converter (Required* - See Description) | キーの型に応じて設定する。キーを渡す場合(かつ worker 設定で定義されていない場合)に必須。 | "org.apache.kafka.connect.storage.StringConverter" |
value.converter (Required* - See Description) | トピック上のデータ型に基づいて設定する。サポートされる形式: JSON、String、Avro、Protobuf。worker 設定で定義されていない場合はここで必須。 | "org.apache.kafka.connect.json.JsonConverter" |
value.converter.schemas.enable | Connector の Value Converter によるスキーマサポートを有効にするかどうか | "false" |
errors.tolerance | Connector のエラー許容度。サポートされる値: none, all | "none" |
errors.deadletterqueue.topic.name | 設定されている場合(かつ errors.tolerance=all のとき)、失敗したバッチに対して DLQ が使用される(Troubleshooting を参照) | "" |
errors.deadletterqueue.context.headers.enable | DLQ に追加のヘッダーを付与する | "" |
clickhouseSettings | ClickHouse の設定をカンマ区切りで指定(例: "insert_quorum=2, etc...") | "" |
topic2TableMap | トピック名をテーブル名にマッピングするリストをカンマ区切りで指定(例: "topic1=table1, topic2=table2, etc...") | "" |
tableRefreshInterval | テーブル定義キャッシュをリフレッシュする間隔(秒) | 0 |
keeperOnCluster | セルフホスト環境向けに、exactly-once 用 connect_state テーブルに対する ON CLUSTER パラメータを設定可能にする(例: ON CLUSTER clusterNameInConfigFileDefinition。 Distributed DDL Queries を参照) | "" |
bypassRowBinary | スキーマベースのデータ(Avro、Protobuf など)に対して RowBinary および RowBinaryWithDefaults の使用を無効化できる。データに欠損カラムがあり、Nullable/Default が許容できない場合にのみ使用すること | "false" |
dateTimeFormats | DateTime64 スキーマフィールドをパースするための日時フォーマット。; 区切りで指定する(例: someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。 | "" |
tolerateStateMismatch | AFTER_PROCESSING に保存されている現在のオフセットよりも「前」のレコードを connector が破棄することを許可する(例: オフセット 250 が最後に記録されたオフセットである状態で、オフセット 5 が送信された場合など) | "false" |
ignorePartitionsWhenBatching | insert 用にメッセージを収集する際にパーティションを無視する(ただし exactlyOnce が false の場合のみ)。パフォーマンス上の注意: Connector Task が多いほど、1 Task あたりに割り当てられる Kafka パーティションは少なくなり、効果が逓減しうる。 | "false" |
対象テーブル
ClickHouse Connect Sink は Kafka のトピックからメッセージを読み取り、適切なテーブルに書き込みます。ClickHouse Connect Sink が書き込むのは既存のテーブルのみです。データの挿入を開始する前に、対象テーブルが ClickHouse 上に適切なスキーマで作成済みであることを必ず確認してください。
各トピックごとに、ClickHouse 上に専用の対象テーブルが必要です。対象テーブル名は、元のトピック名と一致している必要があります。
前処理
ClickHouse Kafka Connect Sink に送信される前に送信メッセージを変換する必要がある場合は、Kafka Connect Transformations を使用してください。
サポートされるデータ型
スキーマを宣言している場合:
| Kafka Connect Type | ClickHouse Type | Supported | Primitive |
|---|---|---|---|
| STRING | String | ✅ | Yes |
| STRING | JSON. See below (1) | ✅ | Yes |
| INT8 | Int8 | ✅ | Yes |
| INT16 | Int16 | ✅ | Yes |
| INT32 | Int32 | ✅ | Yes |
| INT64 | Int64 | ✅ | Yes |
| FLOAT32 | Float32 | ✅ | Yes |
| FLOAT64 | Float64 | ✅ | Yes |
| BOOLEAN | Boolean | ✅ | Yes |
| ARRAY | Array(T) | ✅ | No |
| MAP | Map(Primitive, T) | ✅ | No |
| STRUCT | Variant(T1, T2, ...) | ✅ | No |
| STRUCT | Tuple(a T1, b T2, ...) | ✅ | No |
| STRUCT | Nested(a T1, b T2, ...) | ✅ | No |
| STRUCT | JSON. See below (1), (2) | ✅ | No |
| BYTES | String | ✅ | No |
| org.apache.kafka.connect.data.Time | Int64 / DateTime64 | ✅ | No |
| org.apache.kafka.connect.data.Timestamp | Int32 / Date32 | ✅ | No |
| org.apache.kafka.connect.data.Decimal | Decimal | ✅ | No |
-
(1) - JSON がサポートされるのは、ClickHouse の設定で
input_format_binary_read_json_as_string=1が有効になっている場合のみです。これは RowBinary フォーマットファミリーでのみ動作し、この設定は挿入リクエスト内のすべてのカラムに影響するため、すべて文字列型である必要があります。この場合、コネクタは STRUCT を JSON 文字列に変換します。 -
(2) - 構造体に
oneofのような union が含まれている場合、コンバータはフィールド名にプレフィックス/サフィックスを追加しないように設定する必要があります。ProtobufConverterにはgenerate.index.for.unions=falseという 設定 があります。
スキーマを宣言していない場合:
レコードは JSON に変換され、JSONEachRow フォーマットの値として ClickHouse に送信されます。
設定レシピ
すぐに使い始めるための、一般的な設定レシピをいくつか示します。
基本設定
最も基本的な設定です。Kafka Connect を分散モードで実行しており、localhost:8443 で SSL 有効な ClickHouse サーバーが稼働していて、データはスキーマレスな JSON であることを前提としています。
複数のトピックを対象とした基本構成
コネクタは複数のトピックからデータを読み取ることができます
DLQ を使用した基本構成
異なるデータ形式での利用
Avro スキーマのサポート
Protobuf スキーマのサポート
注意:クラスが見つからないといった問題が発生する場合、一部の環境には protobuf コンバーターが含まれていないため、依存関係を同梱した別のバージョンの jar リリースが必要になる場合があります。
JSON スキーマのサポート
文字列のサポート
このコネクタは、さまざまな ClickHouse フォーマットにおける String コンバーターをサポートしています(JSON、CSV、TSV)。
ログ記録
ログ記録は Kafka Connect Platform によって自動的に行われます。 ログの出力先や形式は、Kafka Connect の設定ファイルで設定できます。
Confluent Platform を使用している場合は、CLI コマンドを実行することでログを確認できます。
詳細については、公式のチュートリアルを参照してください。
モニタリング
ClickHouse Kafka Connect は、Java Management Extensions (JMX) を通じて実行時メトリクスを公開します。JMX は Kafka Connector でデフォルトで有効になっています。
ClickHouse 固有のメトリクス
コネクタは、次の MBean 名でカスタムメトリクスを公開します。
| Metric Name | Type | Description |
|---|---|---|
receivedRecords | long | 受信したレコードの総数。 |
recordProcessingTime | long | レコードをグループ化し、統一された構造に変換するのに要した合計時間(ナノ秒)。 |
taskProcessingTime | long | データを処理して ClickHouse に挿入するのに要した合計時間(ナノ秒)。 |
Kafka Producer/Consumer Metrics
このコネクタは、データフロー、スループット、およびパフォーマンスの把握に役立つ、標準的な Kafka producer/consumer のメトリクスを公開しています。
トピックレベルのメトリクス:
records-sent-total: トピックに送信されたレコードの総数bytes-sent-total: トピックに送信されたバイト数の合計record-send-rate: 1 秒あたりに送信されたレコードの平均レートbyte-rate: 1 秒あたりに送信されたバイト数の平均レートcompression-rate: 達成された圧縮率
パーティションレベルのメトリクス:
records-sent-total: パーティションに送信されたレコードの総数bytes-sent-total: パーティションに送信されたバイト数の総量records-lag: パーティションの現在のラグrecords-lead: パーティションの現在のリードreplica-fetch-lag: レプリカに関するラグ情報
ノードレベルの接続メトリクス:
connection-creation-total: Kafka ノードに対して作成された接続の総数connection-close-total: クローズされた接続の総数request-total: ノードに送信されたリクエストの総数response-total: ノードから受信したレスポンスの総数request-rate: 1 秒あたりの平均リクエストレートresponse-rate: 1 秒あたりの平均レスポンスレート
これらのメトリクスは次の監視に役立ちます:
- スループット: データのインジェストレートを追跡
- ラグ: ボトルネックと処理遅延の特定
- 圧縮: データ圧縮効率の測定
- 接続状態: ネットワーク接続性と安定性の監視
Kafka Connect フレームワークのメトリクス
コネクタは Kafka Connect フレームワークと統合されており、タスクのライフサイクルおよびエラー追跡のためのメトリクスを公開します。
タスクステータスメトリクス:
task-count: コネクタ内のタスクの総数running-task-count: 現在実行中のタスク数paused-task-count: 現在一時停止中のタスク数failed-task-count: 失敗したタスクの数destroyed-task-count: 破棄されたタスクの数unassigned-task-count: 未割り当てタスクの数
タスクステータスの値には次が含まれます: running, paused, failed, destroyed, unassigned
エラーメトリクス:
deadletterqueue-produce-failures: 失敗したデッドレターキュー (DLQ) への書き込みの数deadletterqueue-produce-requests: デッドレターキューへの書き込み試行の総数last-error-timestamp: 直近のエラーのタイムスタンプrecords-skip-total: エラーによりスキップされたレコードの総数records-retry-total: リトライされたレコードの総数errors-total: 発生したエラーの総数
パフォーマンスメトリクス:
offset-commit-failures: 失敗したオフセットコミットの数offset-commit-avg-time-ms: オフセットコミットに要する平均時間offset-commit-max-time-ms: オフセットコミットに要する最大時間put-batch-avg-time-ms: バッチ処理に要する平均時間put-batch-max-time-ms: バッチ処理に要する最大時間source-record-poll-total: 取得されたレコードの総数
監視のベストプラクティス
- コンシューマラグを監視する: パーティションごとに
records-lagを追跡して処理ボトルネックを特定します - エラーレートを追跡する:
errors-totalとrecords-skip-totalを監視してデータ品質の問題を検出します - タスクの健全性を確認する: タスクステータスメトリクスを監視してタスクが正しく実行されていることを確認します
- スループットを計測する:
records-send-rateとbyte-rateを使用してインジェスト性能を追跡します - 接続状態を監視する: ノードレベルの接続メトリクスを確認してネットワークの問題を検出します
- 圧縮効率を追跡する:
compression-rateを使用してデータ転送を最適化します
JMX メトリクスの詳細な定義および Prometheus との統合については、jmx-export-connector.yml 設定ファイルを参照してください。
制限事項
- 削除はサポートされていません。
- バッチサイズは Kafka Consumer のプロパティから継承されます。
- exactly-once のために KeeperMap を使用していて、オフセットが変更または巻き戻された場合、その特定のトピックの KeeperMap の内容を削除する必要があります(詳細については、以下のトラブルシューティングガイドを参照してください)。
パフォーマンスチューニングとスループット最適化
このセクションでは、ClickHouse Kafka Connect Sink のパフォーマンスチューニング手法について説明します。大規模なスループットが必要なユースケースを扱う場合や、リソース使用率を最適化しラグを最小化する必要がある場合、パフォーマンスチューニングは重要です。
いつパフォーマンスチューニングが必要になるか
パフォーマンスチューニングが一般的に必要となるのは、次のようなシナリオです:
- 高スループットワークロード: Kafka トピックから毎秒数百万件のイベントを処理する場合
- コンシューマラグ: コネクタがデータ生成レートに追いつかず、ラグが増加している場合
- リソース制約: CPU、メモリ、またはネットワーク使用量を最適化する必要がある場合
- 複数トピック: 複数の高ボリュームトピックを同時に消費している場合
- メッセージサイズが小さい場合: 多数の小さなメッセージを扱い、サーバーサイドでのバッチ処理の恩恵を受けられる場合
次のような場合には、パフォーマンスチューニングは通常必要ありません:
- 低〜中程度のボリューム(< 10,000 メッセージ/秒)を処理している場合
- コンシューマラグが安定しており、ユースケース上許容可能な場合
- デフォルトのコネクタ設定で既にスループット要件を満たしている場合
- ClickHouse クラスターが受信負荷を容易に処理できている場合
データフローの理解
チューニングを行う前に、コネクタ内でデータがどのように流れるかを理解しておくことが重要です。
- Kafka Connect フレームワーク がバックグラウンドで Kafka のトピックからメッセージを取得する
- コネクタがポーリング してフレームワークの内部バッファからメッセージを取得する
- コネクタがバッチ化 し、ポーリングサイズに基づいてメッセージをまとめる
- ClickHouse が受信 し、バッチ化された
INSERTを HTTP/S 経由で受け取る - ClickHouse が処理 し、
INSERTを同期または非同期で処理する
これら各段階でパフォーマンスを最適化できます。
Kafka Connect のバッチサイズ調整
最初の最適化ポイントは、Kafka からコネクタが 1 バッチあたりに受け取るデータ量を制御することです。
フェッチ設定
Kafka Connect(フレームワーク)は、コネクタとは独立してバックグラウンドで Kafka のトピックからメッセージを取得します。
fetch.min.bytes: フレームワークが値をコネクタに渡す前に必要となる最小データ量(デフォルト: 1 バイト)fetch.max.bytes: 1 回のリクエストで取得する最大データ量(デフォルト: 52428800 / 50 MB)fetch.max.wait.ms:fetch.min.bytesに満たない場合にデータを返すまで待機する最大時間(デフォルト: 500 ms)
ポーリング設定
コネクタはフレームワークのバッファからメッセージをポーリングします。
max.poll.records: 1 回のポーリングで返される最大レコード数(デフォルト: 500)max.partition.fetch.bytes: パーティションごとの最大データ量(デフォルト: 1048576 / 1 MB)
高スループット向けの推奨設定
ClickHouse のパフォーマンスを最適化するには、より大きなバッチサイズを目標としてください。
パーティションのフェッチサイズを増やす (5 MB)
consumer.max.partition.fetch.bytes=5242880
任意: より多くのデータが揃うまで待つように最小フェッチサイズを増やす (1 MB)
consumer.fetch.min.bytes=1048576
オプション: レイテンシがクリティカルな場合の待機時間を短縮する
consumer.fetch.max.wait.ms=300
主な設定:
async_insert=1: 非同期インサートを有効にするwait_for_async_insert=1(推奨): コネクタは、ClickHouse ストレージへのフラッシュ完了を待ってから応答を返します。確実なデータ配信を保証します。wait_for_async_insert=0: コネクタはバッファリング直後に応答を返します。パフォーマンスは向上しますが、フラッシュ前にサーバーがクラッシュした場合はデータが失われる可能性があります。
非同期インサート動作のチューニング
非同期インサートのフラッシュ動作を細かく調整できます。
一般的なチューニングパラメータ:
async_insert_max_data_size(デフォルト: 10485760 / 10 MB): フラッシュ前の最大バッファサイズasync_insert_busy_timeout_ms(デフォルト: 1000): フラッシュまでの最大時間 (ミリ秒)async_insert_stale_timeout_ms(デフォルト: 0): 最後の挿入からフラッシュまでの時間 (ミリ秒)async_insert_max_query_number(デフォルト: 100): フラッシュ前の最大クエリ数
トレードオフ:
- 利点: パーツ数の削減、マージ性能の向上、CPU オーバーヘッドの低減、高い並行性下でのスループット改善
- 考慮事項: データが即座にはクエリ可能にならない、エンドツーエンドのレイテンシがわずかに増加
- リスク:
wait_for_async_insert=0の場合にサーバークラッシュ時のデータ損失、大きなバッファによるメモリプレッシャー発生の可能性
Exactly-once セマンティクスを伴う非同期インサート
exactlyOnce=true を非同期インサートと併用する場合:
重要: データが永続化された後にのみオフセットコミットが行われるようにするため、exactly-once セマンティクスと併用する場合は常に wait_for_async_insert=1 を使用してください。
非同期インサートの詳細については、ClickHouse の非同期インサートに関するドキュメントを参照してください。
コネクタの並列度
スループットを向上させるには、並列度を高めてください。
コネクタごとのタスク数
各タスクはトピックパーティションの一部を処理します。タスク数を増やす = 並列度の向上ですが、次の点に注意が必要です:
- 実効的な最大タスク数 = トピックパーティション数
- 各タスクは個別に ClickHouse への接続を維持する
- タスク数が増える = オーバーヘッド増加とリソース競合の可能性
推奨値: まずは tasks.max をトピックパーティション数と同じ値に設定し、その後 CPU およびスループットのメトリクスに基づいて調整してください。
バッチ処理時のパーティション無視
デフォルトでは、コネクタはパーティションごとにメッセージをバッチ処理します。より高いスループットが必要な場合は、パーティションをまたいでバッチ処理できます:
警告: exactlyOnce=false の場合にのみ使用してください。この設定は、より大きなバッチを作成することでスループットを向上できますが、パーティション単位の順序の保証は失われます。
複数の高スループットトピック
コネクタが複数のトピックを購読するように構成されており、topic2TableMap を使用してトピックをテーブルにマッピングしていて、挿入時にボトルネックが発生しコンシューマラグが生じている場合は、代わりにトピックごとに 1 つのコネクタを作成することを検討してください。
この問題が発生する主な理由は、現在、バッチがすべてのテーブルに対して逐次的に挿入されているためです。
推奨: 高スループットのトピックが複数ある場合は、挿入の並列スループットを最大化するために、トピックごとに 1 つのコネクタインスタンスをデプロイしてください。
ClickHouse テーブルエンジンに関する考慮事項
ユースケースに適した ClickHouse テーブルエンジンを選択してください:
MergeTree: ほとんどのユースケースに最適で、クエリ性能と挿入性能のバランスが良いReplicatedMergeTree: 高可用性に必須だが、レプリケーションのオーバーヘッドが追加される- 適切な
ORDER BYを設定した*MergeTree: クエリパターンに合わせて最適化
検討すべき設定:
コネクタレベルの挿入設定:
コネクションプーリングとタイムアウト
コネクタは ClickHouse への HTTP 接続を維持します。レイテンシが高いネットワーク環境では、タイムアウトを調整してください。
socket_timeout(デフォルト: 30000 ms): 読み取り操作の最大待機時間connection_timeout(デフォルト: 10000 ms): 接続が確立されるまでの最大待機時間
大きなバッチでタイムアウトエラーが発生する場合は、これらの値を引き上げてください。
パフォーマンスの監視とトラブルシューティング
次の主要なメトリクスを監視します:
- Consumer lag: Kafka の監視ツールを使用してパーティションごとのラグ(遅延)を追跡
- Connector metrics: JMX 経由で
receivedRecords、recordProcessingTime、taskProcessingTimeを監視(Monitoring を参照) - ClickHouse metrics:
system.asynchronous_inserts: 非同期インサートバッファの使用状況を監視system.parts: パーツ数を監視してマージの問題を検出system.merges: 実行中のマージを監視system.events:InsertedRows、InsertedBytes、FailedInsertQueryを追跡
一般的なパフォーマンス問題:
| 症状 | 考えられる原因 | 解決策 |
|---|---|---|
| コンシューマーラグが大きい | バッチが小さすぎる | max.poll.records を増やし、async inserts を有効にする |
| 「Too many parts」エラー | 小さな挿入処理が高頻度で行われている | async inserts を有効にし、バッチサイズを増やす |
| タイムアウトエラー | バッチサイズが大きい、ネットワークが遅い | バッチサイズを減らし、socket_timeout を増やし、ネットワークを確認する |
| CPU 使用率が高い | 小さなパーツが多すぎる | async inserts を有効にし、マージ関連の設定値を引き上げる |
| OutOfMemory エラー | バッチサイズが大きすぎる | max.poll.records と max.partition.fetch.bytes を減らす |
| タスク負荷が不均一 | パーティション分布が不均一 | パーティションを再バランスするか、tasks.max を調整する |
ベストプラクティスのまとめ
- まずはデフォルトから始め、実際のパフォーマンスを測定してからチューニングする
- 可能な限り大きなバッチを優先する: 1 回の insert あたり 10,000~100,000 行を目安にする
- 多数の小さなバッチを送信する場合や高い並行度が必要な場合は async inserts を使用する
- 厳密な 1 回だけのセマンティクス(exactly-once)では常に
wait_for_async_insert=1を使用する - 水平方向にスケールさせる: パーティション数まで
tasks.maxを増やす - 高トラフィックなトピックごとに 1 つのコネクタ を割り当ててスループットを最大化する
- 継続的に監視する: コンシューマーラグ、パーツ数、マージアクティビティを追跡する
- 十分にテストする: 本番デプロイメント前に、現実的な負荷で設定変更を必ずテストする
例: 高スループット向け設定
以下は高スループット用に最適化した完全な例です。
この構成では:
- ポーリングごとに最大 10,000 レコードを処理します
- 大きな挿入に対応するため、パーティションをまたいでバッチ処理します
- 16 MB のバッファを使用した非同期インサートを行います
- 8 個のタスクを並列実行します(パーティション数に合わせて調整してください)
- 厳密な順序性よりもスループットを優先するように最適化されています
トラブルシューティング
"State mismatch for topic [someTopic] partition [0]"
これは、KeeperMap に保存されているオフセットと Kafka に保存されているオフセットが異なる場合に発生します。
通常、トピックが削除された場合や、オフセットが手動で調整された場合に起こります。
これを解消するには、該当するトピック + パーティションに対して保存されている古い値を削除する必要があります。
注意: この調整は exactly-once セマンティクスに影響を与える可能性があります。
"What errors will the connector retry?"
現時点では、再試行可能な一時的エラーを特定することに重点を置いており、次のものが含まれます:
ClickHouseException- これは ClickHouse によってスローされる汎用的な例外です。
通常、サーバーが過負荷のときにスローされ、特に一時的とみなされるエラーコードは次のとおりです:- 3 - UNEXPECTED_END_OF_FILE
- 159 - TIMEOUT_EXCEEDED
- 164 - READONLY
- 202 - TOO_MANY_SIMULTANEOUS_QUERIES
- 203 - NO_FREE_CONNECTION
- 209 - SOCKET_TIMEOUT
- 210 - NETWORK_ERROR
- 242 - TABLE_IS_READ_ONLY
- 252 - TOO_MANY_PARTS
- 285 - TOO_FEW_LIVE_REPLICAS
- 319 - UNKNOWN_STATUS_OF_INSERT
- 425 - SYSTEM_ERROR
- 999 - KEEPER_EXCEPTION
- 1002 - UNKNOWN_EXCEPTION
SocketTimeoutException- ソケットがタイムアウトしたときにスローされます。UnknownHostException- ホスト名が解決できないときにスローされます。IOException- ネットワークに問題がある場合にスローされます。
「すべてのデータが空/ゼロになっている」
おそらく、データ内のフィールドがテーブル内のフィールドと一致していません。これは特に CDC(変更データキャプチャ)や Debezium フォーマットでよく発生します。
よくある解決策としては、コネクタ設定に flatten 変換を追加することです。
これは、ネストされた JSON をフラットな JSON に変換します(_ を区切り文字として使用)。テーブル内のフィールドは「field1_field2_field3」の形式(例: 「before_id」、「after_id」など)になります。
「ClickHouse で自分の Kafka キーを使いたい」
Kafka のキーはデフォルトでは value フィールドに格納されませんが、KeyToValue 変換を使用して、キーを(新しい _key というフィールド名の下に)value フィールドへ移動できます。