Перейти к основному содержимому
Перейти к основному содержимому

Kafka

Not supported in ClickHouse Cloud
примечание

Пользователям ClickHouse Cloud рекомендуется использовать ClickPipes для потоковой передачи данных Kafka в ClickHouse. Это изначально поддерживает высокопроизводительную вставку, обеспечивая при этом разделение ответственности с возможностью независимого масштабирования приема данных и ресурсов кластера.

Этот движок работает с Apache Kafka.

Kafka позволяет вам:

  • Публиковать или подписываться на потоки данных.
  • Организовывать отказоустойчивое хранилище.
  • Обрабатывать потоки по мере их появления.

Создание таблицы

Обязательные параметры:

  • kafka_broker_list — Список брокеров, разделенных запятыми (например, localhost:9092).
  • kafka_topic_list — Список тем Kafka.
  • kafka_group_name — Группа потребителей Kafka. Пограничные чтения отслеживаются для каждой группы отдельно. Если вы не хотите, чтобы сообщения дублировались в кластере, используйте одно и то же имя группы повсюду.
  • kafka_format — Формат сообщений. Использует такую же нотацию, как функция SQL FORMAT, например JSONEachRow. Для получения дополнительной информации смотрите раздел Formats.

Дополнительные параметры:

  • kafka_security_protocol - Протокол, используемый для общения с брокерами. Возможные значения: plaintext, ssl, sasl_plaintext, sasl_ssl.
  • kafka_sasl_mechanism - SASL механизм, используемый для аутентификации. Возможные значения: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER.
  • kafka_sasl_username - SASL имя пользователя для использования с механизмами PLAIN и SASL-SCRAM-...
  • kafka_sasl_password - SASL пароль для использования с механизмами PLAIN и SASL-SCRAM-...
  • kafka_schema — Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap'n Proto требует путь к файлу схемы и имя корневого объекта schema.capnp:Message.
  • kafka_num_consumers — Количество потребителей на таблицу. Укажите большее количество потребителей, если пропускная способность одного потребителя недостаточна. Общее количество потребителей не должно превышать количество партиций в теме, поскольку на каждую партицию может быть назначен только один потребитель, и не должно превышать количество физических ядер на сервере, где развернут ClickHouse. По умолчанию: 1.
  • kafka_max_block_size — Максимальный размер партии (в сообщениях) для опроса. По умолчанию: max_insert_block_size.
  • kafka_skip_broken_messages — Толерантность парсера сообщений Kafka к схемам, несовместимым с сообщениями за блок. Если kafka_skip_broken_messages = N, то движок пропустит N сообщений Kafka, которые не могут быть распознаны (сообщение равно строке данных). По умолчанию: 0.
  • kafka_commit_every_batch — Подтверждать каждую потребляемую и обработанную партию вместо одного подтверждения после записи целого блока. По умолчанию: 0.
  • kafka_client_id — Идентификатор клиента. По умолчанию пустой.
  • kafka_poll_timeout_ms — Тайм-аут для одного опроса от Kafka. По умолчанию: stream_poll_timeout_ms.
  • kafka_poll_max_batch_size — Максимальное количество сообщений, которые будут опрошены в одном опросе Kafka. По умолчанию: max_block_size.
  • kafka_flush_interval_ms — Тайм-аут для сброса данных из Kafka. По умолчанию: stream_flush_interval_ms.
  • kafka_thread_per_consumer — Предоставить независимый поток для каждого потребителя. Когда это включено, каждый потребитель сбрасывает данные независимо, параллельно (в противном случае — строки от нескольких потребителей будут объединены, чтобы образовать один блок). По умолчанию: 0.
  • kafka_handle_error_mode — Как обрабатывать ошибки для движка Kafka. Возможные значения: default (исключение будет выброшено, если парсинг сообщения не удался), stream (исключение сообщения и необработанное сообщение будут сохранены в виртуальных колонках _error и _raw_message).
  • kafka_commit_on_select — Подтверждать сообщения, когда выполняется запрос. По умолчанию: false.
  • kafka_max_rows_per_message — Максимальное количество строк, записанных в одном сообщении kafka для форматов на основе строк. По умолчанию: 1.

Примеры:

Устаревший метод создания таблицы
примечание

Не используйте этот метод в новых проектах. Если возможно, переключите старые проекты на метод, описанный выше.

к сведению

Движок таблицы Kafka не поддерживает колонки с значением по умолчанию. Если вам нужны колонки со значением по умолчанию, вы можете добавить их на уровне материализованного представления (см. ниже).

Описание

Доставляемые сообщения отслеживаются автоматически, поэтому каждое сообщение в группе учитывается только один раз. Если вы хотите получить данные дважды, создайте копию таблицы с другим именем группы.

Группы являются гибкими и синхронизируются в кластере. Например, если у вас есть 10 тем и 5 копий таблицы в кластере, тогда каждая копия получает 2 темы. Если количество копий изменяется, темы автоматически перераспределяются между копиями. Читайте больше об этом на http://kafka.apache.org/intro.

SELECT не очень полезен для чтения сообщений (за исключением отладки), потому что каждое сообщение можно прочитать только один раз. Практичнее создавать потоки в реальном времени с помощью материализованных представлений. Для этого:

  1. Используйте движок для создания потребителя Kafka и рассматривайте его как поток данных.
  2. Создайте таблицу с желаемой структурой.
  3. Создайте материализованное представление, которое конвертирует данные из движка и помещает их в ранее созданную таблицу.

Когда MATERIALIZED VIEW соединяется с движком, он начинает собирать данные в фоновом режиме. Это позволяет вам постоянно получать сообщения из Kafka и конвертировать их в требуемый формат с помощью SELECT. Одна таблица kafka может иметь столько материализованных представлений, сколько вам нужно, они не читают данные напрямую из таблицы kafka, а получают новые записи (пакетами), таким образом, вы можете записывать в несколько таблиц с различным уровнем деталей (с группировкой - агрегацией и без).

