メインコンテンツまでスキップ
メインコンテンツまでスキップ

Confluent HTTP Sink Connector

HTTP Sink Connectorはデータ型に依存しないため、Kafkaスキーマを必要とせず、MapsやArraysなどのClickHouse特有のデータ型をサポートします。この追加の柔軟性は、設定の複雑さが少し増すことを意味します。

以下では、単一のKafkaトピックからメッセージを取得し、行をClickHouseテーブルに挿入するシンプルなインストール手順を説明します。

注記

HTTP ConnectorはConfluent Enterprise Licenseのもとで配布されています。

クイックスタート手順

1. 接続情報を取得する

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS, and the details are available in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:

  • HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用します。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。使用ケースに適したユーザー名を使用します。

ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細はexample curlコマンドで確認できます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。

2. Kafka ConnectとHTTP Sink Connectorを実行する

2つのオプションがあります:

  • セルフマネージド: Confluentパッケージをダウンロードし、ローカルにインストールします。コネクタのインストールに関する手順はこちらに記載されています。 confluent-hubインストール方法を使用すると、ローカル設定ファイルが更新されます。

  • Confluent Cloud: KafkaホスティングにConfluent Cloudを利用している場合、HTTP Sinkの完全管理型バージョンが利用可能です。これにはClickHouse環境がConfluent Cloudからアクセス可能である必要があります。

注記

以下の例はConfluent Cloudを使用しています。

3. ClickHouseに宛先テーブルを作成する

接続テストの前に、ClickHouse Cloudでテストテーブルを作成しましょう。このテーブルはKafkaからのデータを受信します:

CREATE TABLE default.my_table
(
    `side` String,
    `quantity` Int32,
    `symbol` String,
    `price` Int32,
    `account` String,
    `userid` String
)
ORDER BY tuple()

4. HTTP Sinkを設定する

KafkaトピックとHTTP Sink Connectorのインスタンスを作成します:

Confluent Cloud interface showing how to create an HTTP Sink connector

HTTP Sink Connectorを設定します:

  • 作成したトピック名を指定します
  • 認証
    • HTTP Url - ClickHouse CloudのURLで、指定されたINSERTクエリ <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRowを含みます。注意: クエリはエンコードされる必要があります。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouseのユーザー名
    • Auth password - ClickHouseのパスワード
注記

このHTTP Urlはエラーが発生しやすいです。問題を避けるためにエスケープが正確であることを確認してください。

Confluent Cloud interface showing authentication settings for the HTTP Sink connector

  • 設定
    • Input Kafka record value format - ソースデータに依存しますが、ほとんどの場合はJSONまたはAvroです。以下の設定ではJSONを想定しています。
    • advanced configurationsセクションで:
      • HTTP Request Method - POSTに設定します
      • Request Body Format - json
      • Batch batch size - ClickHouseの推奨に従い、少なくとも1000に設定します。
      • Batch json as array - true
      • Retry on HTTP codes - 400-500ですが、必要に応じて適応してください(例: ClickHouseの前にHTTPプロキシがある場合があるため)。
      • Maximum Reties - デフォルト(10)は適切ですが、より堅牢なリトライのために調整してください。
Confluent Cloud interface showing advanced configuration options for the HTTP Sink connector

5. 接続性をテストする

HTTP Sinkで構成されたトピックにメッセージを作成します

Confluent Cloud interface showing how to create a test message in a Kafka topic

作成したメッセージがClickHouseインスタンスに書き込まれたことを確認してください。

トラブルシューティング

HTTP Sinkがメッセージをバッチしない

Sink documentationから:

HTTP Sinkコネクタは、異なるKafkaヘッダ値を含むメッセージのリクエストをバッチ処理しません。

  1. Kafkaレコードが同じキーを持っていることを確認してください。
  2. HTTP API URLにパラメータを追加すると、各レコードがユニークなURLを生成します。このため、追加のURLパラメータを使用している場合、バッチ処理が無効になります。

400 bad request

CANNOT_PARSE_QUOTED_STRING

HTTP SinkがStringカラムにJSONオブジェクトを挿入中に次のメッセージで失敗した場合:

