NATSエンジン
このエンジンは、NATSとClickHouseを統合することを可能にします。
NATS
を使用すると:
- メッセージのサブジェクトに発行またはサブスクライブできます。
- 新しいメッセージが利用可能になると処理できます。
テーブルの作成
必要なパラメーター:
nats_url
– host:port (例:localhost:5672
)..nats_subjects
– NATSテーブルがサブスクライブ/発行するサブジェクトのリスト。ワイルドカードサブジェクトのようなfoo.*.bar
やbaz.>
もサポート。nats_format
– メッセージ形式。SQLのFORMAT
関数と同様の表記法を使用します。例:JSONEachRow
。詳細については、フォーマットセクションを参照してください。
オプションのパラメーター:
nats_schema
– フォーマットがスキーマ定義を要求する場合に使用する必要があるパラメーター。例えば、Cap'n Protoはスキーマファイルのパスとルートschema.capnp:Message
オブジェクトの名前を要求します。nats_num_consumers
– テーブルごとの消費者の数。デフォルト:1
。1つの消費者のスループットが不十分な場合は、より多くの消費者を指定してください。nats_queue_group
– NATSサブスクライバーのキューグループ名。デフォルトはテーブル名です。nats_max_reconnect
– 非推奨で、効果はありません。再接続はnats_reconnect_waitタイムアウトで永久に実行されます。nats_reconnect_wait
– 各再接続試行の間にスリープする時間(ミリ秒)。デフォルト:5000
。nats_server_list
– 接続用のサーバーリスト。NATSクラスタに接続するために指定できます。nats_skip_broken_messages
– ブロックごとのスキーマ不適合メッセージに対するNATSメッセージパーサーの耐性。デフォルト:0
。nats_skip_broken_messages = N
の場合、エンジンはパースできない N のNATSメッセージをスキップします(メッセージはデータ行に相当)。nats_max_block_size
– NATSからデータをフラッシュするために収集された行の数。デフォルト: max_insert_block_size。nats_flush_interval_ms
– NATSから読み取ったデータをフラッシュするためのタイムアウト。デフォルト: stream_flush_interval_ms。nats_username
– NATSのユーザー名。nats_password
– NATSのパスワード。nats_token
– NATSの認証トークン。nats_credential_file
– NATSの資格情報ファイルへのパス。nats_startup_connect_tries
– 起動時の接続試行回数。デフォルト:5
。nats_max_rows_per_message
— 行ベースのフォーマットの1つのNATSメッセージに書き込まれる最大行数。(デフォルト:1
)。nats_handle_error_mode
— NATSエンジンのエラー処理方法。可能な値: default(メッセージのパースに失敗した場合は例外がスローされます)、stream(例外メッセージと生メッセージが仮想カラム_error
および_raw_message
に保存されます)。
SSL接続:
セキュア接続を使用するには nats_secure = 1
を設定します。使用されるライブラリのデフォルトの動作は、作成されたTLS接続が十分にセキュアであるかどうかを確認しないことです。証明書が失効している、自己署名されている、欠落している、または無効である場合でも、接続は単に許可されます。将来的に、証明書のより厳格なチェックが実装される可能性があります。
NATSテーブルへの書き込み:
テーブルが1つのサブジェクトからのみ読み取る場合、任意の挿入は同じサブジェクトに発行されます。しかし、テーブルが複数のサブジェクトから読み取る場合、どのサブジェクトに発行するかを指定する必要があります。そのため、複数のサブジェクトを持つテーブルに挿入する際には、stream_like_engine_insert_queue
を設定する必要があります。テーブルが読み取るサブジェクトの1つを選択し、そこにデータを発行できます。例えば:
また、フォーマット設定はnats関連の設定とともに追加できます。
例:
NATSサーバーの構成は、ClickHouseの設定ファイルを使用して追加できます。具体的には、NATSエンジン用のRedisパスワードを追加できます:
説明
SELECT
はメッセージを読み取るのに特に便利ではありません(デバッグを除く)、なぜなら、各メッセージは一度しか読み取ることができないからです。実用的には、マテリアライズドビューを使用してリアルタイムスレッドを作成する方が良いです。そのためには:
- エンジンを使用してNATS消費者を作成し、それをデータストリームと見なします。
- 望ましい構造のテーブルを作成します。
- エンジンからのデータを変換し、事前に作成したテーブルに配置するマテリアライズドビューを作成します。
MATERIALIZED VIEW
がエンジンに接続されると、バックグラウンドでデータを収集し始めます。これにより、NATSからメッセージを継続的に受信し、SELECT
を使用して必要な形式に変換できます。一つのNATSテーブルには、必要なだけのマテリアライズドビューを持つことができ、これらはテーブルから直接データを読み取るのではなく、新しいレコードを(ブロック単位で)受け取ります。この方法により、異なる詳細レベル(グルーピング - 集約あり、なし)で異なるテーブルに書き込むことができます。
例:
ストリームデータの受信を停止するか、変換ロジックを変更するには、マテリアライズドビューを切り離します:
ALTER
を使用してターゲットテーブルを変更したい場合は、ターゲットテーブルとビューからのデータ間の不一致を避けるためにマテリアライズドビューを無効にすることをお勧めします。
仮想カラム
_subject
- NATSメッセージのサブジェクト。データ型:String
。
nats_handle_error_mode='stream'
の際の追加の仮想カラム:
_raw_message
- 正常にパースできなかった生メッセージ。データ型:Nullable(String)
。_error
- パースに失敗した際に発生した例外メッセージ。データ型:Nullable(String)
。
注意: _raw_message
および _error
の仮想カラムは、パース中に例外が発生した場合のみ埋められ、メッセージが正常にパースされた場合は常に NULL
です。
データフォーマットのサポート
NATSエンジンは、ClickHouseでサポートされるすべてのフォーマットをサポートしています。1つのNATSメッセージの行数は、フォーマットが行ベースかブロックベースかによって異なります:
- 行ベースのフォーマットの場合、1つのNATSメッセージの行数は
nats_max_rows_per_message
を設定することで制御できます。 - ブロックベースのフォーマットでは、ブロックを小さな部分に分割することはできませんが、1つのブロック内での行数は一般設定のmax_block_sizeによって制御できます。