メインコンテンツまでスキップ
メインコンテンツまでスキップ

Kafka

Not supported in ClickHouse Cloud
注記

ClickHouse Cloudユーザーは、KafkaデータをClickHouseにストリーミングするためにClickPipesの使用を推奨します。これは高パフォーマンスの挿入をネイティブにサポートし、取り込みとクラスターリソースを独立してスケールできるという分離の原則を保証します。

このエンジンはApache Kafkaと連携します。

Kafkaを使用すると:

  • データフローの発行または購読ができます。
  • 耐障害性のあるストレージを組織化できます。
  • ストリームが利用可能になると同時に処理ができます。

テーブルの作成

必要なパラメータ:

  • kafka_broker_list — ブローカーのカンマ区切りリスト(例えば、 localhost:9092)。
  • kafka_topic_list — Kafkaトピックのリスト。
  • kafka_group_name — Kafkaコンシューマのグループ。読み取りマージンは各グループごとに別々に追跡されます。メッセージの重複を避けたい場合は、同じグループ名を使用してください。
  • kafka_format — メッセージ形式。SQLの FORMAT 関数と同じ表記法を使用します。例えば、JSONEachRow。詳細については、フォーマットセクションを参照してください。

オプショナルパラメータ:

  • kafka_schema — 形式がスキーマ定義を必要とする場合に使用されるパラメータ。例えば、Cap'n Protoは、スキーマファイルへのパスとルートsсhema.capnp:Messageオブジェクトの名前を必要とします。
  • kafka_num_consumers — テーブルごとのコンシューマの数。1つのコンシューマのスループットが不十分な場合は、より多くのコンシューマを指定してください。総コンシューマ数はトピックのパーティション数を超えてはならず、ClickHouseが展開されているサーバー上の物理コア数を超えてはなりません。デフォルト: 1
  • kafka_max_block_size — ポールの最大バッチサイズ(メッセージ数)。デフォルト: max_insert_block_size
  • kafka_skip_broken_messages — ブロック内のスキーマ不適合メッセージに対するKafkaメッセージパーサの許容度。kafka_skip_broken_messages = Nの場合、エンジンは解析できないNのKafkaメッセージをスキップします(メッセージはデータの行に相当します)。デフォルト: 0
  • kafka_commit_every_batch — 全体のブロックを書き込んだ後の単一コミットではなく、消費されたおよび処理されたバッチごとにコミットします。デフォルト: 0
  • kafka_client_id — クライアント識別子。デフォルトは空です。
  • kafka_poll_timeout_ms — Kafkaからの単一ポールのタイムアウト。デフォルト: stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 単一のKafkaポールでポーリングするメッセージの最大数。デフォルト: max_block_size
  • kafka_flush_interval_ms — Kafkaからデータをフラッシュするためのタイムアウト。デフォルト: stream_flush_interval_ms
  • kafka_thread_per_consumer — 各コンシューマに独立したスレッドを提供します。有効にすると、各コンシューマは独立して並列でデータをフラッシュします(そうでない場合は、複数のコンシューマの行が一つのブロックに圧縮されます)。デフォルト: 0
  • kafka_handle_error_mode — Kafkaエンジンのエラー処理方法。可能な値: デフォルト(メッセージの解析に失敗した場合に例外がスローされます)、ストリーム(例外メッセージと生のメッセージが仮想カラム_error_raw_messageに保存されます)。
  • kafka_commit_on_select — SELECTクエリが実行されたときにメッセージをコミットします。デフォルト: false
  • kafka_max_rows_per_message — 行ベースの形式の1つのKafkaメッセージに書き込まれる最大行数。デフォルト: 1

例:

テーブルを作成するための非推奨メソッド
注記

新しいプロジェクトではこの方法を使用しないでください。可能であれば、古いプロジェクトを上記で説明した方法に切り替えてください。

参考

Kafkaテーブルエンジンは、デフォルト値を持つカラムをサポートしていません。デフォルト値を持つカラムが必要な場合は、マテリアライズドビューのレベルで追加できます(以下を参照)。

説明

配信されたメッセージは自動的に追跡されるため、グループ内の各メッセージは1回だけカウントされます。データを2回取得したい場合は、別のグループ名でテーブルのコピーを作成してください。

グループは柔軟でクラスター内で同期されます。たとえば、10トピックとクラスター内の5つのテーブルのコピーがある場合、各コピーは2つのトピックを取得します。コピーの数が変更されると、トピックは自動的にコピー間で再分配されます。これについては、http://kafka.apache.org/introで詳しく読むことができます。

SELECTはメッセージを読み取るために特に便利ではありません(デバッグを除いて)、なぜなら各メッセージは一度しか読むことができないからです。マテリアライズドビューを使用してリアルタイムスレッドを作成する方が実用的です。そのために:

  1. エンジンを使用してKafkaコンシューマを作成し、それをデータストリームと考えます。
  2. 希望の構造でテーブルを作成します。
  3. エンジンからデータを変換し、事前に作成したテーブルに入れるマテリアライズドビューを作成します。

MATERIALIZED VIEWがエンジンに結合すると、バックグラウンドでデータを収集し始めます。これにより、Kafkaからメッセージを継続して受信し、SELECTを使用して必要な形式に変換できます。 1つのKafkaテーブルには、好きなだけのマテリアライズドビューを持つことができ、これらはKafkaテーブルから直接データを読み取るのではなく、新しいレコード(ブロックごとに)を受け取ります。このように、異なる詳細レベル(集計と非集計)の複数のテーブルに書き込むことができます。

例:

