メインコンテンツまでスキップ
メインコンテンツまでスキップ

KafkaとClickHouse Cloudの統合

前提条件

ClickPipesの紹介に目を通してください。

最初のKafka ClickPipeを作成する

  1. ClickHouse CloudサービスのSQLコンソールにアクセスします。
ClickPipes service
  1. 左側のメニューからData Sourcesボタンを選択し、「ClickPipeを設定」をクリックします。
Select imports
  1. データソースを選択します。
Select data source type
  1. ClickPipeに名前、説明(オプション)、認証情報、その他の接続詳細を提供してフォームに入力します。
Fill out connection details
  1. スキーマレジストリを構成します。Avroストリームには有効なスキーマが必要で、JSONにはオプションとなります。このスキーマは、選択したトピックのAvroConfluentの解析またはJSONメッセージの検証に使用されます。
  • 解析できないAvroメッセージや、検証に失敗したJSONメッセージはエラーを生成します。
  • スキーマレジストリの「ルート」パス。たとえば、Confluent CloudスキーマレジストリのURLは、パスがないHTTPS URLのようになります。https://test-kk999.us-east-2.aws.confluent.cloud ルートパスのみが指定された場合、ステップ4でのカラム名と型を決定するために使用されるスキーマは、サンプルKafkaメッセージに埋め込まれたIDによって決まります。
  • 数値スキーマIDによるスキーマ文書へのパス/schemas/ids/[ID]。スキーマIDを使用した完全なURLはhttps://registry.example.com/schemas/ids/1000
  • サブジェクト名によるスキーマ文書へのパス/subjects/[subject_name]。オプションで、特定のバージョンは、URLに/versions/[version]を追加することで参照できます(そうでない場合、ClickPipesは最新バージョンを取得します)。サブジェクトスキーマを使用した完全なURLはhttps://registry.example.com/subjects/eventsまたはhttps://registry/example.com/subjects/events/versions/4

すべての場合において、ClickPipesはメッセージに埋め込まれたスキーマIDによって示される場合、スキーマレジストリから更新または異なるスキーマを自動的に取得します。メッセージが埋め込まれたスキーマIDなしで書き込まれた場合は、すべてのメッセージを解析するために特定のスキーマIDまたはサブジェクトを指定する必要があります。

  1. トピックを選択すると、UIにトピックからのサンプル文書が表示されます。
Set data format and topic
  1. 次のステップでは、新しいClickHouseテーブルにデータを取り込むか、既存のテーブルを再利用するかを選択できます。画面の指示に従って、テーブル名、スキーマ、および設定を変更します。サンプルテーブルのリアルタイムプレビューが上部に表示されます。
Set table, schema, and settings

提供されたコントロールを使用して、高度な設定をカスタマイズすることもできます。

Set advanced controls
  1. もしくは、既存のClickHouseテーブルにデータを取り込むこともできます。この場合、UIはソースからClickHouseフィールドにフィールドをマップできるようにします。
Use an existing table
  1. 最後に、内部ClickPipesユーザーの権限を設定できます。

権限: ClickPipesは、宛先テーブルにデータを書き込むための専用ユーザーを作成します。この内部ユーザーの役割をカスタムロールまたは事前定義されたロールの一つを使用して選択できます。

  • フルアクセス: クラスタへのフルアクセス。これは、宛先テーブルでMaterialized ViewまたはDictionaryを使用する場合に有効です。
  • 宛先テーブルのみ: 宛先テーブルに対するINSERT権限のみ。
Permissions
  1. 「設定完了」をクリックすると、システムがClickPipeを登録し、要約テーブルに表示されます。
Success notice Remove notice

要約テーブルは、ClickHouse内のソースまたは宛先テーブルからサンプルデータを表示するコントロールを提供します。

View destination

また、ClickPipeを削除し、取り込みジョブの要約を表示するコントロールも提供されます。

View overview
  1. おめでとうございます! あなたは最初のClickPipeを正常に設定しました。これがストリーミングClickPipeであれば、リアルタイムでリモートデータソースからデータを継続的に取り込む状態になります。

サポートされているデータソース

名称ロゴタイプステータス説明
Apache Kafkaストリーミング安定ClickPipesを設定し、Apache KafkaからClickHouse Cloudにストリーミングデータを取り込み始めます。
Confluent Cloudストリーミング安定ConfluentとClickHouse Cloudの統合による強力な能力を解放します。
RedpandaRedpanda logoストリーミング安定ClickPipesを設定し、RedpandaからClickHouse Cloudにストリーミングデータを取り込み始めます。
AWS MSKストリーミング安定ClickPipesを設定し、AWS MSKからClickHouse Cloudにストリーミングデータを取り込み始めます。
Azure Event Hubsストリーミング安定ClickPipesを設定し、Azure Event HubsからClickHouse Cloudにストリーミングデータを取り込み始めます。
WarpStreamストリーミング安定ClickPipesを設定し、WarpStreamからClickHouse Cloudにストリーミングデータを取り込み始めます。

