メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouse Kafka Connect Sink

注記

助けが必要な場合は、リポジトリに問題を登録してください または ClickHouseのパブリックスラックで質問をしてください。

ClickHouse Kafka Connect Sinkは、KafkaトピックからClickHouseテーブルにデータを提供するKafkaコネクタです。

License

Kafkaコネクタシンクは、Apache 2.0 Licenseの下で配布されています。

Requirements for the environment

Kafka Connectフレームワークv2.7以降が環境にインストールされている必要があります。

Version compatibility matrix

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Main Features

  • ワンバウンドのセマンティクスを持つ状態で出荷されます。これは、コネクタによって状態ストアとして使用される新しいClickHouseコア機能であるKeeperMapによって実現され、最小限のアーキテクチャを可能にします。
  • サードパーティの状態ストアをサポート:現在はメモリ内がデフォルトですが、KeeperMapを使用することも可能です(Redisは近日中に追加予定)。
  • コア統合:ClickHouseによって構築、維持、サポートされています。
  • ClickHouse Cloudに対して継続的にテストされています。
  • 宣言されたスキーマによるデータ挿入とスキーマレスデータをサポート。
  • ClickHouseのすべてのデータ型をサポートしています。

Installation instructions

Gather your connection details

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

General Installation Instructions

コネクタは、プラグインを実行するために必要なすべてのクラスファイルを含む単一のJARファイルとして配布されます。

プラグインをインストールするには、以下の手順を実行します。

  • ReleasesページからConnector JARファイルを含むZIPアーカイブをダウンロードします。
  • ZIPファイルの内容を抽出し、所望の場所にコピーします。
  • Confluent Platformがプラグインを見つけることを許可するために、Connectプロパティファイルのplugin.path設定にプラグインディレクトリへのパスを追加します。
  • configにトピック名、ClickHouseインスタンスのホスト名、およびパスワードを提供します。
  • Confluent Platformを再起動します。
  • Confluent Platformを使用している場合、Confluent Control Center UIにログインして、ClickHouse Sinkが利用可能なコネクタのリストにあることを確認します。

Configuration options