Пример:

Для повышения производительности полученные сообщения группируются в блоки размером max_insert_block_size. Если блок не был сформирован в течение stream_flush_interval_ms миллисекунд, данные будут сброшены в таблицу независимо от полноты блока.

Чтобы остановить получение данных темы или изменить логику конвертации, отсоедините материализованное представление:

Если вы хотите изменить целевую таблицу, используя ALTER, мы рекомендуем отключить материализованное представление, чтобы избежать несоответствий между целевой таблицей и данными из представления.

Настройка

Подобно GraphiteMergeTree, движок Kafka поддерживает расширенную настройку с использованием файла конфигурации ClickHouse. Существует два ключа конфигурации, которые вы можете использовать: глобальный (ниже <kafka>) и на уровне темы (ниже <kafka><kafka_topic>). Глобальная конфигурация применяется первой, а затем применяются настройки на уровне темы (если они существуют).

Для получения списка возможных параметров конфигурации смотрите librdkafka configuration reference. Используйте нижнее подчеркивание (_) вместо точки в конфигурации ClickHouse. Например, check.crcs=true будет <check_crcs>true</check_crcs>.

Поддержка Kerberos

Чтобы работать с Kafka, поддерживающей Kerberos, добавьте дочерний элемент security_protocol со значением sasl_plaintext. Достаточно, чтобы билет на предоставление билетов Kerberos был получен и кэширован средствами ОС. ClickHouse способен поддерживать учетные данные Kerberos с использованием файла keytab. Рассмотрите элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal.

Пример:

Виртуальные колонки

  • _topic — Тема Kafka. Тип данных: LowCardinality(String).
  • _key — Ключ сообщения. Тип данных: String.
  • _offset — Смещение сообщения. Тип данных: UInt64.
  • _timestamp — Метка времени сообщения. Тип данных: Nullable(DateTime).
  • _timestamp_ms — Метка времени в миллисекундах сообщения. Тип данных: Nullable(DateTime64(3)).
  • _partition — Партиция темы Kafka. Тип данных: UInt64.
  • _headers.name — Массив ключей заголовков сообщения. Тип данных: Array(String).
  • _headers.value — Массив значений заголовков сообщения. Тип данных: Array(String).

Дополнительные виртуальные колонки при kafka_handle_error_mode='stream':

  • _raw_message - Необработанное сообщение, которое не удалось успешно распознать. Тип данных: String.
  • _error - Сообщение исключения, возникшего во время неудачного парсинга. Тип данных: String.

Примечание: виртуальные колонки _raw_message и _error заполняются только в случае исключения во время парсинга, они всегда пусты, когда сообщение было успешно разобрано.

Поддержка форматов данных

Движок Kafka поддерживает все форматы, поддерживаемые в ClickHouse. Количество строк в одном сообщении Kafka зависит от того, является ли формат основанным на строках или блоках:

  • Для форматов, основанных на строках, количество строк в одном сообщении Kafka может контролироваться с помощью настройки kafka_max_rows_per_message.
  • Для форматов на основе блоков мы не можем делить блок на более мелкие части, но количество строк в одном блоке можно контролировать с помощью общей настройки max_block_size.

Движок для хранения подтвержденных смещений в ClickHouse Keeper

Experimental feature. Learn more.

Если allow_experimental_kafka_offsets_storage_in_keeper включен, тогда можно указать еще два параметра для движка таблицы Kafka:

  • kafka_keeper_path указывает путь к таблице в ClickHouse Keeper
  • kafka_replica_name указывает имя реплики в ClickHouse Keeper

Либо оба параметра должны быть указаны, либо ни один из них. Когда оба указаны, будет использован новый экспериментальный движок Kafka. Новый движок не зависит от хранения подтвержденных смещений в Kafka, а хранит их в ClickHouse Keeper. Он все еще пытается подтвердить смещения в Kafka, но он зависит от этих смещений только при создании таблицы. В любых других обстоятельствах (таблица перезапускается или восстанавливается после ошибки) будут использоваться смещения, хранящиеся в ClickHouse Keeper, как смещение для продолжения потребления сообщений. Помимо подтвержденного смещения, он также хранит, сколько сообщений было потреблено в последней партии, так что если вставка не удалась, будет потреблено такое же количество сообщений, что позволяет избежать дублирования, если это необходимо.

Пример:

Или для использования макросов uuid и replica, аналогично ReplicatedMergeTree:

Известные ограничения

Поскольку новый движок является экспериментальным, он еще не готов к производству. Существует несколько известных ограничений реализации:

  • Главным ограничением является то, что движок не поддерживает прямое чтение. Чтение из движка с использованием материализованных представлений и запись в движок работает, но прямое чтение не работает. В результате все прямые запросы SELECT будут завершаться неудачей.
  • Быстрое удаление и пере создание таблицы или указание одного и того же пути ClickHouse Keeper для разных движков может вызвать проблемы. Как лучшая практика, вы можете использовать {uuid} в kafka_keeper_path, чтобы избежать конфликтующих путей.
  • Для выполнения повторяемых чтений сообщения не могут потребляться из нескольких партиций в одном потоке. С другой стороны, потребители Kafka должны периодически опрашиваться, чтобы оставаться активными. В результате этих двух целей мы решили позволить создание нескольких потребителей только при включении kafka_thread_per_consumer, иначе это слишком сложно, чтобы избежать проблем с частым опросом потребителей.
  • Потребители, созданные новым движком хранения, не отображаются в таблице system.kafka_consumers.

Смотрите также