より多くのコネクタがClickPipesに追加される予定です。詳細については、お問い合わせください。

サポートされているデータフォーマット

サポートされているフォーマットは:

サポートされているデータタイプ

以下のClickHouseデータタイプが現在ClickPipesでサポートされています:

  • 基本数値型 - [U]Int8/16/32/64およびFloat32/64
  • 大きな整数型 - [U]Int128/256
  • 小数型
  • ブール型
  • 文字列型
  • 固定文字列型
  • 日付型、Date32
  • 日時型、DateTime64(UTCタイムゾーンのみ)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • すべてのClickHouse LowCardinality型
  • 上記のいずれかの型(Nullableを含む)を使用するマップ
  • 上記のいずれかの型(Nullableを含む、一レベル深さのみ)を使用するタプルと配列

Avro

サポートされているAvroデータタイプ

ClickPipesは、すべてのAvroプライマリおよび複合型、及びtime-millistime-microslocal-timestamp-millislocal_timestamp-microsdurationを除くすべてのAvro論理型をサポートします。Avro record型はタプルに変換され、array型は配列に変換され、mapはマップ(文字列キーのみ)に変換されます。一般的に、ここにリストされた変換が利用可能です。ClickPipesは、型変換時にオーバーフローや精度損失をチェックしないため、Avro数値型の厳密な型マッチングを使用することをお勧めします。

Nullable型とAvroのユニオン

Avroのnullable型は、(T, null)または(null, T)のユニオンスキーマを使用して定義され、TはベースのAvro型です。スキーマ推論中に、そのようなユニオンはClickHouseの「Nullable」カラムにマップされます。ClickHouseは、Nullable(Array)Nullable(Map)、またはNullable(Tuple)タイプをサポートしていません。これらの型に対するAvroのnullユニオンは、非nullableバージョンにマップされます(Avro Record型はClickHouseの名前付きタプルにマップされます)。これらの型に対するAvroの「null」は次のように挿入されます:

  • nullのAvro配列に対して空の配列
  • nullのAvroマップに対して空のマップ
  • nullのAvroレコードに対してすべてのデフォルトまたはゼロ値を持つ名前付きタプル

ClickPipesは、現在他のAvroユニオンを含むスキーマをサポートしていません(これは、ClickHouseのVariantおよびJSONデータ型の成熟に伴って将来的に変更される可能性があります)。Avroスキーマに「非null」ユニオンが含まれている場合、ClickPipesはAvroスキーマとClickhouseカラム型の間のマッピングを計算しようとするとエラーが発生します。

Avroスキーマ管理

ClickPipesは、各メッセージ/イベントに埋め込まれたスキーマIDを使用して、設定されたスキーマレジストリからAvroスキーマを動的に取得し、適用します。スキーマの更新は自動的に検出され、処理されます。

現在、ClickPipesはConfluent Schema Registry APIを使用するスキーマレジストリとのみ互換性があります。これには、Confluent KafkaおよびCloudに加えて、Redpanda、AWS MSK、およびUpstashスキーマレジストリが含まれます。ClickPipesは現在、AWS GlueスキーマレジストリまたはAzureスキーマレジストリとは互換性がありません(近日対応予定)。

取得したAvroスキーマとClickHouseの宛先テーブル間のマッピングには、次のルールが適用されます:

  • AvroスキーマにClickHouse宛先マッピングに含まれていないフィールドが含まれている場合、そのフィールドは無視されます。
  • AvroスキーマにClickHouse宛先マッピングで定義されたフィールドが不足している場合、ClickHouseカラムは「ゼロ」値(0や空文字列など)で埋められます。DEFAULT式は、現在ClickPipesの挿入に対して評価されません(これはClickHouseサーバーのデフォルト処理に関する一時的な制限です)。
  • AvroスキーマフィールドとClickHouseカラムが互換性がない場合、その行/メッセージの挿入は失敗し、失敗はClickPipesのエラーテーブルに記録されます。いくつかの暗黙の型変換(数値型間の変換など)はサポートされていますが、すべてではありません(例えば、Avro recordフィールドをInt32 ClickHouseカラムに挿入することはできません)。

Kafka仮想カラム

次の仮想カラムは、Kafka互換のストリーミングデータソースに対してサポートされています。新しい宛先テーブルを作成する際に、Add Columnボタンを使用して仮想カラムを追加できます。

