メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouse Kafka Connect Sink

注記

お手伝いが必要な場合は、リポジトリに問題を報告するか、ClickHouseの公共Slackで質問をしてください。

ClickHouse Kafka Connect Sinkは、KafkaトピックからClickHouseテーブルにデータを提供するKafkaコネクタです。

License

Kafka Connector Sinkは、Apache 2.0 Licenseの下で配布されています。

Requirements for the environment

Kafka Connectフレームワークv2.7以降が環境にインストールされている必要があります。

Version compatibility matrix

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Main Features

  • 高度な一貫性を実現する機能が組み込まれています。これは、コネクタによって状態ストアとして使用される新しいClickHouseコア機能KeeperMapによって提供され、ミニマリストアーキテクチャを実現します。
  • 3rd-party状態ストアへのサポート: 現在はIn-memoryがデフォルトですが、KeeperMapを使用できます(Redisを近日中に追加予定)。
  • コア統合: ClickHouseによって構築、維持、サポートされています。
  • ClickHouse Cloudに対して継続的にテストされています。
  • 宣言されたスキーマとスキーマレスのデータ挿入。
  • ClickHouseのすべてのデータ型をサポート。

Installation instructions

Gather your connection details

ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAME と PASSWORD: デフォルトでは、ユーザー名はdefaultです。あなたのユースケースに適したユーザー名を使用してください。

あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細は例のcurlコマンドに表示されます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。

General Installation Instructions

コネクタは、プラグインを実行するために必要なすべてのクラスファイルを含む単一のJARファイルとして配布されます。

プラグインをインストールするには、次の手順に従ってください。

  • ReleasesページからコネクタJARファイルを含むzipアーカイブをダウンロードします。
  • ZIPファイルの内容を抽出し、希望の場所にコピーします。
  • Confluent Platformがプラグインを見つけられるように、Connectプロパティファイルのplugin.path構成にプラグインディレクトリのパスを追加します。
  • configにトピック名、ClickHouseインスタンスのホスト名、およびパスワードを提供します。
  • Confluent Platformを再起動します。
  • Confluent Platformを使用している場合、Confluent Control Center UIにログインし、ClickHouse Sinkが利用可能なコネクタのリストに表示されることを確認します。

Configuration options

