メインコンテンツまでスキップ
メインコンテンツまでスキップ

Confluent HTTP Sink Connector

HTTP Sink Connectorはデータ型に依存せず、Kafkaスキーマを必要としないため、MapsやArraysなどのClickHouse特有のデータ型をサポートしています。この追加の柔軟性は、構成の複雑さがわずかに増すという点でのトレードオフがあります。

以下に、単一のKafkaトピックからメッセージを取得し、ClickHouseテーブルに行を挿入するシンプルなインストール手順を説明します。

注記

HTTP ConnectorはConfluent Enterprise Licenseの下で配布されています。

クイックスタート手順

1. 接続詳細を収集する

ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAME と PASSWORD: デフォルトでは、ユーザー名はdefaultです。あなたのユースケースに適したユーザー名を使用してください。

あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細は例のcurlコマンドに表示されます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。

2. Kafka ConnectとHTTP Sink Connectorを実行する

以下の2つのオプションがあります:

  • セルフマネージド: Confluentパッケージをダウンロードし、ローカルにインストールします。コネクタのインストールに関する手順は、こちらに記載されています。 confluent-hubインストールメソッドを使用する場合、ローカルの設定ファイルが更新されます。

  • Confluent Cloud: Confluent CloudでKafkaホスティングを使用している場合、HTTP Sinkの完全に管理されたバージョンが利用可能です。この場合、あなたのClickHouse環境がConfluent Cloudからアクセスできる必要があります。

注記

以下の例はConfluent Cloudを使用しています。

3. ClickHouseに宛先テーブルを作成する

接続テストの前に、ClickHouse Cloudにテスト用テーブルを作成しましょう。このテーブルはKafkaからデータを受け取ります:

4. HTTP Sinkを構成する

KafkaトピックとHTTP Sink Connectorのインスタンスを作成します:

Create HTTP Sink

HTTP Sink Connectorを構成します:

  • あなたが作成したトピック名を提供します
  • 認証
    • HTTP Url - INSERTクエリを指定したClickHouse Cloud URL <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow注意: クエリはエンコードされている必要があります。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouseユーザー名
    • Auth password - ClickHouseパスワード
注記

このHTTP Urlはエラーが発生しやすいです。問題を避けるためにエスケープが正確であることを確認してください。

Auth options for Confluent HTTP Sink
  • 構成
    • Input Kafka record value format - ソースデータに応じますが、ほとんどの場合JSONまたはAvroです。以下の設定ではJSONを想定しています。
    • advanced configurationsセクションで:
      • HTTP Request Method - POSTに設定
      • Request Body Format - json
      • Batch batch size - ClickHouseの推奨に従い、少なくとも1000に設定します。
      • Batch json as array - true
      • Retry on HTTP codes - 400-500に設定しますが、必要に応じて調整してください。例えば、ClickHouseの前にHTTPプロキシがある場合は変わるかもしれません。
      • Maximum Reties - デフォルト(10)は適切ですが、より堅牢なリトライのために調整してください。
Advanced options for Confluent HTTP Sink

5. 接続のテスト

HTTP Sinkで構成されたトピックでメッセージを作成します

Create a message in the topic

作成したメッセージがあなたのClickHouseインスタンスに書き込まれたことを確認します。

トラブルシューティング

HTTP Sinkがメッセージをバッチ処理しない

Sinkドキュメントからの引用:

HTTP Sinkコネクタは、異なるKafkaヘッダー値を含むメッセージのリクエストをバッチ処理しません。

  1. Kafkaレコードに同じキーがあることを確認します。
  2. HTTP API URLにパラメータを追加すると、各レコードがユニークなURLを生成する可能性があります。このため、追加のURLパラメータを使用する際にはバッチ処理が無効になります。

400 Bad Request

CANNOT_PARSE_QUOTED_STRING

HTTP SinkがStringカラムにJSONオブジェクトを挿入する際に次のメッセージで失敗した場合:

URLにエンコードされた文字列SETTINGS%20input_format_json_read_objects_as_strings%3D1としてinput_format_json_read_objects_as_strings=1設定を追加します。

GitHubデータセットをロードする (オプション)

この例はGithubデータセットのArrayフィールドを保持しています。例では空のgithubトピックがあると仮定し、Kafkaへのメッセージ挿入にはkcatを使用します。

1. 構成を準備する

これらの手順に従って、設置タイプに関連するConnectを設定します。スタンドアロンと分散クラスタの違いに注意してください。Confluent Cloudを使用する場合、分散設定が関連します。

最も重要なパラメータはhttp.api.urlです。ClickHouseのHTTPインターフェースでは、URLのパラメータとしてINSERT文をエンコードする必要があります。これにはフォーマット(この場合はJSONEachRow)とターゲットデータベースを含める必要があります。フォーマットはKafkaデータと一貫している必要があり、HTTPペイロードで文字列に変換されます。これらのパラメータはURLエスケープする必要があります。Githubデータセットのためのこのフォーマットの例(ClickHouseをローカルで実行していると仮定)を以下に示します:

HTTP SinkをClickHouseで使用する際に関連する以下の追加パラメータがあります。完全なパラメータリストはこちらにあります:

  • request.method - POSTに設定
  • retry.on.status.codes - エラーコードに対して再試行するために400-500に設定します。データの期待されるエラーに基づいて洗練されます。
  • request.body.format - ほとんどの場合これはJSONになります。
  • auth.type - ClickHouseのセキュリティを考慮してBASICに設定します。他のClickHouse対応の認証メカニズムは現在サポートされていません。
  • ssl.enabled - SSLを使用している場合はtrueに設定。
  • connection.user - ClickHouseのユーザー名。
  • connection.password - ClickHouseのパスワード。
  • batch.max.size - 単一バッチで送信する行数。適切に大きな数に設定することを確認してください。ClickHouseの推奨に従い、1000を最低値として考慮すべきです。
  • tasks.max - HTTP Sinkコネクタは一つ以上のタスクを実行することをサポートしています。これはパフォーマンスを向上させるために使用できます。バッチサイズとともに、パフォーマンスを向上させる主な手段を表します。
  • key.converter - キーのタイプに応じて設定。
  • value.converter - トピックのデータタイプに基づいて設定。このデータはスキーマを必要としません。ここでのフォーマットはhttp.api.urlパラメータに指定されたFORMATと整合性がなければなりません。最も簡単なのはJSONを使用し、org.apache.kafka.connect.json.JsonConverterコンバータを使用することです。値を文字列として扱うために、org.apache.kafka.connect.storage.StringConverterを介しても可能ですが、これにはユーザーが関数を使用して挿入文で値を抽出する必要があります。AvroフォーマットもClickHouseでサポートされています。

プロキシの設定、リトライ、SSLの高度な設定を含む設定リストの完全なリストはこちらにあります。

Githubサンプルデータに対する設定ファイルの例は、こちらで見つけることができます。Connectがスタンドアロンモードで実行され、KafkaがConfluent Cloudにホストされていると仮定します。

2. ClickHouseテーブルを作成する

テーブルが作成されていることを確認します。標準のMergeTreeを使用した最小限のgithubデータセットの例を以下に示します。

3. Kafkaにデータを追加する

Kafkaにメッセージを挿入します。以下ではkcatを使用して10,000件のメッセージを挿入します。

ターゲットテーブル「Github」にデータが挿入されたことを確認するために簡単な読み取りを行います。