メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouse Kafka Connect Sink

注記

ヘルプが必要な場合は、リポジトリに問題を報告するか、ClickHouseパブリックスラックで質問してください。

ClickHouse Kafka Connect Sink は、KafkaトピックからClickHouseテーブルへのデータを配信するKafkaコネクタです。

License

Kafka Connector Sinkは、Apache 2.0 Licenseの下で配布されています。

Requirements for the environment

Kafka Connectフレームワークv2.7以降を環境にインストールする必要があります。

Version compatibility matrix

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Main features

  • 箱から出してすぐに使える正確な一度きりのセマンティクスを提供します。これは、コネクタによって状態ストアとして使用される新しいClickHouseコア機能であるKeeperMapにより実現され、ミニマリズムのアーキテクチャを可能にします。
  • サードパーティの状態ストアのサポート:現在はメモリ内がデフォルトですが、KeeperMap(Redisが近いうちに追加される予定)を使用できます。
  • コア統合:ClickHouseによって構築、保守、サポートされています。
  • ClickHouse Cloudに対して継続的にテストされています。
  • 宣言されたスキーマとスキーマレスでのデータ挿入。
  • ClickHouseのすべてのデータ型をサポートしています。

Installation instructions

Gather your connection details

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS, and the details are available in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:

  • HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用します。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。使用ケースに適したユーザー名を使用します。

ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細はexample curlコマンドで確認できます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。

General installation instructions

コネクタは、プラグインを実行するために必要なすべてのクラスファイルを含む単一のJARファイルとして配布されます。

プラグインをインストールするには、次の手順に従います。

  • リリースページからConnector JARファイルを含むzipアーカイブをダウンロードします。
  • ZIPファイルの内容を抽出し、希望の場所にコピーします。
  • Confluent Platformがプラグインを見つけられるように、Connectプロパティファイルのplugin.path構成にプラグインディレクトリのパスを追加します。
  • トピック名、ClickHouseインスタンスのホスト名、およびパスワードをconfigに提供します。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • Confluent Platformを再起動します。
  • Confluent Platformを使用している場合は、Confluent Control Center UIにログインして、ClickHouse Sinkが利用可能なコネクタのリストにあることを確認します。

Configuration options