Code: 26. DB::ParsingException: Cannot parse JSON string: expected opening quote: (while reading the value of key key_name): While executing JSONEachRowRowInputFormat: (at row 1). (CANNOT_PARSE_QUOTED_STRING)

URLに設定するinput_format_json_read_objects_as_strings=1パラメータをエンコードされた文字列SETTINGS%20input_format_json_read_objects_as_strings%3D1として設定します。

GitHubデータセットをロードする (オプション)

この例では、GithubデータセットのArrayフィールドを保持しています。例では空のgithubトピックがあると仮定し、メッセージ挿入にはkcatを使用します。

1. 設定を準備する

お使いのインストールタイプに関連するConnectのセットアップについてはこれらの手順に従ってください。スタンドアロンと分散クラスタの違いに注意してください。Confluent Cloudを使用している場合、分散セットアップが関連します。

最も重要なパラメータはhttp.api.urlです。ClickHouseのHTTPインターフェースは、INSERT文をURLのパラメータとしてエンコードする必要があります。これには形式(この場合はJSONEachRow)と対象データベースが含まれます。形式はKafkaデータと一致していなければならず、HTTPペイロード内で文字列に変換されます。これらのパラメータはURLエスケープされなければなりません。Githubデータセット用のこの形式の例(ClickHouseをローカルで実行していると仮定)は以下に示されています:

<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow

http://localhost:8123?query=INSERT%20INTO%20default.github%20FORMAT%20JSONEachRow

HTTP SinkをClickHouseで使用するために関連する他の追加パラメータは次のとおりです。完全なパラメータリストはこちらで確認できます:

  • request.method - POSTに設定
  • retry.on.status.codes - エラーコード400-500でリトライするように設定。データの予想されるエラーに基づいて詳細設定してください。
  • request.body.format - ほとんどの場合、これはJSONになります。
  • auth.type - ClickHouseにセキュリティがある場合、BASICに設定します。他のClickHouse対応の認証メカニズムは現在サポートされていません。
  • ssl.enabled - SSLを使用する場合はtrueに設定。
  • connection.user - ClickHouseのユーザー名。
  • connection.password - ClickHouseのパスワード。
  • batch.max.size - 単一バッチで送信する行数。適切に大きな数値に設定してください。ClickHouseの推奨に従い、1000は最小値と見なされるべきです。
  • tasks.max - HTTP Sinkコネクタは1つ以上のタスクを実行できます。これを利用してパフォーマンスを向上させることができます。バッチサイズと共に、パフォーマンスを向上させる主な手段となります。
  • key.converter - キーのタイプに応じて設定。
  • value.converter - トピックのデータのタイプに基づいて設定。このデータはスキーマを必要としません。ここでの形式は、http.api.urlパラメータで指定されたFORMATと一致している必要があります。ここで最も簡単なのはJSONを使用し、org.apache.kafka.connect.json.JsonConverterコンバータを利用することです。値を文字列として扱うことも可能で、その場合はorg.apache.kafka.connect.storage.StringConverterコンバータを使用しますが、その場合は挿入文で関数を使用して値を抽出する必要があります。Avro形式もClickHouseでサポートされています、io.confluent.connect.avro.AvroConverterコンバータを使用する場合。

プロキシ、リトライ、および高度なSSLの設定方法を含む完全な設定リストはこちらで確認できます。

Githubサンプルデータ用の設定ファイルの例はこちらにあります。Connectがスタンドアロンモードで実行され、KafkaがConfluent Cloudにホストされていると仮定しています。

2. ClickHouseテーブルを作成する

テーブルが作成されていることを確認します。標準のMergeTreeを使用した最小のgithubデータセットの例は以下に示されています。

CREATE TABLE github
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    labels Array(LowCardinality(String)),
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    assignees Array(LowCardinality(String)),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    requested_reviewers Array(LowCardinality(String)),
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

3. Kafkaにデータを追加する

Kafkaにメッセージを挿入します。以下ではkcatを使用して10,000メッセージを挿入します。

head -n 10000 github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username>  -X sasl.password=<password> -t github

ターゲットテーブル"Github"を単純に読み取ることで、データの挿入が確認できます。

SELECT count() FROM default.github;

| count\(\) |
| :--- |
| 10000 |