Confluent HTTP Sink Connector
HTTP Sink Connectorはデータ型に依存しないため、Kafkaスキーマを必要とせず、MapsやArraysなどのClickHouse特有のデータ型をサポートします。この追加の柔軟性は、設定の複雑さが少し増すことを意味します。
以下では、単一のKafkaトピックからメッセージを取得し、行をClickHouseテーブルに挿入するシンプルなインストール手順を説明します。
HTTP ConnectorはConfluent Enterprise Licenseのもとで配布されています。
クイックスタート手順
1. 接続情報を取得する
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:
-
HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
defaultという名前のデータベースがあります。接続したいデータベースの名前を使用します。 -
USERNAMEとPASSWORD: デフォルトでは、ユーザー名は
defaultです。使用ケースに適したユーザー名を使用します。
ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

HTTPSを選択すると、詳細はexample curlコマンドで確認できます。

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。
2. Kafka ConnectとHTTP Sink Connectorを実行する
2つのオプションがあります:
-
セルフマネージド: Confluentパッケージをダウンロードし、ローカルにインストールします。コネクタのインストールに関する手順はこちらに記載されています。 confluent-hubインストール方法を使用すると、ローカル設定ファイルが更新されます。
-
Confluent Cloud: KafkaホスティングにConfluent Cloudを利用している場合、HTTP Sinkの完全管理型バージョンが利用可能です。これにはClickHouse環境がConfluent Cloudからアクセス可能である必要があります。
以下の例はConfluent Cloudを使用しています。
3. ClickHouseに宛先テーブルを作成する
接続テストの前に、ClickHouse Cloudでテストテーブルを作成しましょう。このテーブルはKafkaからのデータを受信します:
4. HTTP Sinkを設定する
KafkaトピックとHTTP Sink Connectorのインスタンスを作成します:

HTTP Sink Connectorを設定します:
- 作成したトピック名を指定します
- 認証
HTTP Url- ClickHouse CloudのURLで、指定されたINSERTクエリ<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRowを含みます。注意: クエリはエンコードされる必要があります。Endpoint Authentication type- BASICAuth username- ClickHouseのユーザー名Auth password- ClickHouseのパスワード
このHTTP Urlはエラーが発生しやすいです。問題を避けるためにエスケープが正確であることを確認してください。

- 設定
Input Kafka record value format- ソースデータに依存しますが、ほとんどの場合はJSONまたはAvroです。以下の設定ではJSONを想定しています。advanced configurationsセクションで:HTTP Request Method- POSTに設定しますRequest Body Format- jsonBatch batch size- ClickHouseの推奨に従い、少なくとも1000に設定します。Batch json as array- trueRetry on HTTP codes- 400-500ですが、必要に応じて適応してください(例: ClickHouseの前にHTTPプロキシがある場合があるため)。Maximum Reties- デフォルト(10)は適切ですが、より堅牢なリトライのために調整してください。

5. 接続性をテストする
HTTP Sinkで構成されたトピックにメッセージを作成します

作成したメッセージがClickHouseインスタンスに書き込まれたことを確認してください。
トラブルシューティング
HTTP Sinkがメッセージをバッチしない
HTTP Sinkコネクタは、異なるKafkaヘッダ値を含むメッセージのリクエストをバッチ処理しません。
- Kafkaレコードが同じキーを持っていることを確認してください。
- HTTP API URLにパラメータを追加すると、各レコードがユニークなURLを生成します。このため、追加のURLパラメータを使用している場合、バッチ処理が無効になります。
400 bad request
CANNOT_PARSE_QUOTED_STRING
HTTP SinkがStringカラムにJSONオブジェクトを挿入中に次のメッセージで失敗した場合:
URLに設定するinput_format_json_read_objects_as_strings=1パラメータをエンコードされた文字列SETTINGS%20input_format_json_read_objects_as_strings%3D1として設定します。
GitHubデータセットをロードする (オプション)
この例では、GithubデータセットのArrayフィールドを保持しています。例では空のgithubトピックがあると仮定し、メッセージ挿入にはkcatを使用します。
1. 設定を準備する
お使いのインストールタイプに関連するConnectのセットアップについてはこれらの手順に従ってください。スタンドアロンと分散クラスタの違いに注意してください。Confluent Cloudを使用している場合、分散セットアップが関連します。
最も重要なパラメータはhttp.api.urlです。ClickHouseのHTTPインターフェースは、INSERT文をURLのパラメータとしてエンコードする必要があります。これには形式(この場合はJSONEachRow)と対象データベースが含まれます。形式はKafkaデータと一致していなければならず、HTTPペイロード内で文字列に変換されます。これらのパラメータはURLエスケープされなければなりません。Githubデータセット用のこの形式の例(ClickHouseをローカルで実行していると仮定)は以下に示されています:
HTTP SinkをClickHouseで使用するために関連する他の追加パラメータは次のとおりです。完全なパラメータリストはこちらで確認できます:
request.method- POSTに設定retry.on.status.codes- エラーコード400-500でリトライするように設定。データの予想されるエラーに基づいて詳細設定してください。request.body.format- ほとんどの場合、これはJSONになります。auth.type- ClickHouseにセキュリティがある場合、BASICに設定します。他のClickHouse対応の認証メカニズムは現在サポートされていません。ssl.enabled- SSLを使用する場合はtrueに設定。connection.user- ClickHouseのユーザー名。connection.password- ClickHouseのパスワード。batch.max.size- 単一バッチで送信する行数。適切に大きな数値に設定してください。ClickHouseの推奨に従い、1000は最小値と見なされるべきです。tasks.max- HTTP Sinkコネクタは1つ以上のタスクを実行できます。これを利用してパフォーマンスを向上させることができます。バッチサイズと共に、パフォーマンスを向上させる主な手段となります。key.converter- キーのタイプに応じて設定。value.converter- トピックのデータのタイプに基づいて設定。このデータはスキーマを必要としません。ここでの形式は、http.api.urlパラメータで指定されたFORMATと一致している必要があります。ここで最も簡単なのはJSONを使用し、org.apache.kafka.connect.json.JsonConverterコンバータを利用することです。値を文字列として扱うことも可能で、その場合はorg.apache.kafka.connect.storage.StringConverterコンバータを使用しますが、その場合は挿入文で関数を使用して値を抽出する必要があります。Avro形式もClickHouseでサポートされています、io.confluent.connect.avro.AvroConverterコンバータを使用する場合。
プロキシ、リトライ、および高度なSSLの設定方法を含む完全な設定リストはこちらで確認できます。
Githubサンプルデータ用の設定ファイルの例はこちらにあります。Connectがスタンドアロンモードで実行され、KafkaがConfluent Cloudにホストされていると仮定しています。
2. ClickHouseテーブルを作成する
テーブルが作成されていることを確認します。標準のMergeTreeを使用した最小のgithubデータセットの例は以下に示されています。
3. Kafkaにデータを追加する
Kafkaにメッセージを挿入します。以下ではkcatを使用して10,000メッセージを挿入します。
ターゲットテーブル"Github"を単純に読み取ることで、データの挿入が確認できます。