メインコンテンツまでスキップ
メインコンテンツまでスキップ

JDBCコネクタ

注記

このコネクタは、データがシンプルであり、整数などの基本データ型で構成されている場合のみ使用するべきです。 ClickHouse特有のタイプ(マップなど)はサポートされていません。

例として、私たちはKafka ConnectのConfluentディストリビューションを利用します。

以下に、単一のKafkaトピックからメッセージを取得し、ClickHouseテーブルに行を挿入する簡単なインストール手順を説明します。 Kafka環境を持っていない方には、寛大な無料プランを提供するConfluent Cloudをお勧めします。

JDBCコネクタにはスキーマが必要であることに注意してください(JDBCコネクタにはプレーンなJSONまたはCSVを使用できません)。スキーマを各メッセージにエンコードすることも可能ですが、関連するオーバーヘッドを回避するために、Confluentスキーマレジストリを使用することを強く推奨します。提供された挿入スクリプトは、メッセージから自動的にスキーマを推測し、これをレジストリに挿入します - このスクリプトは他のデータセットにも再利用可能です。Kafkaのキーは文字列であると仮定されます。Kafkaスキーマに関する詳細はこちらにあります。

ライセンス

JDBCコネクタはConfluent Community Licenseの下で配布されています。

手順

接続情報の収集

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS, and the details are available in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:

  • HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用します。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。使用ケースに適したユーザー名を使用します。

ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細はexample curlコマンドで確認できます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。

1. Kafka Connectとコネクタのインストール

Confluentパッケージをダウンロードし、ローカルにインストールしたと仮定します。コネクタのインストール手順については、こちらに記載されています。

confluent-hubインストールメソッドを使用する場合、ローカルの設定ファイルは更新されます。

KafkaからClickHouseにデータを送信するには、コネクタのシンクコンポーネントを使用します。

2. JDBCドライバのダウンロードとインストール

こちらからclickhouse-jdbc-<version>-shaded.jarというClickHouse JDBCドライバをダウンロードし、インストールします。Kafka Connectへのインストール手順はこちらに従ってください。他のドライバも機能する場合がありますが、テストされていません。

注記

一般的な問題:ドキュメントではjarをshare/java/kafka-connect-jdbc/にコピーするよう示されています。Connectがドライバを見つけられない場合は、ドライバをshare/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/にコピーしてください。または、ドライバを含めるようにplugin.pathを修正してください - 詳細は以下の通りです。

3. 設定の準備

これらの手順に従って、インストールタイプに関連するConnectのセットアップを行います。スタンドアロンと分散クラスタの違いに注意してください。Confluent Cloudを使用する場合は、分散セットアップが関連します。

以下のパラメータは、ClickHouseでJDBCコネクタを使用するために関連します。完全なパラメータリストはこちらにあります:

  • _connection.url_ - 形式はjdbc:clickhouse://&lt;clickhouse host&gt;:&lt;clickhouse http port&gt;/&lt;target database&gt;にする必要があります
  • connection.user - 対象データベースへの書き込みアクセスを持つユーザー
  • table.name.format - データを挿入するClickHouseテーブル。これが存在する必要があります。
  • batch.size - 一度に送信する行数。適切に大きな数に設定してください。ClickHouseの推奨事項によると、1000の値は最低限考慮すべきです。
  • tasks.max - JDBCシンクコネクタは1つ以上のタスクを実行することをサポートしています。これを使用してパフォーマンスを向上させます。バッチサイズと合わせて、パフォーマンス向上の主要な手段を表します。
  • value.converter.schemas.enable - スキーマレジストリを使用している場合はfalse、メッセージにスキーマを埋め込んでいる場合はtrueに設定します。
  • value.converter - データ型に応じて設定します。例えばJSONの場合、io.confluent.connect.json.JsonSchemaConverterです。
  • key.converter - org.apache.kafka.connect.storage.StringConverterに設定します。Stringキーを使用しています。
  • pk.mode - ClickHouseには関係ありません。noneに設定します。
  • auto.create - サポートされておらず、falseでなければなりません。
  • auto.evolve - この設定についてはfalseを推奨しますが、将来的にはサポートされるかもしれません。
  • insert.mode - "insert"に設定します。他のモードは現在サポートされていません。
  • key.converter - キーのタイプに応じて設定します。
  • value.converter - トピック内のデータのタイプに基づいて設定します。このデータにはサポートされているスキーマが必要です - JSON、AvroまたはProtobuf形式。

テスト用のサンプルデータセットを使用している場合は、以下が設定されていることを確認してください:

  • value.converter.schemas.enable - スキーマレジストリを使用しているためfalseに設定します。各メッセージにスキーマを埋め込んでいる場合はtrueに設定します。
  • key.converter - "org.apache.kafka.connect.storage.StringConverter"に設定します。Stringキーを使用しています。
  • value.converter - "io.confluent.connect.json.JsonSchemaConverter"に設定します。
  • value.converter.schema.registry.url - スキーマサーバーのURLと、スキーマサーバーの認証情報をvalue.converter.schema.registry.basic.auth.user.infoパラメータを通じて設定します。

Githubサンプルデータ用の設定ファイル例は、こちらで見つけられます。Connectがスタンドアロンモードで実行され、KafkaがConfluent Cloudでホストされていると仮定します。

4. ClickHouseテーブルを作成する

テーブルが作成されていることを確認し、以前の例から既に存在する場合は削除します。簡易化されたGithubデータセットに互換性のある例を以下に示します。現在サポートされていないArrayまたはMapタイプがないことに注意してください:

CREATE TABLE github
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

5. Kafka Connectを起動する

スタンドアロンまたは分散モードでKafka Connectを起動します。

./bin/connect-standalone connect.properties.ini github-jdbc-sink.properties.ini

6. Kafkaにデータを追加する

提供されたスクリプトと設定を使用してKafkaにメッセージを挿入します。github.configを変更してKafkaの認証情報を追加する必要があります。スクリプトは現在Confluent Cloudでの使用に設定されています。

python producer.py -c github.config

このスクリプトは、任意のndjsonファイルをKafkaトピックに挿入するために使用できます。このスクリプトは自動的にスキーマを推測しようとします。提供されたサンプル設定は10,000メッセージのみを挿入します - 必要に応じてここを変更してください。この設定は、Kafkaへの挿入中に不適合なArrayフィールドをデータセットから除去します。

これはJDBCコネクタがメッセージをINSERT文に変換するために必要です。自分のデータを使用している場合は、スキーマを各メッセージに挿入するか(_value.converter.schemas.enable_をtrueに設定)、クライアントがスキーマをレジストリに参照するメッセージを公開することを確実にしてください。

Kafka Connectはメッセージを消費し、ClickHouseに行を挿入し始めるはずです。" [JDBC Compliant Mode] トランザクションはサポートされていません。"という警告は予想され、無視可能です。

対象テーブル「Github」で簡単に読んでデータ挿入を確認してください。

SELECT count() FROM default.github;
| count\(\) |
| :--- |
| 10000 |