ClickHouse SinkをClickHouseサーバーに接続するには、次の情報を提供する必要があります。

  • 接続の詳細:ホスト名(必須)およびポート(オプション)
  • ユーザー認証情報:パスワード(必須)およびユーザー名(オプション)
  • コネクタクラス:com.clickhouse.kafka.connect.ClickHouseSinkConnector必須
  • topicsまたはtopics.regex:ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります(必須
  • キーおよび値のコンバータ:トピックのデータタイプに基づいて設定します。ワーカ構成で未定義の場合は必須です。

設定オプションの完全な表:

Property NameDescriptionDefault Value
hostname (Required)サーバーのホスト名またはIPアドレスN/A
portClickHouseポート - デフォルトは8443(クラウドでのHTTPS用)ですが、HTTP(セルフホスティングのデフォルト)の場合は8123を指定する必要があります。8443
sslClickHouseへのssl接続を有効にしますtrue
jdbcConnectionPropertiesClickHouseに接続する際の接続プロパティ。?で始まり、param=valueの間は&で結合する必要があります。""
usernameClickHouseデータベースのユーザー名default
password (Required)ClickHouseデータベースのパスワードN/A
databaseClickHouseデータベース名default
connector.class (Required)コネクタクラス(明示的に設定し、デフォルト値を保持)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxコネクタタスクの最大数"1"
errors.retry.timeoutClickHouse JDBCリトライタイムアウト"60"
exactlyOnce一貫性のある接続を有効にします"false"
topics (Required)ポーリングするKafkaトピック - トピック名はテーブル名と一致する必要があります。""
key.converter (Required* - See Description)キーのタイプに基づいて設定します。キーを送信する場合(ワーカ構成で未定義の場合)はここで必須です。"org.apache.kafka.connect.storage.StringConverter"
value.converter (Required* - See Description)トピックのデータタイプに基づいて設定します。サポートされているフォーマット:- JSON、String、Avro、Protobuf。この設定は、ワーカ構成で定義されていない場合にここで必要です。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableコネクタ値コンバータスキーマサポート"false"
errors.toleranceコネクタエラーの耐久性。サポートされている値:none、all"none"
errors.deadletterqueue.topic.name設定されている場合(errors.tolerance=all)、失敗したバッチのためにDLQが使用されます(トラブルシューティングを参照)。""
errors.deadletterqueue.context.headers.enableDLQの追加ヘッダーを追加します。""
clickhouseSettingsClickHouseの設定のカンマ区切りリスト(例: "insert_quorum=2, etc...")""
topic2TableMapトピック名とテーブル名をマッピングするカンマ区切りリスト(例: "topic1=table1, topic2=table2, etc...")""
tableRefreshIntervalテーブル定義キャッシュをリフレッシュする時間(秒単位)0
keeperOnClusterセルフホストインスタンス用のON CLUSTERパラメータの構成を許可します(例:ON CLUSTER clusterNameInConfigFileDefinition)一貫性のあるconnect_stateテーブルのため(分散DDLクエリを参照)""
bypassRowBinaryスキーマに基づくデータ(Avro、Protobufなど)でRowBinaryおよびRowBinaryWithDefaultsの使用を無効にします。データにコラムが欠けている場合にのみ使用すべきであり、Nullable/Defaultが受け入れられないときに使用します。"false"
dateTimeFormatsDateTime64スキーマフィールドを解析するための日付時刻フォーマット、;で区切っています(例:someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatchコネクタが"現在の"オフセットが格納される前に、"より早い"レコードを削除できるようにします(例:オフセット5が送信され、オフセット250が最後に記録されたオフセットである場合)。"false"

Target Tables

ClickHouse Connect Sinkは、Kafkaトピックからメッセージを読み込み、適切なテーブルに書き込みます。ClickHouse Connect Sinkは、既存のテーブルにデータを書き込みます。データを挿入する前に、ClickHouseに適切なスキーマのターゲットテーブルが作成されていることを確認してください。

各トピックには、ClickHouseに専用のターゲットテーブルが必要です。ターゲットテーブル名は、ソーストピック名と一致する必要があります。

Pre-processing

ClickHouse Kafka Connect Sinkに送信される前に、送信メッセージを変換する必要がある場合は、Kafka Connect Transformationsを使用してください。

Supported Data types

スキーマが宣言された場合:

Kafka Connect TypeClickHouse TypeSupportedPrimitive
STRINGStringYes
INT8Int8Yes
INT16Int16Yes
INT32Int32Yes
INT64Int64Yes
FLOAT32Float32Yes
FLOAT64Float64Yes
BOOLEANBooleanYes
ARRAYArray(T)No
MAPMap(Primitive, T)No
STRUCTVariant(T1, T2, …)No
STRUCTTuple(a T1, b T2, …)No
STRUCTNested(a T1, b T2, …)No
BYTESStringNo
org.apache.kafka.connect.data.TimeInt64 / DateTime64No
org.apache.kafka.connect.data.TimestampInt32 / Date32No
org.apache.kafka.connect.data.DecimalDecimalNo

スキーマが宣言されていない場合:

レコードはJSONに変換され、JSONEachRow形式でClickHouseに値として送信されます。

Configuration Recipes

これらは、迅速に始めるための一般的な設定レシピです。

Basic Configuration

最も基本的な設定 - Kafka Connectを分散モードで実行し、SSLが有効になっているlocalhost:8443でClickHouseサーバーが実行されていることを前提とし、データはスキーマレスJSONです。

Basic Configuration with Multiple Topics

コネクタは複数のトピックからデータを取得できます。

Basic Configuration with DLQ

Using with different data formats

Avro Schema Support
Protobuf Schema Support

注意: クラスが見つからない問題が発生した場合、一部の環境にはprotobufコンバータが含まれておらず、依存関係とバンドルされた別リリースのjarが必要になることがあります。

JSON Schema Support
String Support

コネクタは、異なるClickHouseフォーマットでのString Converterをサポートしています:JSONCSV、およびTSV

Logging

ロギングは自動的にKafka Connect Platformによって提供されます。 ロギングの宛先と形式は、Kafka connectの設定ファイルを介して構成することができます。

Confluent Platformを使用している場合、ログはCLIコマンドを実行することで表示できます。

詳細については、公式のチュートリアルをご覧ください。

Monitoring

ClickHouse Kafka Connectは、Java Management Extensions (JMX)を介してランタイムメトリクスを報告します。JMXはデフォルトでKafka Connectorに有効になっています。

ClickHouse ConnectのMBeanName:

ClickHouse Kafka Connectは、次のメトリクスを報告します。

NameTypeDescription
receivedRecordslong受信したレコードの総数。
recordProcessingTimelongレコードを統合構造にグループ化し変換するのに要した合計時間(ナノ秒単位)。
taskProcessingTimelongClickHouseにデータを処理し挿入するのに要した合計時間(ナノ秒単位)。

Limitations

  • 削除はサポートされていません。
  • バッチサイズはKafka Consumerプロパティから引き継がれます。
  • KeeperMapで一貫性のある状態が変更された場合は、特定のトピックの内容をKeeperMapから削除する必要があります。(詳細は下のトラブルシューティングガイドを参照)

Tuning Performance

「Sinkコネクタのバッチサイズを調整したい」と考えたことがあるなら、このセクションがあなたのためです。

Connect Fetch vs Connector Poll

Kafka Connect(私たちのSinkコネクタが構築されているフレームワーク)は、バックグラウンドでKafkaトピックからメッセージを取得します(コネクタとは独立しています)。

このプロセスは、fetch.min.bytesfetch.max.bytesを使用して制御できます。fetch.min.bytesはフレームワークがコネクタに値を渡す前に必要な最小量を設定し(fetch.max.wait.msで設定された時間制限まで)、fetch.max.bytesは上限サイズを設定します。コネクタにより大きなバッチを渡したい場合は、最小フェッチまたは最大待機を増やすことがオプションになるかもしれません。

取得されたデータは、メッセージをポーリングするコネクタクライアントによって消費されます。各ポーリングの量はmax.poll.recordsで制御されますが、フェッチはポーリングとは独立しています。

これらの設定を調整する際には、ユーザーは取得サイズが複数のmax.poll.recordsバッチを生成するように目指すべきです(設定fetch.min.bytesfetch.max.bytesは圧縮されたデータを表すことを忘れないでください) - そうすれば、各コネクタタスクは可能な限り大きなバッチを挿入します。

ClickHouseは、頻繁ではなく小さなバッチよりも少し遅れて大きなバッチに最適化されています - バッチが大きいほど良いです。

詳細については、ConfluentのドキュメンテーションKafkaのドキュメンテーションを参照してください。

Multiple high throughput topics

コネクタが複数トピックをサブスクライブするように構成されている場合、topics2TableMapを使用してトピックをテーブルにマッピングしており、挿入でボトルネックを経験している場合は、トピックごとに1つのコネクタを作成することを検討してください。この現象が発生する主な理由は、現在バッチがすべてのテーブルに直列で挿入されるからです。

トピックごとに1つのコネクタを作成することは、可能な限り迅速な挿入率を確保するためのワークアラウンドです。

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

これは、KeeperMapに保存されたオフセットがKafkaに保存されたオフセットと異なる場合に発生します。通常はトピックが削除されたり、オフセットが手動で調整されたときです。 これを修正するには、指定のトピックとパーティションに対して保存された古い値を削除する必要があります。

注意: この調整は一貫性のある影響を与える可能性があります。

"What errors will the connector retry?"

現在のところ、焦点は一時的でリトライ可能なエラーの特定にあります。含まれるエラーは以下の通りです:

  • ClickHouseException - ClickHouseから投げられる可能性のあるジェネリックな例外です。通常、サーバーが過負荷の時に発生し、以下のエラーコードが特に一時的とみなされます:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - ソケットがタイムアウトした場合に発生します。
  • UnknownHostException - ホストを解決できない場合に発生します。
  • IOException - ネットワークに問題がある場合に発生します。

"All my data is blank/zeroes"

おそらく、あなたのデータのフィールドがテーブルのフィールドと一致していません - これはCDC(およびDebeziumフォーマット)で特に一般的です。 一般的な解決策は、コネクタ設定にフラット変換を追加することです:

これにより、データがネストされたJSONからフラットなJSONに変換されます(_を区切り文字として使用)。テーブルのフィールドは「field1_field2_field3」形式(例:「before_id」、「after_id」など)になります。

"I want to use my Kafka keys in ClickHouse"

Kafkaのキーはデフォルトでは値フィールドに保存されていませんが、KeyToValue変換を使用して、キーを値フィールドに移動させることができます(新しい_keyフィールド名の下に):