メインコンテンツまでスキップ
メインコンテンツまでスキップ

NATSエンジン

このエンジンは、NATSとClickHouseを統合することを可能にします。

NATSを使用すると:

  • メッセージのサブジェクトに発行またはサブスクライブできます。
  • 新しいメッセージが利用可能になると処理できます。

テーブルの作成

必要なパラメーター:

  • nats_url – host:port (例: localhost:5672)..
  • nats_subjects – NATSテーブルがサブスクライブ/発行するサブジェクトのリスト。ワイルドカードサブジェクトのような foo.*.barbaz.> もサポート。
  • nats_format – メッセージ形式。SQLの FORMAT 関数と同様の表記法を使用します。例: JSONEachRow。詳細については、フォーマットセクションを参照してください。

オプションのパラメーター:

  • nats_schema – フォーマットがスキーマ定義を要求する場合に使用する必要があるパラメーター。例えば、Cap'n Protoはスキーマファイルのパスとルート schema.capnp:Message オブジェクトの名前を要求します。
  • nats_num_consumers – テーブルごとの消費者の数。デフォルト: 1。1つの消費者のスループットが不十分な場合は、より多くの消費者を指定してください。
  • nats_queue_group – NATSサブスクライバーのキューグループ名。デフォルトはテーブル名です。
  • nats_max_reconnect – 非推奨で、効果はありません。再接続はnats_reconnect_waitタイムアウトで永久に実行されます。
  • nats_reconnect_wait – 各再接続試行の間にスリープする時間(ミリ秒)。デフォルト: 5000
  • nats_server_list – 接続用のサーバーリスト。NATSクラスタに接続するために指定できます。
  • nats_skip_broken_messages – ブロックごとのスキーマ不適合メッセージに対するNATSメッセージパーサーの耐性。デフォルト: 0nats_skip_broken_messages = N の場合、エンジンはパースできない N のNATSメッセージをスキップします(メッセージはデータ行に相当)。
  • nats_max_block_size – NATSからデータをフラッシュするために収集された行の数。デフォルト: max_insert_block_size
  • nats_flush_interval_ms – NATSから読み取ったデータをフラッシュするためのタイムアウト。デフォルト: stream_flush_interval_ms
  • nats_username – NATSのユーザー名。
  • nats_password – NATSのパスワード。
  • nats_token – NATSの認証トークン。
  • nats_credential_file – NATSの資格情報ファイルへのパス。
  • nats_startup_connect_tries – 起動時の接続試行回数。デフォルト: 5
  • nats_max_rows_per_message — 行ベースのフォーマットの1つのNATSメッセージに書き込まれる最大行数。(デフォルト: 1)。
  • nats_handle_error_mode — NATSエンジンのエラー処理方法。可能な値: default(メッセージのパースに失敗した場合は例外がスローされます)、stream(例外メッセージと生メッセージが仮想カラム _error および _raw_message に保存されます)。

SSL接続:

セキュア接続を使用するには nats_secure = 1 を設定します。使用されるライブラリのデフォルトの動作は、作成されたTLS接続が十分にセキュアであるかどうかを確認しないことです。証明書が失効している、自己署名されている、欠落している、または無効である場合でも、接続は単に許可されます。将来的に、証明書のより厳格なチェックが実装される可能性があります。

NATSテーブルへの書き込み:

テーブルが1つのサブジェクトからのみ読み取る場合、任意の挿入は同じサブジェクトに発行されます。しかし、テーブルが複数のサブジェクトから読み取る場合、どのサブジェクトに発行するかを指定する必要があります。そのため、複数のサブジェクトを持つテーブルに挿入する際には、stream_like_engine_insert_queue を設定する必要があります。テーブルが読み取るサブジェクトの1つを選択し、そこにデータを発行できます。例えば:

また、フォーマット設定はnats関連の設定とともに追加できます。

例:

NATSサーバーの構成は、ClickHouseの設定ファイルを使用して追加できます。具体的には、NATSエンジン用のRedisパスワードを追加できます:

説明

SELECTはメッセージを読み取るのに特に便利ではありません(デバッグを除く)、なぜなら、各メッセージは一度しか読み取ることができないからです。実用的には、マテリアライズドビューを使用してリアルタイムスレッドを作成する方が良いです。そのためには:

  1. エンジンを使用してNATS消費者を作成し、それをデータストリームと見なします。
  2. 望ましい構造のテーブルを作成します。
  3. エンジンからのデータを変換し、事前に作成したテーブルに配置するマテリアライズドビューを作成します。

MATERIALIZED VIEWがエンジンに接続されると、バックグラウンドでデータを収集し始めます。これにより、NATSからメッセージを継続的に受信し、SELECTを使用して必要な形式に変換できます。一つのNATSテーブルには、必要なだけのマテリアライズドビューを持つことができ、これらはテーブルから直接データを読み取るのではなく、新しいレコードを(ブロック単位で)受け取ります。この方法により、異なる詳細レベル(グルーピング - 集約あり、なし)で異なるテーブルに書き込むことができます。

例:

ストリームデータの受信を停止するか、変換ロジックを変更するには、マテリアライズドビューを切り離します:

ALTERを使用してターゲットテーブルを変更したい場合は、ターゲットテーブルとビューからのデータ間の不一致を避けるためにマテリアライズドビューを無効にすることをお勧めします。

仮想カラム

  • _subject - NATSメッセージのサブジェクト。データ型: String

nats_handle_error_mode='stream'の際の追加の仮想カラム:

  • _raw_message - 正常にパースできなかった生メッセージ。データ型: Nullable(String)
  • _error - パースに失敗した際に発生した例外メッセージ。データ型: Nullable(String)

注意: _raw_message および _error の仮想カラムは、パース中に例外が発生した場合のみ埋められ、メッセージが正常にパースされた場合は常に NULL です。

データフォーマットのサポート

NATSエンジンは、ClickHouseでサポートされるすべてのフォーマットをサポートしています。1つのNATSメッセージの行数は、フォーマットが行ベースかブロックベースかによって異なります:

  • 行ベースのフォーマットの場合、1つのNATSメッセージの行数は nats_max_rows_per_message を設定することで制御できます。
  • ブロックベースのフォーマットでは、ブロックを小さな部分に分割することはできませんが、1つのブロック内での行数は一般設定のmax_block_sizeによって制御できます。