Перейти к основному содержимому
Перейти к основному содержимому

Движок NATS

Этот движок позволяет интегрировать ClickHouse с NATS.

NATS позволяет вам:

  • Публиковать или подписываться на сообщения по темам.
  • Обрабатывать новые сообщения по мере их поступления.

Создание таблицы

Обязательные параметры:

  • nats_url – host:port (например, localhost:5672)..
  • nats_subjects – Список тем для таблицы NATS для подписки/публикации. Поддерживает шаблонные темы, такие как foo.*.bar или baz.>.
  • nats_format – Формат сообщения. Использует ту же нотацию, что и функция SQL FORMAT, например, JSONEachRow. Для получения дополнительной информации см. раздел Форматы.

Необязательные параметры:

  • nats_schema – Параметр, который должен быть использован, если формат требует определения схемы. Например, Cap'n Proto требует путь к файлу схемы и имя корневого объекта schema.capnp:Message.
  • nats_num_consumers – Количество потребителей на таблицу. По умолчанию: 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
  • nats_queue_group – Имя группы очереди подписчиков NATS. По умолчанию - имя таблицы.
  • nats_max_reconnect – Устарел и не имеет эффекта, повторное подключение выполняется постоянно с таймаутом nats_reconnect_wait.
  • nats_reconnect_wait – Время в миллисекундах, чтобы спать между каждой попыткой переподключения. По умолчанию: 5000.
  • nats_server_list - Список серверов для подключения. Может быть указан для подключения к кластеру NATS.
  • nats_skip_broken_messages - Тolerance парсера сообщений NATS к схематически несовместимым сообщениям на блок. По умолчанию: 0. Если nats_skip_broken_messages = N, тогда движок пропускает N сообщений NATS, которые не могут быть обработаны (сообщение равно строке данных).
  • nats_max_block_size - Количество строк, собранных опросами для сброса данных из NATS. По умолчанию: max_insert_block_size.
  • nats_flush_interval_ms - Таймаут для сброса данных, считанных из NATS. По умолчанию: stream_flush_interval_ms.
  • nats_username - Имя пользователя NATS.
  • nats_password - Пароль NATS.
  • nats_token - Токен аутентификации NATS.
  • nats_credential_file - Путь к файлу учетных данных NATS.
  • nats_startup_connect_tries - Количество попыток подключения при старте. По умолчанию: 5.
  • nats_max_rows_per_message — Максимальное количество строк, записываемых в одном NATS сообщении для форматов, основанных на строках. (по умолчанию : 1).
  • nats_handle_error_mode — Как обрабатывать ошибки для движка NATS. Возможные значения: default (исключение будет выброшено, если не удастся обработать сообщение), stream (сообщение исключения и сырое сообщение будут сохранены в виртуальных колонках _error и _raw_message).

SSL-соединение:

Для безопасного соединения используйте nats_secure = 1. Поведение используемой библиотеки по умолчанию заключается в том, чтобы не проверять, достаточно ли безопасно созданное TLS-соединение. Наличие истекшего, самоподписанного, отсутствующего или недействительного сертификата: соединение просто разрешается. Более строгая проверка сертификатов может быть реализована в будущем.

Запись в таблицу NATS:

Если таблица читает только из одной темы, любая вставка будет публиковаться в ту же тему. Однако, если таблица читает из нескольких тем, нам нужно указать, в какую тему мы хотим опубликовать. Поэтому при вставке в таблицу с несколькими темами необходимо установить stream_like_engine_insert_queue. Вы можете выбрать одну из тем, которые считываются таблицей, и опубликовать свои данные туда. Например:

Также настройки формата могут быть добавлены вместе с настройками, связанными с nats.

Пример:

Конфигурация сервера NATS может быть добавлена с использованием файла конфигурации ClickHouse. Более конкретно, вы можете добавить пароль Redis для движка NATS:

Описание

SELECT не особенно полезен для чтения сообщений (за исключением отладки), потому что каждое сообщение можно прочитать только один раз. Более практично создавать потоки в реальном времени, используя материализованные представления. Для этого:

  1. Используйте движок для создания потребителя NATS и рассматривайте его как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте материализованное представление, которое преобразует данные из движка и помещает их в ранее созданную таблицу.

Когда MATERIALIZED VIEW присоединяется к движку, он начинает собирать данные в фоновом режиме. Это позволяет вам постоянно получать сообщения из NATS и преобразовывать их в требуемый формат с помощью SELECT. Одна таблица NATS может иметь столько материализованных представлений, сколько вам нужно; они не читают данные из таблицы напрямую, а получают новые записи (пачками), таким образом, вы можете записывать в несколько таблиц с разным уровнем детализации (с группировкой - агрегация и без).

Пример:

Чтобы остановить получение потоковых данных или изменить логику преобразования, отсоедините материализованное представление:

Если вы хотите изменить целевую таблицу, используя ALTER, мы рекомендуем отключить материализованное представление, чтобы избежать несоответствий между целевой таблицей и данными из представления.

Виртуальные колонки

  • _subject - Тема сообщения NATS. Тип данных: String.

Дополнительные виртуальные колонки, когда nats_handle_error_mode='stream':

  • _raw_message - Сырьё сообщение, которое не удалось обработать успешно. Тип данных: Nullable(String).
  • _error - Сообщение об исключении, произошедшем во время неудачной обработки. Тип данных: Nullable(String).

Примечание: _raw_message и _error виртуальные колонки заполняются только в случае исключения во время обработки, они всегда равны NULL, когда сообщение было успешно обработано.

Поддержка форматов данных

Движок NATS поддерживает все форматы, поддерживаемые в ClickHouse. Количество строк в одном сообщении NATS зависит от того, является ли формат основанным на строках или блоках:

  • Для форматов, основанных на строках, количество строк в одном сообщении NATS можно контролировать, устанавливая nats_max_rows_per_message.
  • Для форматов, основанных на блоках, мы не можем разделить блок на меньшие части, но количество строк в одном блоке можно контролировать с помощью общего параметра max_block_size.