パフォーマンスを向上させるために、受信したメッセージはmax_insert_block_sizeサイズのブロックにグループ化されます。ブロックがstream_flush_interval_msミリ秒内に形成されなかった場合、データはブロックの完全性に関係なくテーブルにフラッシュされます。

トピックデータの受信を停止するか、変換ロジックを変更するには、マテリアライズドビューを切り離します:

ALTERを使用してターゲットテーブルを変更したい場合は、ターゲットテーブルとビューからのデータの不一致を避けるためにマテリアルビューを無効にすることをお勧めします。

設定

GraphiteMergeTreeと同様に、KafkaエンジンはClickHouse設定ファイルを使用した拡張設定をサポートしています。使用できる設定キーは2つあります:グローバル(<kafka>の下)とトピックレベル(<kafka><kafka_topic>の下)。グローバル設定は最初に適用され、その後トピックレベルの設定が適用されます(存在する場合)。

利用可能な設定オプションのリストについては、librdkafka設定リファレンスを参照してください。ClickHouse設定では、ドットの代わりにアンダースコア(_)を使用します。例えば、check.crcs=true<check_crcs>true</check_crcs>となります。

Kerberosサポート

Kerberos対応Kafkaを使用するには、security_protocol子要素をsasl_plaintext値で追加します。OS機能によってチケット授与チケットが取得され、キャッシュされているだけで十分です。 ClickHouseは、keytabファイルを使用してKerberos資格情報を維持できます。sasl_kerberos_service_namesasl_kerberos_keytab、およびsasl_kerberos_principal子要素を考慮してください。

例:

仮想カラム

  • _topic — Kafkaトピック。データ型: LowCardinality(String)
  • _key — メッセージのキー。データ型: String
  • _offset — メッセージのオフセット。データ型: UInt64
  • _timestamp — メッセージのタイムスタンプ。データ型: Nullable(DateTime)
  • _timestamp_ms — メッセージのミリ秒単位のタイムスタンプ。データ型: Nullable(DateTime64(3))
  • _partition — Kafkaトピックのパーティション。データ型: UInt64
  • _headers.name — メッセージのヘッダーキーの配列。データ型: Array(String)
  • _headers.value — メッセージのヘッダー値の配列。データ型: Array(String)

kafka_handle_error_mode='stream'の場合の追加の仮想カラム:

  • _raw_message - 成功裏に解析できなかった生メッセージ。データ型: String
  • _error - 解析に失敗した際の例外メッセージ。データ型: String

注意: _raw_messageおよび_errorの仮想カラムは、解析中に例外が発生した場合にのみ埋め込まれ、メッセージが正常に解析された場合は常に空です。

データ形式のサポート

Kafkaエンジンは、ClickHouseでサポートされているすべての形式をサポートしています。 1つのKafkaメッセージの行数は、形式が行ベースかブロックベースかによって異なります:

  • 行ベースの形式の場合、1つのKafkaメッセージの行数はkafka_max_rows_per_messageを設定することで制御できます。
  • ブロックベースの形式の場合、ブロックを小さな部分に分割することはできませんが、1つのブロックの行数は一般設定max_block_sizeで制御できます。

ClickHouse Keeperにコミットされたオフセットを保存するエンジン

Experimental feature. Learn more.

allow_experimental_kafka_offsets_storage_in_keeperが有効になっている場合、Kafkaテーブルエンジンに指定できる2つの追加設定があります:

  • kafka_keeper_pathは、ClickHouse Keeper内のテーブルのパスを指定します
  • kafka_replica_nameは、ClickHouse Keeper内のレプリカ名を指定します

これらの設定は、両方とも指定する必要があるか、どちらも指定しない必要があります。両方が指定された場合、新しい実験的なKafkaエンジンが使用されます。この新しいエンジンは、コミットされたオフセットをKafkaに保存する必要はなく、ClickHouse Keeperに保存します。Kafkaにオフセットをコミットしようとしますが、テーブルが作成されるときにのみそのオフセットに依存します。それ以外の状況(テーブルが再起動またはエラーから復旧した場合)では、ClickHouse Keeperに保存されたオフセットを使用してメッセージの消費を続行します。コミットされたオフセットに加えて、最後のバッチで消費されたメッセージ数も保存されるため、挿入が失敗した場合、同じ数のメッセージが消費され、必要に応じて重複排除が可能になります。

例:

また、uuidおよびreplicaのマクロをReplicatedMergeTreeに似たように利用できます:

既知の制限

新しいエンジンは実験的であるため、まだ本番環境での使用には適していません。実装にはいくつかの既知の制限があります:

  • 最大の制限は、エンジンが直接読み取りをサポートしていないことです。マテリアライズドビューを使用してエンジンから読み取ることと、エンジンに書き込むことは可能ですが、直接読み取りはできません。その結果、すべての直接SELECTクエリは失敗します。
  • テーブルを迅速に削除および再作成したり、異なるエンジンに同じClickHouse Keeperパスを指定すると問題が発生する可能性があります。ベストプラクティスとして、kafka_keeper_path{uuid}を使用してパスの衝突を避けることができます。
  • 再現可能な読み取りを行うためには、メッセージは単一のスレッドで複数のパーティションから消費できません。一方で、Kafkaのコンシューマは定期的にポーリングされる必要があります。これらの2つの目標の結果として、kafka_thread_per_consumerが有効になっている場合のみ複数のコンシューマを作成することを許可しました。そうでなければ、コンシューマを定期的にポーリングする際に問題を避けるのが複雑すぎるからです。
  • 新しいストレージエンジンによって作成されたコンシューマは、system.kafka_consumersテーブルには表示されません。

関連情報