メインコンテンツへスキップ
メインコンテンツへスキップ

ClickHouse Kafka Connect Sink

注記

サポートが必要な場合は、リポジトリで issue を作成するか、ClickHouse public Slack で質問してください。

ClickHouse Kafka Connect Sink は、Kafka トピックから ClickHouse テーブルへデータを配信する Kafka コネクタです。

ライセンス

Kafka Connector Sink は Apache 2.0 License の下で配布されています。

環境要件

Kafka Connect フレームワーク v2.7 以降が環境にインストールされている必要があります。

バージョン互換性マトリクス

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

主な機能

  • 標準で厳密な exactly-once セマンティクスを提供します。これは、新しい ClickHouse コア機能である KeeperMap(コネクタのステートストアとして使用)によって実現されており、ミニマルなアーキテクチャを可能にします。
  • サードパーティ製ステートストアのサポート: 現在はデフォルトでインメモリストアを使用しますが、KeeperMap も利用可能です(Redis は今後追加予定)。
  • コア統合コンポーネント: ClickHouse によってビルド・保守・サポートされています。
  • ClickHouse Cloud に対して継続的にテストされています。
  • 宣言されたスキーマあり/スキーマレスのどちらの場合でもデータ挿入をサポートします。
  • ClickHouse のすべてのデータ型をサポートします。

インストール手順

接続情報を取得する

To connect to ClickHouse with HTTP(S) you need this information:

Parameter(s)Description
HOST and PORTTypically, the port is 8443 when using TLS or 8123 when not using TLS.
DATABASE NAMEOut of the box, there is a database named default, use the name of the database that you want to connect to.
USERNAME and PASSWORDOut of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS. Connection details are displayed in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

一般的なインストール手順

このコネクタは、プラグインの実行に必要なすべてのクラスファイルを含む単一の JAR ファイルとして配布されています。

プラグインをインストールするには、次の手順に従ってください。

  • ClickHouse Kafka Connect Sink リポジトリの Releases ページから、Connector JAR ファイルを含む zip アーカイブをダウンロードします。
  • ZIP ファイルの内容を展開し、任意の場所にコピーします。
  • Confluent Platform がプラグインを検出できるように、プラグインディレクトリのパスを Connect プロパティファイル内の plugin.path 設定に追加します。
  • 設定で、トピック名、ClickHouse インスタンスのホスト名、およびパスワードを指定します。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • Confluent Platform を再起動します。
  • Confluent Platform を使用している場合は、Confluent Control Center UI にログインし、利用可能なコネクタ一覧に ClickHouse Sink が表示されていることを確認します。

設定オプション

