RabbitMQ エンジン
このエンジンは、ClickHouse と RabbitMQ を統合することを可能にします。
RabbitMQ
を利用すると:
- データフローを発行または購読できます。
- 流れが利用可能になると、それを処理できます。
テーブルの作成
必要なパラメータ:
rabbitmq_host_port
– host:port (例:localhost:5672
)。rabbitmq_exchange_name
– RabbitMQ のエクスチェンジ名。rabbitmq_format
– メッセージフォーマット。SQL のFORMAT
関数と同じ記法を使用します。例えば、JSONEachRow
。詳細については、Formats セクションを参照してください。
オプションのパラメータ:
rabbitmq_exchange_type
– RabbitMQ のエクスチェンジのタイプ:direct
,fanout
,topic
,headers
,consistent_hash
。デフォルト:fanout
。rabbitmq_routing_key_list
– カンマ区切りのルーティングキーのリスト。rabbitmq_schema
– フォーマットがスキーマ定義を必要とする場合に使用するパラメータ。例えば、Cap'n Proto はスキーマファイルのパスとルートのschema.capnp:Message
オブジェクトの名前を必要とします。rabbitmq_num_consumers
– テーブルごとの消費者の数。一つの消費者のスループットが不足している場合はより多くの消費者を指定してください。デフォルト:1
。rabbitmq_num_queues
– キューの総数。この数を増やすことでパフォーマンスが大幅に向上する可能性があります。デフォルト:1
。rabbitmq_queue_base
- キュー名のヒントを指定します。この設定の使用事例は以下に記載されています。rabbitmq_deadletter_exchange
- デッドレターエクスチェンジ の名前を指定します。このエクスチェンジ名で別のテーブルを作成し、メッセージを収集できます。デフォルトではデッドレターエクスチェンジは指定されていません。rabbitmq_persistent
- 1 (true) に設定すると、挿入クエリの配信モードが 2 に設定されます(メッセージを 'persistent' とマークします)。デフォルト:0
。rabbitmq_skip_broken_messages
– スキーマ不適合のメッセージのブロックごとの RabbitMQ メッセージパーサーの許容度。rabbitmq_skip_broken_messages = N
の場合、エンジンは解析できない N の RabbitMQ メッセージをスキップします(メッセージはデータの行に相当します)。デフォルト:0
。rabbitmq_max_block_size
- RabbitMQ からデータをフラッシュする前に収集される行の数。デフォルト:max_insert_block_size。rabbitmq_flush_interval_ms
- RabbitMQ からデータをフラッシュするためのタイムアウト。デフォルト:stream_flush_interval_ms。rabbitmq_queue_settings_list
- キュー作成時に RabbitMQ 設定を設定するために使用されます。利用可能な設定:x-max-length
,x-max-length-bytes
,x-message-ttl
,x-expires
,x-priority
,x-max-priority
,x-overflow
,x-dead-letter-exchange
,x-queue-type
。キューのdurable
設定は自動的に有効になります。rabbitmq_address
- 接続のためのアドレス。この設定またはrabbitmq_host_port
を使用します。rabbitmq_vhost
- RabbitMQ の vhost。デフォルト:'\''
。rabbitmq_queue_consume
- ユーザー定義のキューを使用し、RabbitMQ の設定を行わない(エクスチェンジ、キュー、バインディングを宣言しない)。デフォルト:false
。rabbitmq_username
- RabbitMQ のユーザー名。rabbitmq_password
- RabbitMQ のパスワード。reject_unhandled_messages
- エラーが発生した場合にメッセージを拒否します(RabbitMQ に否定確認を送信します)。この設定は、rabbitmq_queue_settings_list
にx-dead-letter-exchange
が定義されている場合、自動的に有効になります。rabbitmq_commit_on_select
- セレクトクエリが実行されたときにメッセージをコミットします。デフォルト:false
。rabbitmq_max_rows_per_message
— 行ベースフォーマットにおける一つの RabbitMQ メッセージあたりの最大行数。デフォルト :1
。rabbitmq_empty_queue_backoff_start
— RabbitMQ キューが空のときにリードを再スケジュールするための開始バックオフポイント。rabbitmq_empty_queue_backoff_end
— RabbitMQ キューが空のときにリードを再スケジュールするための終了バックオフポイント。rabbitmq_handle_error_mode
— RabbitMQ エンジンのエラー処理方法。可能な値:default(メッセージの解析に失敗した場合に例外がスローされる)、stream(例外メッセージと生のメッセージが仮想カラム_error
と_raw_message
に保存される)。
- SSL 接続:
rabbitmq_secure = 1
または接続アドレスに amqps
を使用します: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'
。
使用されるライブラリのデフォルトの動作は、生成された TLS 接続が十分に安全であることを確認しないことです。証明書が期限切れ、自己署名、存在しない、または無効である場合でも、接続は単に許可されます。証明書の厳格なチェックは、将来的に実装される可能性があります。
また、rabbitmq 関連の設定と一緒にフォーマット設定を追加することもできます。
例:
RabbitMQ サーバーの設定は、ClickHouse の設定ファイルを使用して追加する必要があります。
必要な設定:
追加の設定:
説明
SELECT
はメッセージを読むためには特に有用ではありません(デバッグを除く)、なぜなら各メッセージは一度しか読み取れないからです。リアルタイムスレッドを作成することがより実用的です。それには、materialized views を使用します。そのためには:
- エンジンを利用して RabbitMQ のコンシューマーを作成し、それをデータストリームとみなします。
- 希望する構造を持つテーブルを作成します。
- エンジンからデータを変換し、以前に作成したテーブルに挿入する Materialized View を作成します。
MATERIALIZED VIEW
がエンジンと結合すると、バックグラウンドでデータの収集を開始します。これにより、RabbitMQ からメッセージを継続的に受信し、SELECT
を使用して必要なフォーマットに変換できます。
一つの RabbitMQ テーブルは、好きなだけの Materialized View を持つことができます。
データは rabbitmq_exchange_type
と指定された rabbitmq_routing_key_list
に基づいてチャネルされることがあります。
テーブルごとにエクスチェンジは 1 つまでしか存在できません。1 つのエクスチェンジは複数のテーブル間で共有でき、複数のテーブルへのルーティングを同時に可能にします。
エクスチェンジタイプのオプション:
direct
- ルーティングはキーの正確な一致に基づいています。例:テーブルキーリスト:key1,key2,key3,key4,key5
、メッセージキーはそれらのいずれかに等しいことができます。fanout
- キーに関わらず、すべてのテーブルにルーティング(エクスチェンジ名が同じ場合)。topic
- ルーティングはドットで区切られたキーのパターンに基づいています。例:*.logs
,records.*.*.2020
,*.2018,*.2019,*.2020
。headers
- ルーティングはkey=value
の一致に基づき、設定x-match=all
またはx-match=any
があります。例:テーブルキーリスト:x-match=all,format=logs,type=report,year=2020
。consistent_hash
- データはすべてのバウンドテーブル間で均等に分配されます(エクスチェンジ名が同じ場合)。このエクスチェンジタイプは RabbitMQ プラグインrabbitmq-plugins enable rabbitmq_consistent_hash_exchange
を使って有効化する必要があります。
rabbitmq_queue_base
を設定することで次のようなケースで使用できます:
- 異なるテーブルがキューを共有できるようにし、複数の消費者が同じキューに登録できるようにします。これによりパフォーマンスが向上します。
rabbitmq_num_consumers
および/またはrabbitmq_num_queues
設定を使用する場合、これらのパラメータが同じであればキューが正確に一致します。 - 全てのメッセージが正常に消費されなかった場合に、特定の耐久性キューからの読み取りを復元できるようにします。特定のキューからの消費を再開するには、その名前を
rabbitmq_queue_base
設定に設定し、rabbitmq_num_consumers
およびrabbitmq_num_queues
を指定しないでください(デフォルトは 1)。特定のテーブルに宣言された全てのキューからの消費を再開したい場合は、同じ設定:rabbitmq_queue_base
,rabbitmq_num_consumers
,rabbitmq_num_queues
を指定してください。デフォルトでは、キュー名はテーブルに固有のものになります。 - キューが耐久性であり、自動的に削除されないため、再利用できます。(RabbitMQ CLI ツールを使用して削除できます。)
パフォーマンスを向上させるため、受信したメッセージは max_insert_block_size のサイズのブロックにグループ化されます。ブロックが stream_flush_interval_ms ミリ秒以内で形成されなかった場合、データはブロックの完全性に関係なく、テーブルにフラッシュされます。
rabbitmq_num_consumers
および/または rabbitmq_num_queues
設定が rabbitmq_exchange_type
とともに指定された場合:
rabbitmq-consistent-hash-exchange
プラグインを有効にする必要があります。- 発行されたメッセージの
message_id
プロパティを指定する必要があります(各メッセージ/バッチに対して一意)。
挿入クエリには、各発行されたメッセージに対して追加されるメッセージメタデータがあります: messageID
と republished
フラグ(再発行された場合は true) - メッセージヘッダーを介してアクセスできます。
挿入と Materialized View に同じテーブルを使用しないでください。
例:
仮想カラム
_exchange_name
- RabbitMQ エクスチェンジ名。データ型:String
。_channel_id
- メッセージを受信したコンシューマーが宣言された ChannelID。データ型:String
。_delivery_tag
- 受信したメッセージの DeliveryTag。チャネルごとにスコープが設定されています。データ型:UInt64
。_redelivered
- メッセージのredelivered
フラグ。データ型:UInt8
。_message_id
- 受信したメッセージの messageID;発行時に設定されていれば非空です。データ型:String
。_timestamp
- 受信したメッセージのタイムスタンプ;発行時に設定されていれば非空です。データ型:UInt64
。
kafka_handle_error_mode='stream'
の場合の追加の仮想カラム:
_raw_message
- 正しく解析できなかった生のメッセージ。データ型:Nullable(String)
。_error
- 解析に失敗したときに発生した例外メッセージ。データ型:Nullable(String)
。
注意: _raw_message
と _error
の仮想カラムは、解析中に例外が発生した場合のみ埋められ、メッセージが正常に解析された場合は常に NULL
です。
注意点
デフォルトカラム式(DEFAULT
、MATERIALIZED
、ALIAS
など)をテーブル定義に指定することができますが、これらは無視されます。その代わり、カラムはそれぞれの型のデフォルト値で埋められます。
データフォーマットのサポート
RabbitMQ エンジンは、ClickHouse でサポートされているすべての フォーマット をサポートしています。 一つの RabbitMQ メッセージ内の行数は、フォーマットが行ベースかブロックベースかに依存します:
- 行ベースフォーマットの場合、一つの RabbitMQ メッセージ内の行数は
rabbitmq_max_rows_per_message
を設定することで制御できます。 - ブロックベースフォーマットの場合、ブロックを小さな部分に分割することはできませんが、ブロック内の行数は一般設定 max_block_size によって制御できます。