メインコンテンツまでスキップ
メインコンテンツまでスキップ

JDBC Connector

注記

このコネクタは、データが単純で、intなどの基本データ型から成る場合のみ使用するべきです。マップなどのClickHouse独自の型はサポートされていません。

私たちの例では、Kafka ConnectのConfluentディストリビューションを利用しています。

以下に、単一のKafkaトピックからメッセージを取得し、ClickHouseテーブルに行を挿入する簡単なインストール手順を説明します。Kafka環境を持っていない場合は、寛大な無料プランを提供するConfluent Cloudをお勧めします。

JDBCコネクタにはスキーマが必要であることに注意してください(JDBCコネクタでは通常のJSONやCSVを使用できません)。スキーマは各メッセージにエンコードできますが、関連するオーバーヘッドを避けるためにConfluentスキーマレジストリを使用することを強く推奨します。提供される挿入スクリプトは、メッセージから自動的にスキーマを推測し、これをレジストリに挿入します。このスクリプトは他のデータセットでも再利用できます。Kafkaのキーは文字列であると仮定されます。Kafkaスキーマに関する詳細はこちらで確認できます。

License

JDBCコネクタはConfluent Community Licenseの下で配布されています。

Steps

Gather your connection details

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

1. Install Kafka Connect and Connector

Confluentパッケージをダウンロードしてローカルにインストールしたことを前提とします。コネクタをインストールするための手順はこちらに記載されています。

confluent-hubインストール方法を使用する場合、ローカルの設定ファイルが更新されます。

KafkaからClickHouseにデータを送信するために、コネクタのSinkコンポーネントを使用します。

2. Download and install the JDBC Driver

ClickHouse JDBCドライバclickhouse-jdbc-<version>-shaded.jarこちらからダウンロードしてインストールします。Kafka Connectにインストールする詳細はこちらを参照してください。他のドライバも動作する可能性がありますが、テストは行われていません。

注記

一般的な問題: ドキュメントではjarをshare/java/kafka-connect-jdbc/にコピーすることを推奨しています。Connectがドライバを見つけられない場合は、ドライバをshare/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/にコピーしてください。または、ドライバを含むようにplugin.pathを変更します - 以下を参照してください。

3. Prepare Configuration

あなたのインストールタイプに関連するConnectのセットアップはこちらの指示に従って行ってください。スタンドアロンと分散クラスターの違いに留意してください。Confluent Cloudを使用する場合、分散セットアップが関連します。

ClickHouseでJDBCコネクタを使用するために関連するパラメータは以下の通りです。全パラメータリストはこちらで確認できます。

  • _connection.url_ - これは jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>の形式である必要があります。
  • connection.user - 対象データベースへの書き込みアクセスを持つユーザー。
  • table.name.format - データを挿入するClickHouseテーブル。このテーブルは存在する必要があります。
  • batch.size - 一度に送信する行の数。この値は適切に大きい数に設定してください。ClickHouseの推奨事項では、1000の値を最低限として考慮する必要があります。
  • tasks.max - JDBC Sinkコネクタは一つまたは複数のタスクを実行することをサポートします。これはパフォーマンスを向上させるために使用できます。バッチサイズと共に、パフォーマンスを改善するための主要な手段を表します。
  • value.converter.schemas.enable - スキーマレジストリを使用している場合はfalseに設定し、メッセージにスキーマを埋め込む場合はtrueに設定します。
  • value.converter - データ型に応じて設定します。例えば、JSONの場合はio.confluent.connect.json.JsonSchemaConverter
  • key.converter - org.apache.kafka.connect.storage.StringConverterに設定します。文字列キーを使用します。
  • pk.mode - ClickHouseには関連しません。noneに設定します。
  • auto.create - サポートされておらず、falseにする必要があります。
  • auto.evolve - この設定はfalseを推奨しますが、将来的にはサポートされる可能性があります。
  • insert.mode - "insert"に設定します。他のモードは現在サポートされていません。
  • key.converter - キーの種類に応じて設定します。
  • value.converter - トピック上のデータに基づいて設定します。このデータにはサポートされているスキーマが必要です - JSON、Avro、またはProtobufフォーマット。

テストのためにサンプルデータセットを使用する場合は、以下を設定してください:

  • value.converter.schemas.enable - スキーマレジストリを使用しているためfalseに設定します。各メッセージにスキーマを埋め込む場合はtrueに設定します。
  • key.converter - "org.apache.kafka.connect.storage.StringConverter"に設定します。文字列キーを使用します。
  • value.converter - "io.confluent.connect.json.JsonSchemaConverter"に設定します。
  • value.converter.schema.registry.url - スキーマサーバーのURLとともに、スキーマサーバーの認証情報をvalue.converter.schema.registry.basic.auth.user.infoパラメータを介して設定します。

Githubのサンプルデータに関する設定ファイルの例はこちらにあります。これをスタンドアロンモードで実行し、KafkaがConfluent Cloudにホストされていると仮定します。

4. Create the ClickHouse table

テーブルが作成されていることを確認し、以前の例から既に存在する場合は削除します。縮小されたGithubデータセットに対応した例を以下に示します。現在サポートされていないArrayやMap型がないことに注意してください:

5. Start Kafka Connect

スタンドアロンまたは分散モードでKafka Connectを起動します。

6. Add data to Kafka

提供されたスクリプトと設定を使用してKafkaにメッセージを挿入します。github.configを変更してKafkaの認証情報を含める必要があります。このスクリプトは現在Confluent Cloudでの使用に設定されています。

このスクリプトは任意のndjsonファイルをKafkaトピックに挿入するために使用できます。これにより、自動的にスキーマを推測しようとします。提供されたサンプル設定は10,000メッセージのみを挿入するように設定されています - 必要に応じてここで変更してください。この設定は、Kafkaへの挿入中にデータセットから互換性のないArrayフィールドを削除します。

これは、JDBCコネクタがメッセージをINSERT文に変換するために必要です。自分のデータを使用している場合は、スキーマを各メッセージに挿入する(_value.converter.schemas.enable_をtrueに設定する)か、クライアントがレジストリにスキーマを参照してメッセージを公開するようにしてください。

Kafka Connectはメッセージの消費を開始し、ClickHouseに行を挿入するはずです。「[JDBC Compliance Mode] トランザクションはサポートされていません。」という警告が表示されることがありますが、これは予期されるものであり、無視することができます。

ターゲットテーブル「Github」を簡単に読み取ることで、データの挿入を確認できます。