Перейти к основному содержимому
Перейти к основному содержимому

Использование движка таблиц Kafka

Not supported in ClickHouse Cloud
примечание

Движок таблиц Kafka не поддерживается на ClickHouse Cloud. Пожалуйста, рассмотрите вариант использования ClickPipes или Kafka Connect.

Kafka в ClickHouse

Чтобы использовать движок таблиц Kafka, вам необходимо быть в основном знакомым с материализованными представлениями ClickHouse.

Обзор

Сначала мы сосредоточимся на самом распространенном случае: использовании движка таблиц Kafka для вставки данных в ClickHouse из Kafka.

Движок таблиц Kafka позволяет ClickHouse читать напрямую из топика Kafka. Хотя это полезно для просмотра сообщений в топике, движок по умолчанию разрешает одноразовое извлечение, т.е. когда запрос отправляется к таблице, он потребляет данные из очереди и увеличивает смещение потребителя, прежде чем вернуть результаты вызывающей стороне. Эффективно данные не могут быть прочитаны повторно без сброса этих смещений.

Чтобы сохранить эти данные из чтения движка таблиц, нам необходимо средство для захвата данных и вставки их в другую таблицу. Уровневые материализированные представления по сути предоставляют эту функциональность. Материализованное представление инициирует чтение из движка таблиц, получая партии документов. Клауза TO определяет место назначения данных - обычно это таблица из семейства Merge Tree. Этот процесс визуализирован ниже:

Шаги

1. Подготовка

Если у вас есть данные на целевом топике, вы можете адаптировать следующее для использования в вашем наборе данных. Кроме того, пример набора данных из Github предоставлен здесь. Этот набор данных используется в примерах ниже и использует уменьшенную схему и подмножество строк (в частности, мы ограничиваемся событиями GitHub, касающимися репозитория ClickHouse), по сравнению с полным набором данных, доступным здесь, ради краткости. Этого все еще достаточно для большинства запросов опубликованных с набором данных для работы.

2. Настройка ClickHouse

Этот шаг необходим, если вы подключаетесь к защищенному Kafka. Эти настройки нельзя передать через SQL DDL команды и должны быть настроены в файле config.xml ClickHouse. Мы предполагаем, что вы подключаетесь к экземпляру с защитой SASL. Это самый простой метод взаимодействия с Confluent Cloud.

Либо поместите приведенный выше фрагмент в новый файл в вашей директории conf.d/, либо объедините его с существующими файлами конфигурации. Для настроек, которые можно настроить, смотрите здесь.

Мы также создадим базу данных под названием KafkaEngine, чтобы использовать ее в этом руководстве:

После того как вы создали базу данных, вам нужно переключиться на нее:

3. Создание целевой таблицы

Подготовьте свою целевую таблицу. В примере ниже мы используем уменьшенную схему GitHub для краткости. Обратите внимание, что хотя мы используем движок таблиц MergeTree, этот пример можно легко адаптировать для любого члена семейства MergeTree.

4. Создание и заполнение топика

Далее мы собираемся создать топик. Есть несколько инструментов, которые мы можем использовать для этого. Если мы запускаем Kafka локально на нашем компьютере или внутри контейнера Docker, RPK отлично подходит. Мы можем создать топик с именем github с 5 партициями, выполнив следующую команду:

Если мы запускаем Kafka в Confluent Cloud, мы могли бы предпочесть использовать Confluent CLI:

Теперь нам нужно заполнить этот топик данными, что мы сделаем с помощью kcat. Мы можем выполнить команду, аналогичную следующей, если мы запускаем Kafka локально с отключенной аутентификацией:

Или следующую, если наш кластер Kafka использует SASL для аутентификации:

Набор данных содержит 200,000 строк, поэтому его должно быть загружено всего за несколько секунд. Если вы хотите работать с большим набором данных, ознакомьтесь с разделом больших наборов данных из репозитория ClickHouse/kafka-samples.

5. Создание движка таблиц Kafka

Пример ниже создает движок таблиц с той же схемой, что и таблица MergeTree. Это строго не обязательно, поскольку вы можете использовать псевдонимы или временные столбцы в целевой таблице. Однако настройки важны; обратите внимание на использование JSONEachRow в качестве типа данных для потребления JSON из топика Kafka. Значения github и clickhouse представляют собой имена топика и группы потребителей соответственно. Топики могут на самом деле быть списком значений.