ClickHouse Sink を ClickHouse サーバーに接続するには、次の情報を指定する必要があります。

  • 接続情報: ホスト名(必須)とポート(任意)
  • ユーザー認証情報: パスワード(必須)とユーザー名(任意)
  • コネクタクラス: com.clickhouse.kafka.connect.ClickHouseSinkConnector必須
  • topics または topics.regex: ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要があります(必須
  • キーおよび値コンバーター: トピック上のデータ種別に基づいて設定します。ワーカー設定で既に定義されていない場合は必須です。

設定オプションの完全な一覧表:

Property NameDescriptionDefault Value
hostname (Required)サーバーのホスト名または IP アドレスN/A
portClickHouse のポート。デフォルトは 8443(クラウドでの HTTPS 用)ですが、HTTP(セルフホスト時のデフォルト)の場合は 8123 を指定する必要がある8443
sslClickHouse への SSL 接続を有効にするかどうかtrue
jdbcConnectionPropertiesClickHouse に接続する際の接続プロパティ。? で開始し、param=value& で連結する必要がある""
usernameClickHouse データベースのユーザー名default
password (Required)ClickHouse データベースのパスワードN/A
databaseClickHouse データベース名default
connector.class (Required)Connector クラス(明示的に設定し、デフォルト値のままにしておくこと)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxConnector Task の最大数"1"
errors.retry.timeoutClickHouse JDBC のリトライタイムアウト"60"
exactlyOnceExactly Once(正確に 1 回)処理の有効化フラグ"false"
topics (Required)ポーリングする Kafka トピック。トピック名はテーブル名と一致している必要がある""
key.converter (Required* - See Description)キーの型に応じて設定する。キーを渡す場合(かつ worker 設定で定義されていない場合)に必須。"org.apache.kafka.connect.storage.StringConverter"
value.converter (Required* - See Description)トピック上のデータ型に基づいて設定する。サポートされる形式: JSON、String、Avro、Protobuf。worker 設定で定義されていない場合はここで必須。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableConnector の Value Converter によるスキーマサポートを有効にするかどうか"false"
errors.toleranceConnector のエラー許容度。サポートされる値: none, all"none"
errors.deadletterqueue.topic.name設定されている場合(かつ errors.tolerance=all のとき)、失敗したバッチに対して DLQ が使用される(Troubleshooting を参照)""
errors.deadletterqueue.context.headers.enableDLQ に追加のヘッダーを付与する""
clickhouseSettingsClickHouse の設定をカンマ区切りで指定(例: "insert_quorum=2, etc...")""
topic2TableMapトピック名をテーブル名にマッピングするリストをカンマ区切りで指定(例: "topic1=table1, topic2=table2, etc...")""
tableRefreshIntervalテーブル定義キャッシュをリフレッシュする間隔(秒)0
keeperOnClusterセルフホスト環境向けに、exactly-once 用 connect_state テーブルに対する ON CLUSTER パラメータを設定可能にする(例: ON CLUSTER clusterNameInConfigFileDefinitionDistributed DDL Queries を参照)""
bypassRowBinaryスキーマベースのデータ(Avro、Protobuf など)に対して RowBinary および RowBinaryWithDefaults の使用を無効化できる。データに欠損カラムがあり、Nullable/Default が許容できない場合にのみ使用すること"false"
dateTimeFormatsDateTime64 スキーマフィールドをパースするための日時フォーマット。; 区切りで指定する(例: someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatchAFTER_PROCESSING に保存されている現在のオフセットよりも「前」のレコードを connector が破棄することを許可する(例: オフセット 250 が最後に記録されたオフセットである状態で、オフセット 5 が送信された場合など)"false"
ignorePartitionsWhenBatchinginsert 用にメッセージを収集する際にパーティションを無視する(ただし exactlyOncefalse の場合のみ)。パフォーマンス上の注意: Connector Task が多いほど、1 Task あたりに割り当てられる Kafka パーティションは少なくなり、効果が逓減しうる。"false"

対象テーブル

ClickHouse Connect Sink は Kafka のトピックからメッセージを読み取り、適切なテーブルに書き込みます。ClickHouse Connect Sink が書き込むのは既存のテーブルのみです。データの挿入を開始する前に、対象テーブルが ClickHouse 上に適切なスキーマで作成済みであることを必ず確認してください。

各トピックごとに、ClickHouse 上に専用の対象テーブルが必要です。対象テーブル名は、元のトピック名と一致している必要があります。

前処理

ClickHouse Kafka Connect Sink に送信される前に送信メッセージを変換する必要がある場合は、Kafka Connect Transformations を使用してください。

サポートされるデータ型

スキーマを宣言している場合:

Kafka Connect TypeClickHouse TypeSupportedPrimitive
STRINGStringYes
STRINGJSON. See below (1)Yes
INT8Int8Yes
INT16Int16Yes
INT32Int32Yes
INT64Int64Yes
FLOAT32Float32Yes
FLOAT64Float64Yes
BOOLEANBooleanYes
ARRAYArray(T)No
MAPMap(Primitive, T)No
STRUCTVariant(T1, T2, ...)No
STRUCTTuple(a T1, b T2, ...)No
STRUCTNested(a T1, b T2, ...)No
STRUCTJSON. See below (1), (2)No
BYTESStringNo
org.apache.kafka.connect.data.TimeInt64 / DateTime64No
org.apache.kafka.connect.data.TimestampInt32 / Date32No
org.apache.kafka.connect.data.DecimalDecimalNo
  • (1) - JSON がサポートされるのは、ClickHouse の設定で input_format_binary_read_json_as_string=1 が有効になっている場合のみです。これは RowBinary フォーマットファミリーでのみ動作し、この設定は挿入リクエスト内のすべてのカラムに影響するため、すべて文字列型である必要があります。この場合、コネクタは STRUCT を JSON 文字列に変換します。

  • (2) - 構造体に oneof のような union が含まれている場合、コンバータはフィールド名にプレフィックス/サフィックスを追加しないように設定する必要があります。ProtobufConverter には generate.index.for.unions=false という 設定 があります。

スキーマを宣言していない場合:

レコードは JSON に変換され、JSONEachRow フォーマットの値として ClickHouse に送信されます。

設定レシピ

すぐに使い始めるための、一般的な設定レシピをいくつか示します。

基本設定

最も基本的な設定です。Kafka Connect を分散モードで実行しており、localhost:8443 で SSL 有効な ClickHouse サーバーが稼働していて、データはスキーマレスな JSON であることを前提としています。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}

複数のトピックを対象とした基本構成

コネクタは複数のトピックからデータを読み取ることができます

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

DLQ を使用した基本構成

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

異なるデータ形式での利用

Avro スキーマのサポート
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Protobuf スキーマのサポート
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}