名称説明推奨データ型
_keyKafkaメッセージキー文字列
_timestampKafkaタイムスタンプ(ミリ秒精度)DateTime64(3)
_partitionKafkaパーティションInt32
_offsetKafkaオフセットInt64
_topicKafkaトピック文字列
_header_keysレコードヘッダー内のキーの並列配列Array(文字列)
_header_valuesレコードヘッダー内のヘッダーの並列配列Array(文字列)
_raw_message完全なKafkaメッセージ文字列

_raw_messageカラムは、JSONデータに対してのみ推奨されています。JSON文字列のみが必要なユースケース(ClickHouse JsonExtract*関数を使用して下流のMaterialized Viewを人口する際など)では、すべての「非仮想」カラムを削除するとClickPipesのパフォーマンスが向上する可能性があります。

制限事項

  • DEFAULTはサポートされていません。

配信セマンティクス

ClickPipes for Kafkaは、at-least-once配信セマンティクスを提供します(最も一般的に使用されるアプローチの一つとして)。配信セマンティクスに関するフィードバックをお待ちしています。 お問い合わせフォーム。正確に1回のセマンティクスが必要な場合は、公式のclickhouse-kafka-connectシンクの使用をお勧めします。

認証

Apache Kafkaプロトコルデータソースに対して、ClickPipesは、TLS暗号化を伴うSASL/PLAIN認証、ならびにSASL/SCRAM-SHA-256SASL/SCRAM-SHA-512をサポートしています。ストリーミングソース(Redpanda、MSKなど)に応じて、これらの認証メカニズムのすべてまたはサブセットが互換性に基づいて有効になります。認証要件が異なる場合は、フィードバックをお寄せください

IAM

参考

MSK ClickPipe用のIAM認証はベータ機能です。

ClickPipesは次のAWS MSK認証をサポートします。

IAMロールARNを使用してMSKブローカーへの接続を認証する場合、IAMロールには必要な権限が必要です。以下は、MSKのApache Kafka APIに必要なIAMポリシーの例です。

信頼関係の設定

IAMロールARNを使用してMSKに認証する場合、ClickHouse Cloudインスタンスとロールの間に信頼関係を追加する必要があります。

注記

ロールベースアクセスはAWSにデプロイされたClickHouse Cloudインスタンスでのみ機能します。

カスタム証明書

ClickPipes for Kafkaは、SASLおよび公的SSL/TLS証明書を使用するKafkaブローカー用のカスタム証明書のアップロードをサポートします。ClickPipeの設定のSSL証明書セクションで証明書をアップロードできます。

注記

Kラシュ、SASLとともに単一のSSL証明書のアップロードをサポートしていますが、相互TLS(mTLS)は現在サポートされていないことに注意してください。

パフォーマンス

バッチ処理

ClickPipesは、ClickHouseにデータをバッチで挿入します。これは、データベース内にあまりにも多くのパーツを作成し、クラスタ内のパフォーマンスの問題を引き起こさないようにするためです。

次のいずれかの基準が満たされると、バッチが挿入されます:

  • バッチサイズが最大サイズ(100,000行または20MB)に達したとき
  • バッチがオープン状態で最大時間(5秒)保たれたとき

レイテンシ

レイテンシ(Kafkaメッセージが生成されてからメッセージがClickHouseで利用可能になるまでの時間)は、Brokerレイテンシ、ネットワークレイテンシ、メッセージサイズ/フォーマットなど、いくつかの要因に依存します。上記のバッチ処理もレイテンシに影響します。特定の負荷で期待されるレイテンシを決定するには、特定のユースケースでテストすることを常にお勧めします。

ClickPipesはレイテンシに関して一切の保証を提供しません。具体的な低レイテンシ要件がある場合は、お問い合わせください

スケーリング

ClickPipes for Kafkaは水平方向にスケールするように設計されています。デフォルトでは、1つのコンシューマーを持つコンシューマグループを作成します。これはClickPipeの詳細ビューにあるスケーリングコントロールで変更できます。

ClickPipesは、高可用性を実現する可用性ゾーン分散アーキテクチャを提供します。このためには、少なくとも2つのコンシューマーにスケーリングする必要があります。

実行中のコンシューマーの数にかかわらず、障害耐性は設計上提供されます。コンシューマーまたはその基盤となるインフラストラクチャが失敗した場合、ClickPipeは自動的にコンシューマーを再起動し、メッセージの処理を続けます。

F.A.Q

