メインコンテンツまでスキップ
メインコンテンツまでスキップ

Confluent HTTP Sink Connector

HTTP Sink Connectorはデータ型に依存せず、Kafkaスキーマを必要とせず、MapsやArraysといったClickHouse特有のデータ型をサポートしています。この追加の柔軟性は、設定の複雑さをわずかに増加させます。

以下では、単一のKafkaトピックからメッセージを取得し、ClickHouseテーブルに行を挿入するシンプルなインストール手順を説明します。

注記

HTTP ConnectorはConfluent Enterprise Licenseの下で配布されています。

クイックスタート手順

1. 接続情報を収集する

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

2. Kafka ConnectとHTTP Sink Connectorを実行する

2つのオプションがあります:

  • セルフマネージド: Confluentパッケージをダウンロードし、ローカルにインストールします。コネクタのインストールに関する手順はこちらに記載されています。 confluent-hubインストール方式を使用する場合、ローカルの設定ファイルが更新されます。

  • Confluent Cloud: Confluent Cloudを使用してKafkaをホスティングしているユーザー向けに完全に管理されたHTTP Sinkのバージョンが利用可能です。これには、ClickHouse環境へのConfluent Cloudからのアクセスが必要です。

注記

次の例はConfluent Cloudを使用しています。

3. ClickHouseに宛先テーブルを作成する

接続テストの前に、ClickHouse Cloudにテストテーブルを作成します。このテーブルはKafkaからデータを受信します:

4. HTTP Sinkを構成する

KafkaトピックとHTTP Sink Connectorのインスタンスを作成します:


HTTP Sink Connectorを構成します:

  • 作成したトピック名を指定する
  • 認証
    • HTTP Url - INSERTクエリが指定されたClickHouse CloudのURL <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow注意: クエリはエンコードする必要があります。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouseのユーザー名
    • Auth password - ClickHouseのパスワード
注記

このHTTP Urlはエラーが発生しやすいです。問題を避けるためにエスケープが正確であることを確認してください。


  • 設定
    • Input Kafka record value format - ソースデータに応じて異なりますが、ほとんどの場合はJSONまたはAvroです。以下の設定ではJSONを想定しています。
    • advanced configurationsセクションで:
      • HTTP Request Method - POSTに設定
      • Request Body Format - json
      • Batch batch size - ClickHouseの推奨に従って、少なくとも1000に設定します。
      • Batch json as array - true
      • Retry on HTTP codes - 400-500ですが、必要に応じて調整してください。たとえば、ClickHouseの前にHTTPプロキシがある場合は変わる可能性があります。
      • Maximum Reties - デフォルト(10)は適切ですが、より堅牢な再試行のために調整してください。

5. 接続をテストする

HTTP Sinkによって構成されたトピックにメッセージを作成します


作成したメッセージがClickHouseインスタンスに書き込まれていることを確認します。

トラブルシューティング

HTTP Sinkがメッセージをバッチ処理しない

Sink documentationによると:

HTTP Sinkコネクタは、Kafkaヘッダー値が異なるメッセージのリクエストをバッチ処理しません。

  1. Kafkaレコードが同じキーを持っているか確認します。
  2. HTTP API URLにパラメータを追加すると、各レコードがユニークなURLを生成する可能性があります。このため、追加のURLパラメータを使用する場合はバッチ処理が無効になります。

400 Bad Request

CANNOT_PARSE_QUOTED_STRING

HTTP SinkがStringカラムにJSONオブジェクトを挿入する際に次のメッセージで失敗した場合:

URLにinput_format_json_read_objects_as_strings=1設定をエンコードされた文字列SETTINGS%20input_format_json_read_objects_as_strings%3D1として追加します。

GitHubデータセットをロードする(オプション)

この例ではGitHubデータセットのArrayフィールドが保持されることに注意してください。例では空のgithubトピックがあると仮定し、kcatを使用してKafkaへのメッセージ挿入を行います。

1. 設定を準備する

自分のインストールタイプに関連するConnectの設定についてはこちらの手順に従ってください。スタンドアロンと分散クラスターの違いに注意します。Confluent Cloudを使用する場合、分散設定が関連します。

最も重要なパラメータはhttp.api.urlです。ClickHouseのHTTPインターフェースは、INSERT文をURLのパラメータとしてエンコードする必要があります。これはフォーマット(この場合はJSONEachRow)とターゲットデータベースを含む必要があります。形式はKafkaデータと一致し、HTTPペイロード内で文字列に変換されます。これらのパラメータはURLエスケープする必要があります。GitHubデータセットのこの形式の例(ClickHouseをローカルで実行していると仮定)は以下です:

HTTP SinkをClickHouseと使用する際に関連する追加パラメータは次のとおりです。完全なパラメータリストはこちらで見つけることができます:

  • request.method - POSTに設定
  • retry.on.status.codes - 任意のエラーコードで再試行するために400-500に設定。データに期待されるエラーに基づいて調整してください。
  • request.body.format - ほとんどの場合、これはJSONになります。
  • auth.type - ClickHouseでセキュリティを使用する場合はBASICに設定。その他のClickHouse互換の認証メカニズムは現在サポートされていません。
  • ssl.enabled - SSLを使用している場合はtrueに設定。
  • connection.user - ClickHouseのユーザー名。
  • connection.password - ClickHouseのパスワード。
  • batch.max.size - 単一のバッチで送信する行の数。適切に大きな数に設定されていることを確認してください。ClickHouseの推奨事項では、1000の値を最小限として検討する必要があります。
  • tasks.max - HTTP Sinkコネクタは1つ以上のタスクを実行することをサポートしています。これを使用してパフォーマンスを向上させることができます。バッチサイズとともに、これが主なパフォーマンス向上手段を表します。
  • key.converter - キーの型に応じて設定。
  • value.converter - トピックのデータ型に基づいて設定。このデータはスキーマを必要としません。ここでの形式は、http.api.urlパラメータに指定されたFORMATと一致している必要があります。最もシンプルな方法はJSONとorg.apache.kafka.connect.json.JsonConverterコンバータを使用することです。値を文字列として扱うことも可能で、org.apache.kafka.connect.storage.StringConverterを介して行うことができますが、これには関数を使用して挿入文で値を抽出する必要があります。AvroフォーマットもClickHouseでサポートされており、io.confluent.connect.avro.AvroConverterコンバータを使用する場合に利用可能です。

完全な設定リストは、プロキシの構成、再試行、高度なSSLの設定方法を含む完全なリストはこちらで見ることができます。

GitHubサンプルデータ用の設定ファイルの例はこちらにあります。Connectがスタンドアロンモードで実行され、KafkaがConfluent Cloudでホストされていると仮定しています。

2. ClickHouseテーブルを作成する

テーブルが作成されていることを確認します。標準的なMergeTreeを使用した最小限のgithubデータセットの例を以下に示します。

3. Kafkaにデータを追加する

メッセージをKafkaに挿入します。以下では、kcatを使用して10kメッセージを挿入します。

ターゲットテーブル「Github」を単純に読み込むことで、データの挿入を確認できます。