メインコンテンツまでスキップ
メインコンテンツまでスキップ

JDBCコネクタ

注記

このコネクタは、データがシンプルで、intなどのプリミティブデータ型で構成されている場合のみ使用するべきです。mapsなどのClickHouse特有の型はサポートされていません。

私たちの例では、Confluent配布のKafka Connectを利用します。

以下では、単一のKafkaトピックからメッセージを取得し、ClickHouseテーブルに行を挿入するシンプルなインストール方法について説明します。Kafka環境がない方には、寛大な無料プランを提供するConfluent Cloudをお勧めします。

JDBCコネクタにはスキーマが必要であることに注意してください(JDBCコネクタでプレーンJSONやCSVを使用することはできません)。スキーマは各メッセージにエンコードできますが、関連するオーバーヘッドを避けるために、Confluentスキーマレジストリyの使用が強く推奨されます。提供された挿入スクリプトは、メッセージからスキーマを自動的に推測し、これをレジストリに挿入します。このスクリプトは他のデータセットでも再利用できます。Kafkaのキーは文字列であると仮定されます。Kafkaスキーマの詳細については、こちらを参照してください。

ライセンス

JDBCコネクタは、Confluent Community Licenseの下で配布されています。

ステップ

接続情報を収集する

ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAME と PASSWORD: デフォルトでは、ユーザー名はdefaultです。あなたのユースケースに適したユーザー名を使用してください。

あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細は例のcurlコマンドに表示されます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。

1. Kafka Connectおよびコネクタのインストール

Confluentパッケージをダウンロードし、ローカルにインストールしたと仮定します。コネクタをインストールするためのインストラクションはこちらに記載されています。

confluent-hubインストール方法を使用する場合、ローカル設定ファイルが更新されます。

KafkaからClickHouseにデータを送信するために、コネクタのSinkコンポーネントを使用します。

2. JDBCドライバをダウンロードしてインストールする

こちらからClickHouse JDBCドライバclickhouse-jdbc-<version>-shaded.jarをダウンロードしインストールします。このドライバをKafka Connectにインストールする方法についてはこちらを参照してください。他のドライバも動作する可能性がありますが、保証はありません。

注記

一般的な問題:ドキュメントでは、jarをshare/java/kafka-connect-jdbc/にコピーすることを提案しています。Connectがドライバを見つけられない場合は、ドライバをshare/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/にコピーしてください。または、plugin.pathを変更してドライバを含めることができます - 下記を参照してください。

3. 設定を準備する

インストールタイプに関連するConnectのセットアップに関するこちらの指示に従ってください。スタンドアロンと分散クラスタの違いに注意してください。Confluent Cloudを使用する場合、分散セットアップが関連します。

ClickHouseとJDBCコネクタを使用する際に必要なパラメータは以下の通りです。完全なパラメータリストはこちらで確認できます:

  • _connection.url_ - jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>の形式である必要があります。
  • connection.user - 対象データベースへの書き込みアクセスを持つユーザー
  • table.name.format - データを挿入するClickHouseテーブル。このテーブルは存在する必要があります。
  • batch.size - 一度に送信する行の数。適切に大きな数に設定してください。ClickHouseの推奨事項に従い、1000の値を最低限の値として考慮してください。
  • tasks.max - JDBC Sinkコネクタは1つ以上のタスクの実行をサポートします。これはパフォーマンスを向上させるために使用できます。バッチサイズと共に、パフォーマンスを向上させる主要な手段を表します。
  • value.converter.schemas.enable - スキーマレジストリを使用する場合はfalse、メッセージにスキーマを埋め込む場合はtrueに設定します。
  • value.converter - データ型に応じて設定します。例えばJSONの場合は、io.confluent.connect.json.JsonSchemaConverter
  • key.converter - org.apache.kafka.connect.storage.StringConverterに設定します。文字列キーを使用します。
  • pk.mode - ClickHouseには関係ありません。noneに設定します。
  • auto.create - サポートされておらず、falseにする必要があります。
  • auto.evolve - この設定は将来的にサポートされるかもしれませんが、現在はfalseを推奨します。
  • insert.mode - "insert"に設定します。他のモードは現在サポートされていません。
  • key.converter - キーのタイプに応じて設定します。
  • value.converter - トピックのデータタイプに基づいて設定します。このデータにはサポートされているスキーマ - JSON、AvroまたはProtobuf形式 - が必要です。

テスト用にサンプルデータセットを使用する場合、以下の設定を確認してください:

  • value.converter.schemas.enable - スキーマレジストリを利用しているためfalseに設定します。各メッセージにスキーマを埋め込む場合はtrueに設定します。
  • key.converter - "org.apache.kafka.connect.storage.StringConverter"に設定します。文字列キーを使用します。
  • value.converter - "io.confluent.connect.json.JsonSchemaConverter"に設定します。
  • value.converter.schema.registry.url - スキーマサーバのURLと、value.converter.schema.registry.basic.auth.user.infoを通じてスキーマサーバの資格情報を設定します。

Githubサンプルデータの設定ファイルの例はこちらで見つけることができます。Connectはスタンドアロンモードで実行され、KafkaはConfluent Cloudにホストされます。

4. ClickHouseテーブルを作成する

テーブルが作成されていることを確認し、前の例から既に存在する場合は削除します。制限されたGithubデータセットと互換性のある例は以下に示されています。サポートされていないArrayやMap型がないことに注意してください:

5. Kafka Connectを起動する

スタンドアロンまたは分散モードでKafka Connectを起動します。

6. Kafkaにデータを追加する

提供されたスクリプトと設定を使用して、Kafkaにメッセージを挿入します。github.configを変更してKafka資格情報を含める必要があります。このスクリプトは現在Confluent Cloudでの使用のために設定されています。

このスクリプトは任意のndjsonファイルをKafkaトピックに挿入するために使用できます。これにより、自動的にスキーマを推測しようとします。サンプルで提供された設定は、10,000メッセージのみを挿入します - 必要に応じてここを変更してください。この設定は、挿入中に互換性のないArrayフィールドをデータセットから削除します。

これはJDBCコネクタがメッセージをINSERT文に変換するために必要です。独自のデータを使用している場合は、各メッセージにスキーマを挿入する(_value.converter.schemas.enable_をtrueに設定する)か、クライアントがレジストリにスキーマを参照するメッセージを送信することを確認してください。

Kafka Connectは、メッセージを消費し、ClickHouseに行を挿入し始めるはずです。「[JDBC準拠モード] トランザクションはサポートされていません。」に関する警告は予期されるもので、無視して構いません。

ターゲットテーブル「Github」を簡単に読み取ることで、データの挿入を確認できます。