注意:クラスが見つからないといった問題が発生する場合、一部の環境には protobuf コンバーターが含まれていないため、依存関係を同梱した別のバージョンの jar リリースが必要になる場合があります。

JSON スキーマのサポート
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
文字列のサポート

このコネクタは、さまざまな ClickHouse フォーマットにおける String コンバーターをサポートしています(JSONCSVTSV)。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

ログ記録

ログ記録は Kafka Connect Platform によって自動的に行われます。 ログの出力先や形式は、Kafka Connect の設定ファイルで設定できます。

Confluent Platform を使用している場合は、CLI コマンドを実行することでログを確認できます。

confluent local services connect のログ

詳細については、公式のチュートリアルを参照してください。

モニタリング

ClickHouse Kafka Connect は、Java Management Extensions (JMX) を通じて実行時メトリクスを公開します。JMX は Kafka Connector でデフォルトで有効になっています。

ClickHouse 固有のメトリクス

コネクタは、次の MBean 名でカスタムメトリクスを公開します。

com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}
Metric NameTypeDescription
receivedRecordslong受信したレコードの総数。
recordProcessingTimelongレコードをグループ化し、統一された構造に変換するのに要した合計時間(ナノ秒)。
taskProcessingTimelongデータを処理して ClickHouse に挿入するのに要した合計時間(ナノ秒)。

Kafka Producer/Consumer Metrics

このコネクタは、データフロー、スループット、およびパフォーマンスの把握に役立つ、標準的な Kafka producer/consumer のメトリクスを公開しています。

トピックレベルのメトリクス:

  • records-sent-total: トピックに送信されたレコードの総数
  • bytes-sent-total: トピックに送信されたバイト数の合計
  • record-send-rate: 1 秒あたりに送信されたレコードの平均レート
  • byte-rate: 1 秒あたりに送信されたバイト数の平均レート
  • compression-rate: 達成された圧縮率

パーティションレベルのメトリクス:

  • records-sent-total: パーティションに送信されたレコードの総数
  • bytes-sent-total: パーティションに送信されたバイト数の総量
  • records-lag: パーティションの現在のラグ
  • records-lead: パーティションの現在のリード
  • replica-fetch-lag: レプリカに関するラグ情報

ノードレベルの接続メトリクス:

  • connection-creation-total: Kafka ノードに対して作成された接続の総数
  • connection-close-total: クローズされた接続の総数
  • request-total: ノードに送信されたリクエストの総数
  • response-total: ノードから受信したレスポンスの総数
  • request-rate: 1 秒あたりの平均リクエストレート
  • response-rate: 1 秒あたりの平均レスポンスレート