Мы обсуждаем настройки движка и оптимизацию производительности ниже. На этом этапе простой запрос к таблице github_queue должен прочитать несколько строк. Обратите внимание, что это переместит смещения потребителей вперед, предотвращая повторное чтение этих строк без сброса. Обратите внимание на лимит и необходимый параметр stream_like_engine_allow_direct_select.

6. Создание материализованного представления

Материализованное представление свяжет две ранее созданные таблицы, считая данные из движка таблиц Kafka и вставляя их в целевую таблицу Merge Tree. Мы можем выполнять ряд преобразований данных. Мы выполним простое чтение и вставку. Использование * предполагает, что имена столбцов идентичны (чувствительно к регистру).

На момент создания материализованное представление подключается к движку Kafka и начинает чтение: вставляя строки в целевую таблицу. Этот процесс будет продолжаться бесконечно, с последующими вставками сообщений в Kafka, которые будут потребляться. Не стесняйтесь повторно запускать скрипт вставки, чтобы вставить дополнительные сообщения в Kafka.

7. Подтверждение вставки строк

Подтвердите наличие данных в целевой таблице:

Вы должны увидеть 200,000 строк:

Общие операции

Остановка и перезапуск потребления сообщений

Чтобы остановить потребление сообщений, вы можете отсоединить таблицу движка Kafka:

Это не повлияет на смещения группы потребителей. Чтобы перезапустить потребление и продолжить с предыдущего смещения, повторно подключите таблицу.

Добавление метаданных Kafka

Может быть полезно отслеживать метаданные из оригинальных сообщений Kafka после их загрузки в ClickHouse. Например, мы можем желать знать, сколько конкретного топика или партиции мы потребили. Для этой цели движок таблиц Kafka предоставляет несколько виртуальных столбцов. Эти столбцы могут быть сохранены как столбцы в нашей целевой таблице путем изменения нашей схемы и оператора select нашего материализованного представления.

Сначала мы выполняем операцию остановки, описанную выше, прежде чем добавлять столбцы в нашу целевую таблицу.

Ниже мы добавляем информационные столбцы для идентификации исходного топика и партиции, из которой произошла строка.

Далее, нам нужно убедиться, что виртуальные столбцы сопоставлены по мере необходимости. Виртуальные столбцы начинаются с символа _. Полный список виртуальных столбцов можно найти здесь.

Чтобы обновить нашу таблицу с виртуальными столбцами, нам нужно удалить материализованное представление, повторно подключить таблицу движка Kafka и заново создать материализованное представление.

Новые потребляемые строки должны содержать метаданные.

Результат выглядит следующим образом:

actor_loginevent_typecreated_attopicpartition
IgorMinarCommitCommentEvent2011-02-12 02:22:00github0
queeupCommitCommentEvent2011-02-12 02:23:23github0
IgorMinarCommitCommentEvent2011-02-12 02:23:24github0
IgorMinarCommitCommentEvent2011-02-12 02:24:50github0
IgorMinarCommitCommentEvent2011-02-12 02:25:20github0
dapiCommitCommentEvent2011-02-12 06:18:36github0
sourcerebelsCommitCommentEvent2011-02-12 06:34:10github0
jamierumbelowCommitCommentEvent2011-02-12 12:21:40github0
jpnCommitCommentEvent2011-02-12 12:24:31github0
OxoniumCommitCommentEvent2011-02-12 12:31:28github0
Изменение настроек движка Kafka

Мы рекомендуем удалить таблицу движка Kafka и заново создать ее с новыми настройками. В этом процессе материализованное представление изменять не нужно - потребление сообщений возобновится, как только таблица движка Kafka будет заново создана.

Устранение неисправностей

Ошибки, такие как проблемы с аутентификацией, не отображаются в ответах на DDL движка Kafka. Для диагностики проблем мы рекомендуем использовать основной файл журнала ClickHouse clickhouse-server.err.log. Дополнительное журналирование трассировки для используемой библиотеки клиентского программного обеспечения Kafka librdkafka может быть включено через конфигурацию.

Обработка неверно оформленных сообщений