ClickHouse SinkをClickHouseサーバに接続するには、次を提供する必要があります。

  • 接続詳細:ホスト名(必須)およびポート(オプション)
  • ユーザー資格情報:パスワード(必須)およびユーザー名(オプション)
  • コネクタークラス:com.clickhouse.kafka.connect.ClickHouseSinkConnector必須
  • トピックまたはtopics.regex:ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります(必須
  • キーおよび値コンバーター:トピック内のデータタイプに基づいて設定します。ワーカー構成で未定義の場合は必須です。

完全な構成オプションの表:

Property NameDescriptionDefault Value
hostname (Required)サーバのホスト名またはIPアドレスN/A
portClickHouseポート - デフォルトは8443(クラウドのHTTPS用)ですが、HTTP(セルフホストのデフォルト)の場合は8123である必要があります8443
sslClickHouseへのssl接続を有効にしますtrue
jdbcConnectionPropertiesClickhouseに接続する際の接続プロパティ。?から始まり、param=valueの間は&で結合される必要があります""
usernameClickHouseデータベース用のユーザー名default
password (Required)ClickHouseデータベース用のパスワードN/A
databaseClickHouseデータベース名default
connector.class (Required)コネクタークラス(明示的に設定し、デフォルト値を保持)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxコネクタタスクの数"1"
errors.retry.timeoutClickHouse JDBCリトライタイムアウト"60"
exactlyOnceExactly Once有効"false"
topics (Required)ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります""
key.converter (Required* - See Description)キーのタイプに応じて設定します。キーを渡す場合はここで必須(ワーカー構成で定義されていない場合)。"org.apache.kafka.connect.storage.StringConverter"
value.converter (Required* - See Description)トピックのデータタイプに基づいて設定します。サポートされている形式:- JSON、String、AvroまたはProtobuf形式。ワーカー構成で定義されていない場合はここで必須です。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableコネクタ値コンバータのスキーマサポート"false"
errors.toleranceコネクタエラー耐性。サポートされる値:none, all"none"
errors.deadletterqueue.topic.name設定されている場合(errors.tolerance=allと共に)、失敗したバッチ用のDLQが使用されます(トラブルシューティングを参照)""
errors.deadletterqueue.context.headers.enableDLQ用の追加ヘッダーを追加します""
clickhouseSettingsクリックハウス設定のカンマ区切りリスト(例: "insert_quorum=2、etc...")""
topic2TableMapトピック名をテーブル名にマッピングするカンマ区切りリスト(例: "topic1=table1、topic2=table2等...")""
tableRefreshIntervalテーブル定義キャッシュを更新するための時間(秒単位)0
keeperOnClusterセルフホストのインスタンス用にON CLUSTERパラメーターを構成できるようにします(例: ON CLUSTER clusterNameInConfigFileDefinition)正確に一度きりのconnect_stateテーブル用(分散DDLクエリを参照)""
bypassRowBinaryスキーマベースのデータ(Avro、Protobufなど)用のRowBinaryおよびRowBinaryWithDefaultsの使用を無効にすることを許可します - データに欠落するカラムがあり、Nullable/Defaultが許可されない場合にのみ使用するべきです。"false"
dateTimeFormatsDateTime64スキーマフィールドの解析用の日付時間形式、;で区切ります(例: someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatchコネクタが「現在の」処理後に保存されたオフセットよりも「早い」レコードをドロップできるようにします(例:オフセット5が送信され、オフセット250が最後の記録されたオフセットだった場合)"false"
ignorePartitionsWhenBatching挿入のためにメッセージを収集する際にパーティションを無視します(exactlyOncefalseの場合のみ)。パフォーマンスノート:コネクタタスクが多いほど、タスクごとに割り当てられたkafkaパーティションが少なくなります - これにより効果が薄れる可能性があります。"false"

Target tables

ClickHouse Connect SinkはKafkaトピックからメッセージを読み取り、適切なテーブルに書き込みます。ClickHouse Connect Sinkは既存のテーブルにデータを書き込みます。データを挿入する前に、ClickHouseに適切なスキーマを持つターゲットテーブルが作成されていることを確認してください。

各トピックにはClickHouse内に専用のターゲットテーブルが必要です。ターゲットテーブル名はソーストピック名と一致する必要があります。

Pre-processing

ClickHouse Kafka Connect Sinkに送信される前にアウトバウンドメッセージを変換する必要がある場合は、Kafka Connect Transformationsを使用してください。

Supported data types

スキーマが宣言されている場合:

Kafka Connect TypeClickHouse TypeSupportedPrimitive
STRINGStringYes
STRINGJSON. See below (1)Yes
INT8Int8Yes
INT16Int16Yes
INT32Int32Yes
INT64Int64Yes
FLOAT32Float32Yes
FLOAT64Float64Yes
BOOLEANBooleanYes
ARRAYArray(T)No
MAPMap(Primitive, T)No
STRUCTVariant(T1, T2, ...)No
STRUCTTuple(a T1, b T2, ...)No
STRUCTNested(a T1, b T2, ...)No
STRUCTJSON. See below (1), (2)No
BYTESStringNo
org.apache.kafka.connect.data.TimeInt64 / DateTime64No
org.apache.kafka.connect.data.TimestampInt32 / Date32No
org.apache.kafka.connect.data.DecimalDecimalNo
  • (1) - JSONは、ClickHouse設定でinput_format_binary_read_json_as_string=1が設定されている場合にのみサポートされます。これはRowBinary形式ファミリーにのみ機能し、この設定はすべてのカラムに影響するため、すべてのカラムは文字列にする必要があります。この場合、コネクタはSTRUCTをJSON文字列に変換します。

  • (2) - structにoneofのようなユニオンがある場合は、フィールド名にプレフィックス/サフィックスを追加しないようにコンバータを構成する必要があります。ProtobufConverterのためのgenerate.index.for.unions=falseという設定があります(詳細はこちらを参照)。

スキーマが宣言されていない場合:

レコードはJSONに変換され、JSONEachRow形式でClickHouseに値として送信されます。

Configuration recipes

これらは、迅速に始めるための一般的な構成レシピです。

Basic configuration

開始するための最も基本的な構成 - Kafka Connectが分散モードで実行されており、SSLが有効な状態でlocalhost:8443でClickHouseサーバーが実行されていることを前提としています。データはスキーマレスのJSONです。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}

Basic configuration with multiple topics

コネクタは複数のトピックからデータを取得できます。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

Basic configuration with DLQ

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

Using with different data formats

Avro schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Protobuf schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}

注意:クラスが見つからない問題が発生した場合、すべての環境にprotobufコンバータが付属しているわけではなく、依存関係がバンドルされたjarの代替リリースが必要になることがあります。

JSON schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
String support

コネクタは異なるClickHouse形式のString Converterをサポートしています:JSONCSV、およびTSV

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

Logging

ログはKafka Connect Platformによって自動的に提供されます。 ログの出力先と形式は、Kafka connectの設定ファイルを介して構成できます。

Confluent Platformを使用している場合、CLIコマンドを実行することでログを確認できます:

confluent local services connect log