これらのメトリクスは次の監視に役立ちます:

  • スループット: データのインジェストレートを追跡
  • ラグ: ボトルネックと処理遅延の特定
  • 圧縮: データ圧縮効率の測定
  • 接続状態: ネットワーク接続性と安定性の監視

Kafka Connect フレームワークのメトリクス

コネクタは Kafka Connect フレームワークと統合されており、タスクのライフサイクルおよびエラー追跡のためのメトリクスを公開します。

タスクステータスメトリクス:

  • task-count: コネクタ内のタスクの総数
  • running-task-count: 現在実行中のタスク数
  • paused-task-count: 現在一時停止中のタスク数
  • failed-task-count: 失敗したタスクの数
  • destroyed-task-count: 破棄されたタスクの数
  • unassigned-task-count: 未割り当てタスクの数

タスクステータスの値には次が含まれます: running, paused, failed, destroyed, unassigned

エラーメトリクス:

  • deadletterqueue-produce-failures: 失敗したデッドレターキュー (DLQ) への書き込みの数
  • deadletterqueue-produce-requests: デッドレターキューへの書き込み試行の総数
  • last-error-timestamp: 直近のエラーのタイムスタンプ
  • records-skip-total: エラーによりスキップされたレコードの総数
  • records-retry-total: リトライされたレコードの総数
  • errors-total: 発生したエラーの総数

パフォーマンスメトリクス:

  • offset-commit-failures: 失敗したオフセットコミットの数
  • offset-commit-avg-time-ms: オフセットコミットに要する平均時間
  • offset-commit-max-time-ms: オフセットコミットに要する最大時間
  • put-batch-avg-time-ms: バッチ処理に要する平均時間
  • put-batch-max-time-ms: バッチ処理に要する最大時間
  • source-record-poll-total: 取得されたレコードの総数

監視のベストプラクティス

  1. コンシューマラグを監視する: パーティションごとに records-lag を追跡して処理ボトルネックを特定します
  2. エラーレートを追跡する: errors-totalrecords-skip-total を監視してデータ品質の問題を検出します
  3. タスクの健全性を確認する: タスクステータスメトリクスを監視してタスクが正しく実行されていることを確認します
  4. スループットを計測する: records-send-ratebyte-rate を使用してインジェスト性能を追跡します
  5. 接続状態を監視する: ノードレベルの接続メトリクスを確認してネットワークの問題を検出します
  6. 圧縮効率を追跡する: compression-rate を使用してデータ転送を最適化します

JMX メトリクスの詳細な定義および Prometheus との統合については、jmx-export-connector.yml 設定ファイルを参照してください。

制限事項

  • 削除はサポートされていません。
  • バッチサイズは Kafka Consumer のプロパティから継承されます。
  • exactly-once のために KeeperMap を使用していて、オフセットが変更または巻き戻された場合、その特定のトピックの KeeperMap の内容を削除する必要があります(詳細については、以下のトラブルシューティングガイドを参照してください)。

パフォーマンスチューニングとスループット最適化

このセクションでは、ClickHouse Kafka Connect Sink のパフォーマンスチューニング手法について説明します。大規模なスループットが必要なユースケースを扱う場合や、リソース使用率を最適化しラグを最小化する必要がある場合、パフォーマンスチューニングは重要です。

いつパフォーマンスチューニングが必要になるか

パフォーマンスチューニングが一般的に必要となるのは、次のようなシナリオです:

  • 高スループットワークロード: Kafka トピックから毎秒数百万件のイベントを処理する場合
  • コンシューマラグ: コネクタがデータ生成レートに追いつかず、ラグが増加している場合
  • リソース制約: CPU、メモリ、またはネットワーク使用量を最適化する必要がある場合
  • 複数トピック: 複数の高ボリュームトピックを同時に消費している場合
  • メッセージサイズが小さい場合: 多数の小さなメッセージを扱い、サーバーサイドでのバッチ処理の恩恵を受けられる場合