Kafka часто используется как "свалка" для данных. Это приводит к тому, что топики содержат смешанные форматы сообщений и непоследовательные имена полей. Избегайте этого и используйте функции Kafka, такие как Kafka Streams или ksqlDB, чтобы гарантировать, что сообщения правильно оформлены и последовательны перед вставкой в Kafka. Если эти варианты невозможны, у ClickHouse есть некоторые функции, которые могут помочь.

  • Рассматривайте поле сообщения как строки. Если необходимо, в операторе материализованного представления могут быть использованы функции для фильтрации и приведения типов. Это не должно представлять решение для производства, но может помочь в одноразовой загрузке.
  • Если вы потребляете JSON из топика, используя формат JSONEachRow, используйте настройку input_format_skip_unknown_fields. По умолчанию ClickHouse генерирует исключение, если входные данные содержат столбцы, которые не существуют в целевой таблице. Однако если этот параметр включен, эти лишние столбцы будут проигнорированы. Снова, это не решение для производства и может запутать других.
  • Рассмотрите настройку kafka_skip_broken_messages. Это требует от пользователя указать уровень терпимости за блок для неверно оформленных сообщений - который рассматривается в контексте kafka_max_block_size. Если эта терпимость превышена (измеряется в абсолютных сообщениях), обычное поведение исключений вернется, и другие сообщения будут пропущены.
Семантика доставки и проблемы с дубликатами

Движок таблиц Kafka имеет семантику как минимум один раз. Дубликаты возможны в нескольких известных редких обстоятельствах. Например, сообщения могут быть прочитаны из Kafka и успешно вставлены в ClickHouse. Прежде чем новое смещение может быть зафиксировано, связь с Kafka теряется. В этой ситуации требуется повторная попытка блока. Блок можно дедуплицировать с помощью распределенной таблицы или ReplicatedMergeTree в качестве целевой таблицы. Хотя это уменьшает вероятность дублированных строк, это зависит от идентичных блоков. События, такие как перераспределение Kafka, могут аннулировать это предположение, вызывая дубликаты в редких случаях.

Вставки на основе кворума

Вам могут понадобиться вставки на основе кворума в случаях, когда в ClickHouse требуются более высокие гарантии доставки. Это нельзя установить для материализованного представления или целевой таблицы. Однако это может быть установлено для пользовательских профилей, например:

ClickHouse в Kafka

Хотя это более редкий случай, данные ClickHouse также могут сохраняться в Kafka. Например, мы вставим строки вручную в движок таблиц Kafka. Эти данные будут прочитаны тем же движком Kafka, который поместит данные в таблицу Merge Tree. Наконец, мы продемонстрируем применение материализованных представлений при вставке в Kafka, чтобы прочитать таблицы из существующих исходных таблиц.

Шаги

Наш начальный объект наиболее хорошо иллюстрируется:

Мы предполагаем, что у вас есть созданные таблицы и представления в шагах для Kafka в ClickHouse и что топик был полностью потреблен.

1. Вставка строк напрямую

Сначала подтвердите количество в целевой таблице.

У вас должно быть 200,000 строк:

Теперь вставьте строки из целевой таблицы GitHub обратно в движок таблиц Kafka github_queue. Обратите внимание, как мы используем формат JSONEachRow и ограничиваем выборку 100 строками.

Пересчитайте строки в GitHub, чтобы подтвердить, что их количество увеличилось на 100. Как показано на вышеупомянутой диаграмме, строки были вставлены в Kafka через движок таблиц Kafka, прежде чем быть повторно прочитанными тем же движком и вставленными в целевую таблицу GitHub нашим материализованным представлением!

Вы должны увидеть 100 дополнительных строк:

2. Использование материализованных представлений

Мы можем использовать материализованные представления для отправки сообщений в движок Kafka (и топик) при вставке документов в таблицу. Когда строки вставляются в таблицу GitHub, срабатывает материализованное представление, которое заставляет строки вставляться обратно в движок Kafka и в новый топик. Опять же, это лучше всего иллюстрируется:

Создайте новый топик Kafka github_out или эквивалентный. Убедитесь, что движок таблиц Kafka github_out_queue ссылается на этот топик.

Теперь создайте новое материализованное представление github_out_mv, чтобы указывать на таблицу GitHub, вставляя строки в вышеуказанный движок, когда оно сработает. Изменения в таблице GitHub, в результате, будут отправлены в наш новый топик Kafka.

Если вы вставите в оригинальный топик github, созданный в рамках Kafka в ClickHouse, документы волшебным образом появятся в топике "github_clickhouse". Подтвердите это с помощью нативных инструментов Kafka. Например, ниже мы вставляем 100 строк в топик github, используя kcat для топика, размещенного в Confluent Cloud:

Чтение в топике github_out должно подтвердить доставку сообщений.

