メインコンテンツまでスキップ
メインコンテンツまでスキップ

Kafka

Not supported in ClickHouse Cloud
注記

ClickHouse Cloud ユーザーには、ClickPipes を使用して Kafka データを ClickHouse にストリーミングすることを推奨します。これは、高パフォーマンスの挿入をネイティブにサポートし、取り込みとクラスターリソースを独立してスケーリングできるように、関心の分離を保証します。

このエンジンは Apache Kafka で動作します。

Kafka では以下が可能です:

  • データフローの発行または購読。
  • 障害耐性のあるストレージの整理。
  • 利用可能になったストリームの処理。

テーブルの作成

必須パラメーター:

  • kafka_broker_list — ブローカーのカンマ区切りリスト(例えば、localhost:9092)。
  • kafka_topic_list — Kafka トピックのリスト。
  • kafka_group_name — Kafka コンシューマーのグループ。読み取りマージンは各グループごとに個別に追跡されます。クラスターでメッセージが重複しないようにするには、どこでも同じグループ名を使用してください。
  • kafka_format — メッセージフォーマット。SQL の FORMAT 関数と同じ表記を使用します。例:JSONEachRow。詳細については、Formats セクションを参照してください。

オプションのパラメーター:

  • kafka_security_protocol - ブローカーとの通信に使用されるプロトコル。可能な値:plaintextsslsasl_plaintextsasl_ssl
  • kafka_sasl_mechanism - 認証に使用する SASL メカニズム。可能な値:GSSAPIPLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARER
  • kafka_sasl_username - PLAIN および SASL-SCRAM-.. メカニズムで使用する SASL ユーザー名。
  • kafka_sasl_password - PLAIN および SASL-SCRAM-.. メカニズムで使用する SASL パスワード。
  • kafka_schema — フォーマットがスキーマ定義を必要とする場合に使用する必要があるパラメーター。たとえば、Cap'n Proto では、スキーマファイルのパスとルート schema.capnp:Message オブジェクトの名前を要求します。
  • kafka_num_consumers — テーブルごとのコンシューマーの数。1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定してください。全コンシューマーの数はトピック内のパーティションの数を超えてはいけません。なぜなら、1 つのパーティションには 1 つのコンシューマーのみを割り当てることができ、ClickHouse がデプロイされているサーバーの物理コア数を超えてはいけないからです。デフォルト:1
  • kafka_max_block_size — ポーリングのための最大バッチサイズ(メッセージ単位)。デフォルト:max_insert_block_size
  • kafka_skip_broken_messages — スキーマと互換性のないメッセージごとの Kafka メッセージパーサーの耐性。kafka_skip_broken_messages = N の場合、エンジンはパースできない N の Kafka メッセージをスキップします(メッセージはデータの行に等しい)。デフォルト:0
  • kafka_commit_every_batch — すべての消費されたおよび処理されたバッチをコミットし、全ブロックを書き込んだ後の単一コミットを避けます。デフォルト:0
  • kafka_client_id — クライアント識別子。デフォルトは空です。
  • kafka_poll_timeout_ms — Kafka からの単一ポーリングのタイムアウト。デフォルト:stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 単一の Kafka ポーリングでポーリングされる最大メッセージ数。デフォルト:max_block_size
  • kafka_flush_interval_ms — Kafka からのデータフラッシュのタイムアウト。デフォルト:stream_flush_interval_ms
  • kafka_thread_per_consumer — 各コンシューマーに独立したスレッドを提供します。有効にすると、各コンシューマーは独立してデータをフラッシュし、並行して処理します(そうでなければ、いくつかのコンシューマーからの行が1つのブロックにまとめられます)。デフォルト:0
  • kafka_handle_error_mode — Kafka エンジンのエラー処理方法。可能な値:デフォルト(メッセージのパースに失敗した場合は例外がスローされます)、ストリーム(例外メッセージと生のメッセージが仮想カラム _error_raw_message に保存されます)。
  • kafka_commit_on_select — SELECT クエリが実行されたときにメッセージをコミットします。デフォルト:false
  • kafka_max_rows_per_message — 行ベースのフォーマットの単一 Kafka メッセージで書き込まれる最大行数。デフォルト : 1