次のような場合には、パフォーマンスチューニングは通常必要ありません:

  • 低〜中程度のボリューム(< 10,000 メッセージ/秒)を処理している場合
  • コンシューマラグが安定しており、ユースケース上許容可能な場合
  • デフォルトのコネクタ設定で既にスループット要件を満たしている場合
  • ClickHouse クラスターが受信負荷を容易に処理できている場合

データフローの理解

チューニングを行う前に、コネクタ内でデータがどのように流れるかを理解しておくことが重要です。

  1. Kafka Connect フレームワーク がバックグラウンドで Kafka のトピックからメッセージを取得する
  2. コネクタがポーリング してフレームワークの内部バッファからメッセージを取得する
  3. コネクタがバッチ化 し、ポーリングサイズに基づいてメッセージをまとめる
  4. ClickHouse が受信 し、バッチ化された INSERT を HTTP/S 経由で受け取る
  5. ClickHouse が処理 し、INSERT を同期または非同期で処理する

これら各段階でパフォーマンスを最適化できます。

Kafka Connect のバッチサイズ調整

最初の最適化ポイントは、Kafka からコネクタが 1 バッチあたりに受け取るデータ量を制御することです。

フェッチ設定

Kafka Connect(フレームワーク)は、コネクタとは独立してバックグラウンドで Kafka のトピックからメッセージを取得します。

  • fetch.min.bytes: フレームワークが値をコネクタに渡す前に必要となる最小データ量(デフォルト: 1 バイト)
  • fetch.max.bytes: 1 回のリクエストで取得する最大データ量(デフォルト: 52428800 / 50 MB)
  • fetch.max.wait.ms: fetch.min.bytes に満たない場合にデータを返すまで待機する最大時間(デフォルト: 500 ms)
ポーリング設定

コネクタはフレームワークのバッファからメッセージをポーリングします。

  • max.poll.records: 1 回のポーリングで返される最大レコード数(デフォルト: 500)
  • max.partition.fetch.bytes: パーティションごとの最大データ量(デフォルト: 1048576 / 1 MB)

ClickHouse のパフォーマンスを最適化するには、より大きなバッチサイズを目標としてください。

# ポーリング1回あたりのレコード数を増やす
consumer.max.poll.records=5000

パーティションのフェッチサイズを増やす (5 MB)

consumer.max.partition.fetch.bytes=5242880

任意: より多くのデータが揃うまで待つように最小フェッチサイズを増やす (1 MB)

consumer.fetch.min.bytes=1048576

オプション: レイテンシがクリティカルな場合の待機時間を短縮する

consumer.fetch.max.wait.ms=300


**重要**: Kafka Connect のフェッチ設定は圧縮データを前提としていますが、ClickHouse が受信するのは非圧縮データです。使用している圧縮率に応じて、これらの設定のバランスを調整してください。

**トレードオフ**:
- **より大きなバッチ** = ClickHouse へのインジェスト性能の向上、パーツ数の削減、オーバーヘッドの低減
- **より大きなバッチ** = メモリ使用量の増加、エンドツーエンドのレイテンシー増大の可能性
- **バッチが大きすぎる場合** = タイムアウト、OutOfMemory エラー、`max.poll.interval.ms` 超過のリスク