Хотя это сложный пример, он иллюстрирует мощь материализованных представлений, когда они используются совместно с движком Kafka.

Кластеры и производительность

Работа с кластерами ClickHouse

Через группы потребителей Kafka несколько экземпляров ClickHouse могут потенциально читать из одного и того же топика. Каждый потребитель будет назначен на партицию топика в соотношении 1:1. При масштабировании потребления ClickHouse с использованием движка таблиц Kafka учитывайте, что общее количество потребителей в кластере не может превышать количество партиций в топике. Поэтому убедитесь, что партиционирование должным образом сконфигурировано для топика заранее.

Несколько экземпляров ClickHouse могут быть настроены на чтение из одного топика, используя один и тот же идентификатор группы потребителей - этот идентификатор указывается при создании движка таблиц Kafka. Таким образом, каждый экземпляр будет читать из одной или нескольких партиций, вставляя сегменты в свои локальные целевые таблицы. Целевые таблицы, в свою очередь, могут быть настроены на использование ReplicatedMergeTree для обработки дублирования данных. Этот подход позволяет масштабировать чтение из Kafka вместе с кластером ClickHouse, при условии, что имеется достаточное количество партиций Kafka.

Настройка производительности

Учитывайте следующее, когда хотите увеличить производительность таблицы движка Kafka:

  • Производительность будет варьироваться в зависимости от размера сообщения, формата и типов целевых таблиц. 100k строк в секунду на одном движке таблиц следует считать достижимым. По умолчанию сообщения читаются блоками, контролируемыми параметром kafka_max_block_size. По умолчанию это установлено в max_insert_block_size, по умолчанию равном 1,048,576. Если сообщения не слишком большие, это значение следует почти всегда увеличивать. Значения от 500k до 1M не являются редкостью. Протестируйте и оцените влияние на производительность.
  • Количество потребителей для движка таблиц можно увеличить, используя kafka_num_consumers. Однако по умолчанию вставки будут линейными в одном потоке, если kafka_thread_per_consumer не изменен из значения по умолчанию 1. Установите это значение в 1, чтобы гарантировать выполнение сбросов параллельно. Обратите внимание, что создание таблицы движка Kafka с N потребителями (и kafka_thread_per_consumer=1) логически эквивалентно созданию N движков Kafka, каждый с материализованным представлением и kafka_thread_per_consumer=0.
  • Увеличение числа потребителей не является бесплатной операцией. Каждый потребитель поддерживает свои собственные буферы и потоки, увеличивая нагрузку на сервер. Обратите внимание на накладные расходы потребителей и сначала масштабируйте линейно по вашему кластеру, если это возможно.
  • Если пропускная способность сообщений Kafka переменная, а задержки допустимы, подумайте о том, чтобы увеличить stream_flush_interval_ms, чтобы гарантировать, что более крупные блоки сбрасываются.
  • background_message_broker_schedule_pool_size устанавливает число потоков, выполняющих фоновую работу. Эти потоки используются для потоковой передачи Kafka. Эта настройка применяется при запуске сервера ClickHouse и не может быть изменена в сеансе пользователя, по умолчанию равная 16. Если вы видите тайм-ауты в журналах, возможно, имеет смысл увеличить это значение.
  • Для коммуникации с Kafka используется библиотека librdkafka, которая сама создает потоки. Большое количество таблиц Kafka или потребителей может привести к большому количеству переключений контекста. Либо распределите эту нагрузку по кластеру, повторно создавая целевые таблицы, если это возможно, либо рассмотрите возможность использования движка таблиц для чтения из нескольких топиков - поддерживается список значений. Можно читать несколько материализованных представлений с одной таблицы, каждое из которых фильтрует данные из конкретного топика.

Любые изменения в настройках должны быть протестированы. Мы рекомендуем мониторить задержки потребителей Kafka, чтобы убедиться, что у вас правильный масштаб.

Дополнительные настройки

Кроме вышеупомянутых настроек, следующее может представлять интерес:

  • Kafka_max_wait_ms - время ожидания в миллисекундах для чтения сообщений из Kafka перед повторной попыткой. Устанавливается на уровне пользовательского профиля и по умолчанию равен 5000.

Все настройки из библиотеки librdkafka также могут быть помещены в файлы конфигурации ClickHouse внутри элемента kafka - имена настроек должны быть XML-элементами с точками, замененными на подчеркивания, например:

Это экспертные настройки, и мы рекомендуем вам обратиться к документации Kafka для более подробного объяснения.