詳細については、公式のチュートリアルを確認してください。

Monitoring

ClickHouse Kafka Connectは、Java Management Extensions (JMX)を介してランタイムメトリクスを報告します。JMXは既定でKafka Connectorで有効になっています。

ClickHouse Connect MBeanName

com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}

ClickHouse Kafka Connectは以下のメトリクスを報告します:

NameTypeDescription
receivedRecordslong受け取ったレコードの総数。
recordProcessingTimelongレコードを統合し、統一された構造に変換するために消費された総時間(ナノ秒単位)。
taskProcessingTimelongClickHouseにデータを処理して挿入するために消費された総時間(ナノ秒単位)。

Limitations

  • 削除はサポートされていません。
  • バッチサイズはKafka Consumerプロパティから引き継がれます。
  • KeeperMapを正確に一度きりで使用し、オフセットが変更または巻き戻された場合、その特定のトピックに対してKeeperMapから内容を削除する必要があります。(詳細は以下のトラブルシューティングガイドを参照)

Tuning performance

「Sinkコネクタのバッチサイズを調整したい」と考えたことがあるなら、このセクションがあなたのためのものです。

Connect fetch vs connector poll

Kafka Connect(私たちのSinkコネクタが構築されるフレームワーク)は、バックグラウンドでKafkaトピックからメッセージを取得します(コネクタとは独立しています)。

このプロセスは、fetch.min.bytesfetch.max.bytesを使用して制御できます - fetch.min.bytesはフレームワークが値をコネクタに渡す前に必要な最小量を設定し(fetch.max.wait.msによって設定された時間制限まで)、fetch.max.bytesはサイズの上限を設定します。コネクタに大きなバッチを渡す場合、最小取得量または最大待機量を増やして大きなデータバンドルを構築するオプションがあります。

この取得したデータは、メッセージをポーリングするコネクタクライアントによって消費されます。各ポーリングの量はmax.poll.recordsによって制御されますが、取得はポーリングとは独立しています!

これらの設定を調整する際は、ユーザーは取得サイズがmax.poll.recordsの複数のバッチを生成することを目指すべきです(また、設定fetch.min.bytesfetch.max.bytesが圧縮データを表すことを考慮してください) - そうすれば、各コネクタタスクはできるだけ大きなバッチを挿入できます。

ClickHouseは、頻繁で小さなバッチよりも、わずかな遅延で大きなバッチに最適化されています - バッチが大きいほど、より良い結果が得られます。

consumer.max.poll.records=5000
consumer.max.partition.fetch.bytes=5242880

詳細は、ConfluentのドキュメントKafkaのドキュメントを参照してください。

Multiple high throughput topics

コネクタが複数のトピックにサブスクライブするように構成されており、topic2TableMapを使用してトピックをテーブルにマッピングしている場合、挿入時にボトルネックが発生して消費者の遅延が発生している場合、代わりにトピックごとに1つのコネクタを作成することを検討してください。これが発生する主な理由は、現在、バッチがすべてのテーブルに直列的に挿入されるからです。

トピックごとに1つのコネクタを作成することは、可能な限り高速な挿入率を確保するための回避策です。

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

これは、KeeperMapに保存されているオフセットがKafkaに保存されているオフセットと異なる場合に発生します。通常、トピックが削除されたり、オフセットが手動で調整されたりすると発生します。 これを修正するには、その特定のトピック+パーティションに対して保存されている古い値を削除する必要があります。

注:この調整は正確に一度きりの影響を及ぼす可能性があります。

"What errors will the connector retry?"

現在のところ、フォーカスは一時的で再試行可能なエラーの特定にあります。これには以下が含まれます:

  • ClickHouseException - これはClickHouseによってスローされる一般的な例外です。 通常、サーバーが過負荷であるときにスローされ、以下のエラーコードが特に一時的と見なされます:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - ソケットがタイムアウトした場合にスローされます。
  • UnknownHostException - ホストが解決できない場合にスローされます。
  • IOException - ネットワークに問題がある場合にスローされます。

"All my data is blank/zeroes"

おそらくデータ内のフィールドがテーブル内のフィールドと一致していないためです - これは特にCDC(およびDebezium形式)で一般的です。 一般的な解決策の1つは、コネクタ構成にフラット変換を追加することです:

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

これにより、データがネストされたJSONからフラットなJSONに変換されます(_を区切り文字として使用)。テーブル内のフィールドは「field1_field2_field3」形式(例:「before_id」、「after_id」など)になります。

"I want to use my Kafka keys in ClickHouse"

Kafkaのキーはデフォルトでは値フィールドには保存されませんが、KeyToValue変換を使用してキーを値フィールドに移動することができます(新しい_keyフィールド名の下に):

transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key