一般

  • ClickPipes for Kafkaはどのように機能しますか?

    ClickPipesは、指定されたトピックからデータを読み取るためにKafka Consumer APIを実行する専用アーキテクチャを使用し、その後データを特定のClickHouse CloudサービスのClickHouseテーブルに挿入します。

  • ClickPipesとClickHouse Kafkaテーブルエンジンの違いは何ですか?

    Kafkaテーブルエンジンは、ClickHouse自体がKafkaに接続し、イベントをプルしてローカルに書き込む「プルモデル」を実装するClickHouseのコア機能です。

    ClickPipesは、ClickHouseサービスから独立して実行される別のクラウドサービスで、Kafka(または他のデータソース)に接続してイベントを関連するClickHouse Cloudサービスにプッシュします。この分離されたアーキテクチャにより、オペレーション上の柔軟性、明確な関心の分離、スケーラブルな取り込み、優雅な障害管理、拡張性などが実現されます。

  • ClickPipes for Kafkaを使用するための要件は何ですか?

    ClickPipes for Kafkaを使用するには、実行中のKafkaブローカーとClickPipesが有効化されたClickHouse Cloudサービスが必要です。また、ClickHouse CloudがKafkaブローカーにアクセスできるようにする必要があります。これは、Kafka側でリモート接続を許可し、Kafka設定でClickHouse Cloud Egress IPアドレスをホワイトリストに登録することで実現できます。

  • ClickPipes for KafkaはAWS PrivateLinkをサポートしていますか?

    AWS PrivateLinkはサポートされています。詳細についてはお問い合わせください

  • ClickPipes for Kafkaを使用してKafkaトピックにデータを書き込むことはできますか?

    いいえ、ClickPipes for KafkaはKafkaトピックからデータを読むために設計されており、それにデータを書き込むためではありません。Kafkaトピックにデータを書き込むには、専用のKafkaプロデューサーを使用する必要があります。

  • ClickPipesは複数のブローカーをサポートしていますか?

    はい、ブローカーが同じ過半数の一部である場合、それらを一緒に設定することができ、,で区切ることができます。

Upstash

  • ClickPipesはUpstashをサポートしていますか?

    はい。Upstash Kafka製品は、2024年9月11日に非推奨期間に入り、6か月間続きます。既存の顧客は、ClickPipesを使用して現行のUpstash Kafkaブローカーを引き続き使用できます。ClickPipesユーザーインターフェイスでの一般的なKafkaタイルを使用します。非推奨の通知前までは、既存のUpstash Kafka ClickPipesには影響はありません。非推奨期間が終了すると、ClickPipeは機能しなくなります。

  • ClickPipesはUpstashスキーマレジストリをサポートしていますか?

    いいえ。ClickPipesはUpstash Kafkaスキーマレジストリとは互換性がありません。

  • ClickPipesはUpstash QStash Workflowをサポートしていますか?

    いいえ。QStash WorkflowにKafka互換のインターフェースが導入されない限り、Kafka ClickPipesでは機能しません。

Azure EventHubs

  • Azure Event Hubs ClickPipeはKafkaインターフェースなしで動作しますか?

    いいえ。ClickPipesは、Azure Event HubsがKafkaインターフェースを有効にする必要があります。Kafkaプロトコルは、Standard、Premium、Dedicated SKUのみの価格帯でサポートされています。

  • AzureスキーマレジストリはClickPipesで動作しますか?

    いいえ。ClickPipesは現在、Event Hubsスキーマレジストリとは互換性がありません。

  • Azure Event Hubsからデータを消費するために、私のポリシーはどのような権限が必要ですか?

    トピックをリストし、イベントを消費するには、ClickPipesに与えられた共有アクセスポリシーが少なくとも「Listen」クレームを必要とします。

  • 私のEvent Hubsがデータを返さないのはなぜですか?

    あなたのClickHouseインスタンスがEvent Hubsのデプロイメントとは異なる地域または大陸にある場合、ClickPipesのオンボーディング時にタイムアウトが発生する可能性があり、Event Hubからデータを消費する際に高いレイテンシが発生する場合があります。パフォーマンスの悪影響を避けるために、ClickHouse CloudデプロイメントとAzure Event Hubsデプロイメントを近くのクラウドリージョンに配置することが最善のプラクティスとされています。

  • Azure Event Hubsのポート番号を含めるべきですか?

    はい。ClickPipesは、Kafkaインターフェース用のポート番号を:9093として含めることを期待しています。

  • ClickPipesのIPはAzure Event Hubsでも関連性がありますか?

    はい。Event Hubsインスタンスへのトラフィックを制限する場合は、文書化された静的NAT IPを追加してください。

  • 接続文字列はEvent Hub用ですか、それともEvent Hub名前空間用ですか?

    両方とも機能しますが、複数のEvent Hubsからサンプルを取得するためには、名前空間レベルでの共有アクセスポリシーを使用することをお勧めします。