例:

テーブルを作成するための非推奨メソッド
注記

新しいプロジェクトではこの方法を使用しないでください。可能であれば、古いプロジェクトは上記の方法に移行してください。

参考

Kafka テーブルエンジンは、default value を持つカラムをサポートしていません。デフォルト値を持つカラムが必要な場合は、マテリアライズドビューのレベルで追加できます(下記を参照)。

説明

配信されたメッセージは自動的に追跡されるため、各グループの各メッセージは1 回だけカウントされます。データを二重に取得したい場合は、別のグループ名でテーブルのコピーを作成してください。

グループは柔軟で、クラスターで同期されています。たとえば、10 のトピックとクラスター内に 5 つのテーブルのコピーがある場合、各コピーは 2 つのトピックを取得します。コピーの数が変更されると、トピックは自動的にコピー間で再配分されます。このことについては、http://kafka.apache.org/intro で詳しく読むことができます。

SELECT は特にメッセージを読み取るためには便利ではありません(デバッグを除く)、なぜなら各メッセージは 1 回しか読み取れないからです。リアルタイムスレッドをマテリアライズドビューを使用して作成することがより実用的です。そのためには:

  1. エンジンを使用して Kafka コンシューマーを作成し、それをデータストリームと見なします。
  2. 必要な構造のテーブルを作成します。
  3. エンジンからデータを変換し、事前に作成されたテーブルに配置するマテリアライズドビューを作成します。

MATERIALIZED VIEW がエンジンに参加すると、バックグラウンドでデータの集計を開始します。これにより、Kafka からメッセージを継続的に受信し、SELECT を使用して必要なフォーマットに変換できます。 1 つの Kafka テーブルには、好きなだけのマテリアライズドビューを持つことができ、これらは Kafka テーブルから直接データを読み取ることはなく、新しいレコード(ブロック単位)を受け取ります。この方法で、異なる詳細レベルで複数のテーブルに書き込むことができます(グルーピング - 集約ありおよびなし)。

例:

パフォーマンスを向上させるために、受信したメッセージは max_insert_block_size のサイズのブロックにグループ化されます。ブロックが stream_flush_interval_ms ミリ秒以内に形成されなかった場合は、ブロックの完全性に関係なくデータがテーブルにフラッシュされます。

トピックデータの受信を停止するか、変換ロジックを変更するには、マテリアライズドビューを切り離します:

ALTER を使用してターゲットテーブルを変更する場合、ターゲットテーブルとビューからのデータ間の不一致を避けるために、マテリアルビューを無効にすることをお勧めします。

設定

GraphiteMergeTree と同様に、Kafka エンジンは ClickHouse 設定ファイルを使用した拡張設定をサポートしています。使用できる設定キーは、グローバル(<kafka> の下)とトピックレベル(<kafka><kafka_topic> の下)の 2 つです。グローバル設定が最初に適用され、その後トピックレベルの設定が適用されます(存在する場合)。

利用可能な設定オプションのリストについては、librdkafka configuration reference を参照してください。ClickHouse 設定では、ドットの代わりにアンダースコア(_)を使用します。たとえば、check.crcs=true<check_crcs>true</check_crcs> になります。

Kerberos サポート

Kerberos 対応 Kafka を扱うには、sasl_plaintext 値を持つ security_protocol 子要素を追加します。OS の機能によって Kerberos チケット授与チケットが取得され、キャッシュされていれば十分です。 ClickHouse はキータブファイルを使用して Kerberos 資格情報を管理できます。sasl_kerberos_service_namesasl_kerberos_keytab および sasl_kerberos_principal 子要素を考慮してください。

例:

仮想カラム

  • _topic — Kafka トピック。データ型:LowCardinality(String)
  • _key — メッセージの鍵。データ型:String
  • _offset — メッセージのオフセット。データ型:UInt64
  • _timestamp — メッセージのタイムスタンプ データ型:Nullable(DateTime)
  • _timestamp_ms — メッセージのミリ秒単位のタイムスタンプ。データ型:Nullable(DateTime64(3))
  • _partition — Kafka トピックのパーティション。データ型:UInt64
  • _headers.name — メッセージのヘッダーキーの配列。データ型:Array(String)
  • _headers.value — メッセージのヘッダー値の配列。データ型:Array(String)

kafka_handle_error_mode='stream' の場合の追加仮想カラム:

  • _raw_message - 正しく解析できなかった生メッセージ。データ型:String
  • _error - 解析に失敗した際に発生した例外メッセージ。データ型:String

注:_raw_message_error の仮想カラムは、解析中の例外の場合にのみ埋められ、メッセージが正常に解析された場合は常に空です。

データフォーマットのサポート

Kafka エンジンは、ClickHouse でサポートされているすべての formats をサポートしています。 1 つの Kafka メッセージの行数は、フォーマットが行ベースかブロックベースかによって異なります。

  • 行ベースのフォーマットの場合、1 つの Kafka メッセージの行数は kafka_max_rows_per_message を設定して制御できます。
  • ブロックベースのフォーマットの場合、ブロックを小さな部分に分割することはできませんが、1 つのブロックの行数は一般設定 max_block_size で制御できます。

提出済みオフセットを ClickHouse Keeper に保存するためのエンジン

Experimental feature. Learn more.

allow_experimental_kafka_offsets_storage_in_keeper が有効になっている場合、Kafka テーブルエンジンには 2 つの設定を指定できます:

  • kafka_keeper_path は、ClickHouse Keeper 内のテーブルのパスを指定します
  • kafka_replica_name は、ClickHouse Keeper 内のレプリカ名を指定します

どちらの設定も指定するか、どちらも指定しない必要があります。どちらの設定も指定された場合は、新しい実験的な Kafka エンジンが使用されます。この新しいエンジンは、コミットされたオフセットを Kafka に保存することに依存せず、ClickHouse Keeper に保存します。オフセットを Kafka にコミットしようとはしますが、テーブルが作成されるときにのみそのオフセットに依存します。他のすべての状況(テーブルが再起動されたり、エラーから回復された場合)では、ClickHouse Keeper に保存されたオフセットがメッセージの消費を続けるためのオフセットとして使用されます。コミットされたオフセットのほかに、最後のバッチで消費されたメッセージの数も保存されるので、挿入が失敗した場合には、必要に応じて同じ数のメッセージが消費され、重複排除が可能になります。

例:

または、uuid および replica マクロを ReplicatedMergeTree と同様に利用する:

既知の制限

新しいエンジンは実験的であるため、まだ本番環境には対応していません。実装の既知の制限がいくつかあります:

  • 最大の制限は、エンジンが直接読み取りをサポートしていないことです。マテリアライズドビューを使用してエンジンから読み取ることと、エンジンに書き込むことは機能しますが、直接読み取りは機能しません。その結果、すべての直接 SELECT クエリは失敗します。
  • テーブルを迅速に削除して再作成することや、異なるエンジンに同じ ClickHouse Keeper パスを指定することは問題を引き起こす可能性があります。ベストプラクティスとして、kafka_keeper_path{uuid} を使用して衝突するパスを避けることができます。
  • 繰り返し可能な読み取りを行うには、メッセージを単一スレッドの複数パーティションから消費することはできません。これに対して、Kafka コンシューマーは定期的にポーリングして生存状態を維持する必要があります。これらの 2 つの目標の結果として、kafka_thread_per_consumer が有効な場合にのみ複数のコンシューマーの作成を許可することにしました。そうでなければ、コンシューマーを定期的にポーリングする際に問題を回避することが非常に複雑になります。
  • 新しいストレージエンジンによって作成されたコンシューマーは system.kafka_consumers テーブルには表示されません。

関連情報