詳細については、[Confluent ドキュメント](https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration) および [Kafka ドキュメント](https://kafka.apache.org/documentation/#consumerconfigs) を参照してください。

#### 非同期インサート                        

非同期インサートは、コネクタが比較的小さなバッチを送信する場合や、バッチ処理の責務を ClickHouse 側に移してインジェストをさらに最適化したい場合に有用な強力な機能です。

##### 非同期インサートを使用すべき場合                             

次のような場合は、非同期インサートを有効化することを検討してください:

- **小さなバッチが多数ある場合**: コネクタが頻繁に小さなバッチ (1 バッチあたり 1000 行未満) を送信している
- **高い同時実行性がある場合**: 複数のコネクタタスクが同じテーブルに書き込んでいる
- **分散デプロイの場合**: 複数のホスト上で多数のコネクタインスタンスを実行している
- **パーツ作成のオーバーヘッドが問題になっている場合**: 「too many parts」エラーが発生している
- **ワークロードが混在している場合**: リアルタイムのインジェストとクエリワークロードを組み合わせている

次のような場合は、非同期インサートを**使用しないでください**:

- すでに制御された頻度で大きなバッチ (1 バッチあたり 10,000 行超) を送信している
- 即時のデータ可視性が必要である (クエリがデータを即座に参照できなければならない)
- `wait_for_async_insert=0` を用いた exactly-once セマンティクスが要件と競合する
- クライアント側でのバッチ処理の改善の方がユースケースに適している

##### 非同期インサートの動作                          

非同期インサートを有効にすると、ClickHouse は次のように動作します:

1. コネクタからインサートクエリを受信する
2. データを (ディスクではなく) メモリ上のバッファに書き込む
3. コネクタに成功を返す (`wait_for_async_insert=0` の場合)
4. 次のいずれかの条件を満たしたタイミングでバッファをディスクにフラッシュする:
   - バッファが `async_insert_max_data_size` (デフォルト: 10 MB) に達した場合
   - 最初のインサートから `async_insert_busy_timeout_ms` ミリ秒が経過した場合 (デフォルト: 1000 ms)
   - 蓄積されたクエリ数が最大値 (`async_insert_max_query_number`, デフォルト: 100) に達した場合

これにより作成されるパーツ数が大幅に削減され、全体的なスループットが向上します。

##### 非同期インサートの有効化                          

`clickhouseSettings` 構成パラメータに非同期インサートの設定を追加します:

```json
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}

主な設定:

  • async_insert=1: 非同期インサートを有効にする
  • wait_for_async_insert=1(推奨): コネクタは、ClickHouse ストレージへのフラッシュ完了を待ってから応答を返します。確実なデータ配信を保証します。
  • wait_for_async_insert=0: コネクタはバッファリング直後に応答を返します。パフォーマンスは向上しますが、フラッシュ前にサーバーがクラッシュした場合はデータが失われる可能性があります。
非同期インサート動作のチューニング

非同期インサートのフラッシュ動作を細かく調整できます。

"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=10485760,async_insert_busy_timeout_ms=1000"

一般的なチューニングパラメータ:

  • async_insert_max_data_size (デフォルト: 10485760 / 10 MB): フラッシュ前の最大バッファサイズ
  • async_insert_busy_timeout_ms (デフォルト: 1000): フラッシュまでの最大時間 (ミリ秒)
  • async_insert_stale_timeout_ms (デフォルト: 0): 最後の挿入からフラッシュまでの時間 (ミリ秒)
  • async_insert_max_query_number (デフォルト: 100): フラッシュ前の最大クエリ数

トレードオフ:

  • 利点: パーツ数の削減、マージ性能の向上、CPU オーバーヘッドの低減、高い並行性下でのスループット改善
  • 考慮事項: データが即座にはクエリ可能にならない、エンドツーエンドのレイテンシがわずかに増加
  • リスク: wait_for_async_insert=0 の場合にサーバークラッシュ時のデータ損失、大きなバッファによるメモリプレッシャー発生の可能性
Exactly-once セマンティクスを伴う非同期インサート

exactlyOnce=true を非同期インサートと併用する場合:

{
  "config": {
    "exactlyOnce": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}

重要: データが永続化された後にのみオフセットコミットが行われるようにするため、exactly-once セマンティクスと併用する場合は常に wait_for_async_insert=1 を使用してください。

非同期インサートの詳細については、ClickHouse の非同期インサートに関するドキュメントを参照してください。

コネクタの並列度

スループットを向上させるには、並列度を高めてください。

コネクタごとのタスク数
"tasks.max": "4"

各タスクはトピックパーティションの一部を処理します。タスク数を増やす = 並列度の向上ですが、次の点に注意が必要です:

  • 実効的な最大タスク数 = トピックパーティション数
  • 各タスクは個別に ClickHouse への接続を維持する
  • タスク数が増える = オーバーヘッド増加とリソース競合の可能性

推奨値: まずは tasks.max をトピックパーティション数と同じ値に設定し、その後 CPU およびスループットのメトリクスに基づいて調整してください。

バッチ処理時のパーティション無視

デフォルトでは、コネクタはパーティションごとにメッセージをバッチ処理します。より高いスループットが必要な場合は、パーティションをまたいでバッチ処理できます:

"ignorePartitionsWhenBatching": "true"

警告: exactlyOnce=false の場合にのみ使用してください。この設定は、より大きなバッチを作成することでスループットを向上できますが、パーティション単位の順序の保証は失われます。

複数の高スループットトピック

コネクタが複数のトピックを購読するように構成されており、topic2TableMap を使用してトピックをテーブルにマッピングしていて、挿入時にボトルネックが発生しコンシューマラグが生じている場合は、代わりにトピックごとに 1 つのコネクタを作成することを検討してください。

この問題が発生する主な理由は、現在、バッチがすべてのテーブルに対して逐次的に挿入されているためです。

推奨: 高スループットのトピックが複数ある場合は、挿入の並列スループットを最大化するために、トピックごとに 1 つのコネクタインスタンスをデプロイしてください。

ClickHouse テーブルエンジンに関する考慮事項

ユースケースに適した ClickHouse テーブルエンジンを選択してください:

  • MergeTree: ほとんどのユースケースに最適で、クエリ性能と挿入性能のバランスが良い
  • ReplicatedMergeTree: 高可用性に必須だが、レプリケーションのオーバーヘッドが追加される
  • 適切な ORDER BY を設定した *MergeTree: クエリパターンに合わせて最適化

検討すべき設定:

CREATE TABLE my_table (...)
ENGINE = MergeTree()
ORDER BY (timestamp, id)
SETTINGS 
    -- パーツを並列に書き込むために max_insert_threads(挿入スレッド数)を増やす
    max_insert_threads = 4,
    -- 信頼性向上のためにクォーラム付きの INSERT を許可する(ReplicatedMergeTree)
    insert_quorum = 2

コネクタレベルの挿入設定:

"clickhouseSettings": "insert_quorum=2,insert_quorum_timeout=60000"

コネクションプーリングとタイムアウト

コネクタは ClickHouse への HTTP 接続を維持します。レイテンシが高いネットワーク環境では、タイムアウトを調整してください。

"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
  • socket_timeout (デフォルト: 30000 ms): 読み取り操作の最大待機時間
  • connection_timeout (デフォルト: 10000 ms): 接続が確立されるまでの最大待機時間

大きなバッチでタイムアウトエラーが発生する場合は、これらの値を引き上げてください。

パフォーマンスの監視とトラブルシューティング

次の主要なメトリクスを監視します:

  1. Consumer lag: Kafka の監視ツールを使用してパーティションごとのラグ(遅延)を追跡
  2. Connector metrics: JMX 経由で receivedRecordsrecordProcessingTimetaskProcessingTime を監視(Monitoring を参照)
  3. ClickHouse metrics:
    • system.asynchronous_inserts: 非同期インサートバッファの使用状況を監視
    • system.parts: パーツ数を監視してマージの問題を検出
    • system.merges: 実行中のマージを監視
    • system.events: InsertedRowsInsertedBytesFailedInsertQuery を追跡

一般的なパフォーマンス問題:

症状考えられる原因解決策
コンシューマーラグが大きいバッチが小さすぎるmax.poll.records を増やし、async inserts を有効にする
「Too many parts」エラー小さな挿入処理が高頻度で行われているasync inserts を有効にし、バッチサイズを増やす
タイムアウトエラーバッチサイズが大きい、ネットワークが遅いバッチサイズを減らし、socket_timeout を増やし、ネットワークを確認する
CPU 使用率が高い小さなパーツが多すぎるasync inserts を有効にし、マージ関連の設定値を引き上げる
OutOfMemory エラーバッチサイズが大きすぎるmax.poll.recordsmax.partition.fetch.bytes を減らす
タスク負荷が不均一パーティション分布が不均一パーティションを再バランスするか、tasks.max を調整する

ベストプラクティスのまとめ

  1. まずはデフォルトから始め、実際のパフォーマンスを測定してからチューニングする
  2. 可能な限り大きなバッチを優先する: 1 回の insert あたり 10,000~100,000 行を目安にする
  3. 多数の小さなバッチを送信する場合や高い並行度が必要な場合は async inserts を使用する
  4. 厳密な 1 回だけのセマンティクス(exactly-once)では常に wait_for_async_insert=1 を使用する
  5. 水平方向にスケールさせる: パーティション数まで tasks.max を増やす
  6. 高トラフィックなトピックごとに 1 つのコネクタ を割り当ててスループットを最大化する
  7. 継続的に監視する: コンシューマーラグ、パーツ数、マージアクティビティを追跡する
  8. 十分にテストする: 本番デプロイメント前に、現実的な負荷で設定変更を必ずテストする

例: 高スループット向け設定

以下は高スループット用に最適化した完全な例です。

{
  "name": "clickhouse-high-throughput",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "8",
    
    "topics": "high_volume_topic",
    "hostname": "my-clickhouse-host.cloud",
    "port": "8443",
    "database": "default",
    "username": "default",
    "password": "<PASSWORD>",
    "ssl": "true",
    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    
    "exactlyOnce": "false",
    "ignorePartitionsWhenBatching": "true",
    
    "consumer.max.poll.records": "10000",
    "consumer.max.partition.fetch.bytes": "5242880",
    "consumer.fetch.min.bytes": "1048576",
    "consumer.fetch.max.wait.ms": "500",
    
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
  }
}

この構成では:

  • ポーリングごとに最大 10,000 レコードを処理します
  • 大きな挿入に対応するため、パーティションをまたいでバッチ処理します
  • 16 MB のバッファを使用した非同期インサートを行います
  • 8 個のタスクを並列実行します(パーティション数に合わせて調整してください)
  • 厳密な順序性よりもスループットを優先するように最適化されています

トラブルシューティング

"State mismatch for topic [someTopic] partition [0]"

これは、KeeperMap に保存されているオフセットと Kafka に保存されているオフセットが異なる場合に発生します。
通常、トピックが削除された場合や、オフセットが手動で調整された場合に起こります。
これを解消するには、該当するトピック + パーティションに対して保存されている古い値を削除する必要があります。

注意: この調整は exactly-once セマンティクスに影響を与える可能性があります。

"What errors will the connector retry?"

現時点では、再試行可能な一時的エラーを特定することに重点を置いており、次のものが含まれます:

  • ClickHouseException - これは ClickHouse によってスローされる汎用的な例外です。
    通常、サーバーが過負荷のときにスローされ、特に一時的とみなされるエラーコードは次のとおりです:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - ソケットがタイムアウトしたときにスローされます。
  • UnknownHostException - ホスト名が解決できないときにスローされます。
  • IOException - ネットワークに問題がある場合にスローされます。

「すべてのデータが空/ゼロになっている」

おそらく、データ内のフィールドがテーブル内のフィールドと一致していません。これは特に CDC(変更データキャプチャ)や Debezium フォーマットでよく発生します。 よくある解決策としては、コネクタ設定に flatten 変換を追加することです。

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

これは、ネストされた JSON をフラットな JSON に変換します(_ を区切り文字として使用)。テーブル内のフィールドは「field1_field2_field3」の形式(例: 「before_id」、「after_id」など)になります。

「ClickHouse で自分の Kafka キーを使いたい」

Kafka のキーはデフォルトでは value フィールドに格納されませんが、KeyToValue 変換を使用して、キーを(新しい _key というフィールド名の下に)value フィールドへ移動できます。

transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key