ClickHouse SinkをClickHouseサーバーに接続するには、次の情報を提供する必要があります。

  • 接続詳細:hostname(必須)とport(オプション)
  • ユーザー認証情報:password(必須)およびusername(オプション)
  • コネクタクラス:com.clickhouse.kafka.connect.ClickHouseSinkConnector必須
  • topicsまたはtopics.regex:ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります(必須
  • キーおよび値変換器:トピック上のデータの種類に基づいて設定します。ワーカー設定にまだ定義されていない場合は必須です。

全ての設定オプションの完全な表:

Property NameDescriptionDefault Value
hostname (Required)サーバーのホスト名またはIPアドレスN/A
portClickHouseポート - デフォルトは8443(クラウドのHTTPS用)ですが、HTTP(セルフホストのデフォルト)の場合は8123にするべきです8443
sslClickHouseへのssl接続を有効にしますtrue
jdbcConnectionPropertiesClickhouseに接続する際の接続プロパティ。?で始まり、param=valueの間を&で結合します""
usernameClickHouseデータベースのユーザー名default
password (Required)ClickHouseデータベースのパスワードN/A
databaseClickHouseデータベース名default
connector.class (Required)コネクタークラス(明示的に設定し、デフォルト値を保持)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxコネクタタスクの数"1"
errors.retry.timeoutClickHouse JDBCリトライタイムアウト"60"
exactlyOnce一度だけの接続を有効にします"false"
topics (Required)ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります""
key.converter (Required* - See Description)キーのタイプに応じて設定します。キーを渡す場合はここで必須です(ワーカー設定にまだ定義されていない場合)。"org.apache.kafka.connect.storage.StringConverter"
value.converter (Required* - See Description)トピックのデータのタイプに基づいて設定します。サポート:- JSON、String、AvroまたはProtobuf形式。ワーカー設定にまだ定義されていない場合はここで必須です。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableコネクタの値変換器のスキーマサポート"false"
errors.toleranceコネクタのエラー許容。サポート:none, all"none"
errors.deadletterqueue.topic.name設定されている場合(errors.tolerance=allとともに)、失敗したバッチのためにDLQが使用されます(トラブルシューティングを参照)""
errors.deadletterqueue.context.headers.enableDLQの追加ヘッダーを追加します""
clickhouseSettingsClickHouseの設定のカンマ区切りリスト(例:"insert_quorum=2, etc...")""
topic2TableMapトピック名をテーブル名にマッピングするカンマ区切りリスト(例:"topic1=table1, topic2=table2, etc...")""
tableRefreshIntervalテーブル定義キャッシュを更新する時間(秒単位)0
keeperOnClusterセルフホストインスタンスのON CLUSTERパラメータの設定を許可します(例:ON CLUSTER clusterNameInConfigFileDefinition)正確に一度だけの接続状態テーブルのために(Distributed DDL Queriesを参照)""
bypassRowBinaryスキーマベースのデータ(Avro、Protobufなど)に対するRowBinaryとRowBinaryWithDefaultsの使用を無効にします - データに欠落したカラムがある場合やNullable/デフォルトが受け入れられない場合にのみ使用する必要があります"false"
dateTimeFormatsDateTime64スキーマフィールドを解析するための日付時刻形式、;で区切ります(例:someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatchコネクタがAFTER_PROCESSINGで保存された現在のオフセットよりも"早い"レコードをドロップすることを許可します(例:オフセット5が送信され、オフセット250が最後に記録されたオフセットの場合)"false"
ignorePartitionsWhenBatching挿入のためにメッセージを収集するときにパーティションを無視します(ただし、exactlyOncefalseの場合のみ)。パフォーマンスノート:コネクタタスクが多いほど、タスクごとに割り当てられるKafkaパーティションは少なくなります - これはリターンが減ることを意味します。"false"

Target Tables

ClickHouse Connect Sinkは、Kafkaトピックからメッセージを読み取り、適切なテーブルに書き込みます。ClickHouse Connect Sinkは、既存のテーブルにデータを書き込みます。データをそのテーブルに挿入し始める前に、適切なスキーマを持つターゲットテーブルがClickHouseに作成されていることを確認してください。

各トピックは、ClickHouse内に専用のターゲットテーブルを必要とします。ターゲットテーブル名は、ソーストピック名と一致する必要があります。

Pre-processing

ClickHouse Kafka Connect Sinkに送信される前にアウトバウンドメッセージを変換する必要がある場合は、Kafka Connect Transformationsを使用してください。

Supported Data types

スキーマが宣言されている場合:

Kafka Connect TypeClickHouse TypeSupportedPrimitive
STRINGStringYes
INT8Int8Yes
INT16Int16Yes
INT32Int32Yes
INT64Int64Yes
FLOAT32Float32Yes
FLOAT64Float64Yes
BOOLEANBooleanYes
ARRAYArray(T)No
MAPMap(Primitive, T)No
STRUCTVariant(T1, T2, ...)No
STRUCTTuple(a T1, b T2, ...)No
STRUCTNested(a T1, b T2, ...)No
BYTESStringNo
org.apache.kafka.connect.data.TimeInt64 / DateTime64No
org.apache.kafka.connect.data.TimestampInt32 / Date32No
org.apache.kafka.connect.data.DecimalDecimalNo

スキーマが宣言されていない場合:

レコードはJSONに変換され、JSONEachRow形式でClickHouseに送信されます。

Configuration Recipes

迅速に始めるための一般的な設定レシピをいくつか紹介します。

Basic Configuration

始めるための最も基本的な設定 - Kafka Connectが分散モードで実行されており、localhost:8443でSSLが有効になっているClickHouseサーバーが実行されていることを前提とし、データはスキーマレスのJSONです。

Basic Configuration with Multiple Topics

コネクタは複数のトピックからデータを消費できます。

Basic Configuration with DLQ

Using with different data formats

Avro Schema Support
Protobuf Schema Support

注意:クラスが不足している問題が発生した場合、すべての環境がprotobuf変換器を含むわけではなく、依存関係がバンドルされた別のリリースのjarが必要になることがあります。

JSON Schema Support
String Support

コネクタは、異なるClickHouse形式のString Converterをサポートします:JSONCSV、およびTSV

Logging

ログ記録はKafka Connect Platformによって自動的に提供されます。ログの宛先と形式は、Kafka connectの設定ファイルを介して設定できます。

Confluent Platformを使用している場合は、CLIコマンドを実行することでログを確認できます。

追加の詳細は公式のチュートリアルをチェックしてください。

Monitoring

ClickHouse Kafka Connectは、Java Management Extensions (JMX)を介してランタイムメトリックを報告します。JMXはデフォルトでKafka Connectorで有効になっています。

ClickHouse Connect MBeanName

ClickHouse Kafka Connectは次のメトリックを報告します:

NameTypeDescription
receivedRecordslong受け取ったレコードの総数。
recordProcessingTimelongレコードを統一構造にグループ化して変換するのにかかる合計時間(ナノ秒単位)。
taskProcessingTimelongClickHouseにデータを処理して挿入するのにかかる合計時間(ナノ秒単位)。

Limitations

  • 削除はサポートされていません。
  • バッチサイズはKafka Consumerプロパティから引き継がれます。
  • KeeperMapを使って一度だけ接続している場合、オフセットが変更または巻き戻されると、その特定のトピックのKeeperMapから内容を削除する必要があります。(詳細は以下のトラブルシューティングガイドを参照)

Tuning Performance

「シンクコネクタのバッチサイズを調整したい」と思ったことがあれば、ここがあなたのセクションです。

Connect Fetch vs Connector Poll

Kafka Connect(私たちのシンクコネクタが構築されているフレームワーク)は、バックグラウンドでKafkaトピックからメッセージを取得します(コネクタとは独立しています)。

このプロセスは、fetch.min.bytesfetch.max.bytesを使用して制御できます。fetch.min.bytesは、フレームワークがコネクタに値を渡す前に必要な最小量を設定し(fetch.max.wait.msで設定された時間制限まで)、fetch.max.bytesは上限サイズを設定します。コネクタにより大きなバッチを渡したい場合は、最小フェッチまたは最大待機を増やすというオプションがあります。

この取得したデータは、その後メッセージをポーリングするコネクタクライアントによって消費されます。この際、各ポーリングに対する量はmax.poll.recordsによって制御されます。フェッチはポーリングとは独立していることに注意してください!

これらの設定を調整する際、ユーザーはフェッチサイズがmax.poll.recordsの複数のバッチを生成することを目指すべきです(設定fetch.min.bytesfetch.max.bytesは圧縮データを表していることに注意してください) - そうすることで、各コネクタタスクができるだけ大きなバッチを挿入します。

ClickHouseは、頻繁だが小さなバッチよりも、わずかな遅延でも大きなバッチに最適化されています - バッチが大きいほど、パフォーマンスが良くなります。

詳細については、ConfluentのドキュメントKafkaのドキュメントをご覧ください。

Multiple high throughput topics

コネクタが複数のトピックを購読するように設定されていて、topic2TableMapを使用してトピックをテーブルにマッピングし、挿入時にボトルネックが発生して消費者の遅延が生じている場合、代わりにトピックごとに一つのコネクタを作成することを検討してください。この理由は、現在バッチがすべてのテーブルに対して直列的に挿入されるからです。

トピックごとに一つのコネクタを作成することは、可能な限り速い挿入率を確保するための暫定策です。

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

これは、KeeperMapに保存されたオフセットがKafkaに保存されたオフセットと異なる場合に発生します。通常、トピックが削除されたか、オフセットが手動で調整されたときに発生します。 これを修正するには、その特定のトピック+パーティションのために保存されている古い値を削除する必要があります。

注意:この調整は一度だけの接続に影響を与える可能性があります。

"What errors will the connector retry?"

現在のところ、焦点は一時的でリトライ可能なエラーの特定にあります。これには次のものが含まれます:

  • ClickHouseException - ClickHouseによってスローされる可能性がある一般的な例外です。 サーバーが過負荷であるときによくスローされ、以下のエラーコードが特に一時的拡張されます:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - ソケットがタイムアウトしたときにスローされます。
  • UnknownHostException - ホストが解決できないときにスローされます。
  • IOException - ネットワークに問題がある場合にスローされます。

"All my data is blank/zeroes"

おそらく、データ内のフィールドがテーブル内のフィールドと一致していません - これは特にCDC(およびDebezium形式)で一般的です。 一般的な解決策の一つは、コネクタ設定にフラット変換を追加することです:

これにより、データがネストされたJSONからフラットなJSONに変換されます(_を区切り文字として使用)。テーブル内のフィールドは「field1_field2_field3」形式に従うことになります(例:「before_id」、「after_id」など)。

"I want to use my Kafka keys in ClickHouse"

Kafkaのキーはデフォルトでは値フィールドに保存されませんが、KeyToValue変換を使用してキーを値フィールドに移動できます(新しい_keyフィールド名の下に):