Kafka
ClickHouse Cloud ユーザーには、ClickPipes を使用して Kafka データを ClickHouse にストリーミングすることを推奨します。これは、高パフォーマンスの挿入をネイティブにサポートし、取り込みとクラスターリソースを独立してスケーリングできるように、関心の分離を保証します。
このエンジンは Apache Kafka で動作します。
Kafka では以下が可能です:
- データフローの発行または購読。
- 障害耐性のあるストレージの整理。
- 利用可能になったストリームの処理。
テーブルの作成
必須パラメーター:
kafka_broker_list
— ブローカーのカンマ区切りリスト(例えば、localhost:9092
)。kafka_topic_list
— Kafka トピックのリスト。kafka_group_name
— Kafka コンシューマーのグループ。読み取りマージンは各グループごとに個別に追跡されます。クラスターでメッセージが重複しないようにするには、どこでも同じグループ名を使用してください。kafka_format
— メッセージフォーマット。SQL のFORMAT
関数と同じ表記を使用します。例:JSONEachRow
。詳細については、Formats セクションを参照してください。
オプションのパラメーター:
kafka_security_protocol
- ブローカーとの通信に使用されるプロトコル。可能な値:plaintext
、ssl
、sasl_plaintext
、sasl_ssl
。kafka_sasl_mechanism
- 認証に使用する SASL メカニズム。可能な値:GSSAPI
、PLAIN
、SCRAM-SHA-256
、SCRAM-SHA-512
、OAUTHBEARER
。kafka_sasl_username
-PLAIN
およびSASL-SCRAM-..
メカニズムで使用する SASL ユーザー名。kafka_sasl_password
-PLAIN
およびSASL-SCRAM-..
メカニズムで使用する SASL パスワード。kafka_schema
— フォーマットがスキーマ定義を必要とする場合に使用する必要があるパラメーター。たとえば、Cap'n Proto では、スキーマファイルのパスとルートschema.capnp:Message
オブジェクトの名前を要求します。kafka_num_consumers
— テーブルごとのコンシューマーの数。1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定してください。全コンシューマーの数はトピック内のパーティションの数を超えてはいけません。なぜなら、1 つのパーティションには 1 つのコンシューマーのみを割り当てることができ、ClickHouse がデプロイされているサーバーの物理コア数を超えてはいけないからです。デフォルト:1
。kafka_max_block_size
— ポーリングのための最大バッチサイズ(メッセージ単位)。デフォルト:max_insert_block_size。kafka_skip_broken_messages
— スキーマと互換性のないメッセージごとの Kafka メッセージパーサーの耐性。kafka_skip_broken_messages = N
の場合、エンジンはパースできない N の Kafka メッセージをスキップします(メッセージはデータの行に等しい)。デフォルト:0
。kafka_commit_every_batch
— すべての消費されたおよび処理されたバッチをコミットし、全ブロックを書き込んだ後の単一コミットを避けます。デフォルト:0
。kafka_client_id
— クライアント識別子。デフォルトは空です。kafka_poll_timeout_ms
— Kafka からの単一ポーリングのタイムアウト。デフォルト:stream_poll_timeout_ms。kafka_poll_max_batch_size
— 単一の Kafka ポーリングでポーリングされる最大メッセージ数。デフォルト:max_block_size。kafka_flush_interval_ms
— Kafka からのデータフラッシュのタイムアウト。デフォルト:stream_flush_interval_ms。kafka_thread_per_consumer
— 各コンシューマーに独立したスレッドを提供します。有効にすると、各コンシューマーは独立してデータをフラッシュし、並行して処理します(そうでなければ、いくつかのコンシューマーからの行が1つのブロックにまとめられます)。デフォルト:0
。kafka_handle_error_mode
— Kafka エンジンのエラー処理方法。可能な値:デフォルト(メッセージのパースに失敗した場合は例外がスローされます)、ストリーム(例外メッセージと生のメッセージが仮想カラム_error
と_raw_message
に保存されます)。kafka_commit_on_select
— SELECT クエリが実行されたときにメッセージをコミットします。デフォルト:false
。kafka_max_rows_per_message
— 行ベースのフォーマットの単一 Kafka メッセージで書き込まれる最大行数。デフォルト :1
。
例:
テーブルを作成するための非推奨メソッド
新しいプロジェクトではこの方法を使用しないでください。可能であれば、古いプロジェクトは上記の方法に移行してください。
Kafka テーブルエンジンは、default value を持つカラムをサポートしていません。デフォルト値を持つカラムが必要な場合は、マテリアライズドビューのレベルで追加できます(下記を参照)。
説明
配信されたメッセージは自動的に追跡されるため、各グループの各メッセージは1 回だけカウントされます。データを二重に取得したい場合は、別のグループ名でテーブルのコピーを作成してください。
グループは柔軟で、クラスターで同期されています。たとえば、10 のトピックとクラスター内に 5 つのテーブルのコピーがある場合、各コピーは 2 つのトピックを取得します。コピーの数が変更されると、トピックは自動的にコピー間で再配分されます。このことについては、http://kafka.apache.org/intro で詳しく読むことができます。
SELECT
は特にメッセージを読み取るためには便利ではありません(デバッグを除く)、なぜなら各メッセージは 1 回しか読み取れないからです。リアルタイムスレッドをマテリアライズドビューを使用して作成することがより実用的です。そのためには:
- エンジンを使用して Kafka コンシューマーを作成し、それをデータストリームと見なします。
- 必要な構造のテーブルを作成します。
- エンジンからデータを変換し、事前に作成されたテーブルに配置するマテリアライズドビューを作成します。
MATERIALIZED VIEW
がエンジンに参加すると、バックグラウンドでデータの集計を開始します。これにより、Kafka からメッセージを継続的に受信し、SELECT
を使用して必要なフォーマットに変換できます。
1 つの Kafka テーブルには、好きなだけのマテリアライズドビューを持つことができ、これらは Kafka テーブルから直接データを読み取ることはなく、新しいレコード(ブロック単位)を受け取ります。この方法で、異なる詳細レベルで複数のテーブルに書き込むことができます(グルーピング - 集約ありおよびなし)。
例:
パフォーマンスを向上させるために、受信したメッセージは max_insert_block_size のサイズのブロックにグループ化されます。ブロックが stream_flush_interval_ms ミリ秒以内に形成されなかった場合は、ブロックの完全性に関係なくデータがテーブルにフラッシュされます。
トピックデータの受信を停止するか、変換ロジックを変更するには、マテリアライズドビューを切り離します:
ALTER
を使用してターゲットテーブルを変更する場合、ターゲットテーブルとビューからのデータ間の不一致を避けるために、マテリアルビューを無効にすることをお勧めします。
設定
GraphiteMergeTree と同様に、Kafka エンジンは ClickHouse 設定ファイルを使用した拡張設定をサポートしています。使用できる設定キーは、グローバル(<kafka>
の下)とトピックレベル(<kafka><kafka_topic>
の下)の 2 つです。グローバル設定が最初に適用され、その後トピックレベルの設定が適用されます(存在する場合)。
利用可能な設定オプションのリストについては、librdkafka configuration reference を参照してください。ClickHouse 設定では、ドットの代わりにアンダースコア(_
)を使用します。たとえば、check.crcs=true
は <check_crcs>true</check_crcs>
になります。
Kerberos サポート
Kerberos 対応 Kafka を扱うには、sasl_plaintext
値を持つ security_protocol
子要素を追加します。OS の機能によって Kerberos チケット授与チケットが取得され、キャッシュされていれば十分です。
ClickHouse はキータブファイルを使用して Kerberos 資格情報を管理できます。sasl_kerberos_service_name
、sasl_kerberos_keytab
および sasl_kerberos_principal
子要素を考慮してください。
例:
仮想カラム
_topic
— Kafka トピック。データ型:LowCardinality(String)
。_key
— メッセージの鍵。データ型:String
。_offset
— メッセージのオフセット。データ型:UInt64
。_timestamp
— メッセージのタイムスタンプ データ型:Nullable(DateTime)
。_timestamp_ms
— メッセージのミリ秒単位のタイムスタンプ。データ型:Nullable(DateTime64(3))
。_partition
— Kafka トピックのパーティション。データ型:UInt64
。_headers.name
— メッセージのヘッダーキーの配列。データ型:Array(String)
。_headers.value
— メッセージのヘッダー値の配列。データ型:Array(String)
。
kafka_handle_error_mode='stream'
の場合の追加仮想カラム:
_raw_message
- 正しく解析できなかった生メッセージ。データ型:String
。_error
- 解析に失敗した際に発生した例外メッセージ。データ型:String
。
注:_raw_message
と _error
の仮想カラムは、解析中の例外の場合にのみ埋められ、メッセージが正常に解析された場合は常に空です。
データフォーマットのサポート
Kafka エンジンは、ClickHouse でサポートされているすべての formats をサポートしています。 1 つの Kafka メッセージの行数は、フォーマットが行ベースかブロックベースかによって異なります。
- 行ベースのフォーマットの場合、1 つの Kafka メッセージの行数は
kafka_max_rows_per_message
を設定して制御できます。 - ブロックベースのフォーマットの場合、ブロックを小さな部分に分割することはできませんが、1 つのブロックの行数は一般設定 max_block_size で制御できます。
提出済みオフセットを ClickHouse Keeper に保存するためのエンジン
allow_experimental_kafka_offsets_storage_in_keeper
が有効になっている場合、Kafka テーブルエンジンには 2 つの設定を指定できます:
kafka_keeper_path
は、ClickHouse Keeper 内のテーブルのパスを指定しますkafka_replica_name
は、ClickHouse Keeper 内のレプリカ名を指定します
どちらの設定も指定するか、どちらも指定しない必要があります。どちらの設定も指定された場合は、新しい実験的な Kafka エンジンが使用されます。この新しいエンジンは、コミットされたオフセットを Kafka に保存することに依存せず、ClickHouse Keeper に保存します。オフセットを Kafka にコミットしようとはしますが、テーブルが作成されるときにのみそのオフセットに依存します。他のすべての状況(テーブルが再起動されたり、エラーから回復された場合)では、ClickHouse Keeper に保存されたオフセットがメッセージの消費を続けるためのオフセットとして使用されます。コミットされたオフセットのほかに、最後のバッチで消費されたメッセージの数も保存されるので、挿入が失敗した場合には、必要に応じて同じ数のメッセージが消費され、重複排除が可能になります。
例:
または、uuid
および replica
マクロを ReplicatedMergeTree と同様に利用する:
既知の制限
新しいエンジンは実験的であるため、まだ本番環境には対応していません。実装の既知の制限がいくつかあります:
- 最大の制限は、エンジンが直接読み取りをサポートしていないことです。マテリアライズドビューを使用してエンジンから読み取ることと、エンジンに書き込むことは機能しますが、直接読み取りは機能しません。その結果、すべての直接
SELECT
クエリは失敗します。 - テーブルを迅速に削除して再作成することや、異なるエンジンに同じ ClickHouse Keeper パスを指定することは問題を引き起こす可能性があります。ベストプラクティスとして、
kafka_keeper_path
に{uuid}
を使用して衝突するパスを避けることができます。 - 繰り返し可能な読み取りを行うには、メッセージを単一スレッドの複数パーティションから消費することはできません。これに対して、Kafka コンシューマーは定期的にポーリングして生存状態を維持する必要があります。これらの 2 つの目標の結果として、
kafka_thread_per_consumer
が有効な場合にのみ複数のコンシューマーの作成を許可することにしました。そうでなければ、コンシューマーを定期的にポーリングする際に問題を回避することが非常に複雑になります。 - 新しいストレージエンジンによって作成されたコンシューマーは
system.kafka_consumers
テーブルには表示されません。
関連情報