Перейти к основному содержимому
Перейти к основному содержимому

Использование движка таблицы Kafka

Not supported in ClickHouse Cloud
примечание

Движок таблицы Kafka не поддерживается на ClickHouse Cloud. Пожалуйста, рассмотрите ClickPipes или Kafka Connect

Kafka в ClickHouse

Чтобы использовать движок таблицы Kafka, вам следует иметь общее представление о материализованных представлениях ClickHouse.

Обзор

Изначально мы сосредотачиваемся на самом распространенном случае: использованию движка таблицы Kafka для вставки данных в ClickHouse из Kafka.

Движок таблицы Kafka позволяет ClickHouse читать непосредственно из темы Kafka. Хотя это полезно для просмотра сообщений в теме, движок по своей природе разрешает одноразовое извлечение, т.е. когда запрос выдается к таблице, он потребляет данные из очереди и увеличивает смещение потребителя перед возвратом результатов вызывающему. Данные не могут быть повторно прочитаны без сброса этих смещений.

Чтобы сохранить данные из чтения движка таблицы, нам нужно средство захвата данных и вставки их в другую таблицу. Материализованные представления, работающие на основе триггеров, нативно предоставляют эту функциональность. Материализованное представление инициирует чтение из движка таблицы, получая партии документов. Условие TO определяет пункт назначения данных - обычно это таблица из семейства Merge Tree. Этот процесс визуализирован ниже:

Движок таблицы Kafka

Шаги

1. Подготовка

Если у вас есть данные, помещенные в целевую тему, вы можете адаптировать следующее для использования в вашем наборе данных. В качестве альтернативы, образец набора данных Github представлен здесь. Этот набор данных используется в примерах ниже и имеет уменьшенную схему и подмножество строк (в частности, мы ограничиваемся событиями GitHub, касающимися репозитория ClickHouse), по сравнению с полным набором данных, доступным здесь, для краткости. Это по-прежнему достаточно для выполнения большинства запросов, опубликованных с набором данных.

2. Настройка ClickHouse

Этот шаг необходим, если вы подключаетесь к защищенному Kafka. Эти настройки не могут быть переданы через SQL DDL команды и должны быть настроены в config.xml ClickHouse. Мы предполагаем, что вы подключаетесь к экземпляру, защищенному SASL. Это самый простой способ взаимодействия с Confluent Cloud.

Поместите приведенный выше фрагмент в новый файл в директории conf.d/ или объедините его с существующими конфигурационными файлами. Для настроек, которые можно конфигурировать, смотрите здесь.

Мы также создадим базу данных с именем KafkaEngine, которую будем использовать в этом учебнике:

После создания базы данных вам нужно будет переключиться на нее:

3. Создание целевой таблицы

Подготовьте вашу целевую таблицу. В приведенном ниже примере мы используем уменьшенную схему GitHub для краткости. Обратите внимание, что хотя мы используем движок таблицы MergeTree, этот пример можно легко адаптировать для любого из членов семейства MergeTree.

4. Создание и заполнение темы

Следующим шагом мы создадим тему. Существует несколько инструментов, которые мы можем использовать для этого. Если мы запускаем Kafka локально на своем компьютере или внутри контейнера Docker, RPK отлично подойдет. Мы можем создать тему под названием github с 5 партициями, выполнив следующую команду:

Если мы запускаем Kafka в Confluent Cloud, нам может быть удобнее использовать Confluent CLI:

Теперь нам нужно заполнить эту тему некоторыми данными, для чего мы будем использовать kcat. Мы можем выполнить команду, похожую на следующую, если запускаем Kafka локально с отключенной аутентификацией:

Или следующую, если наш кластер Kafka использует SASL для аутентификации:

Набор данных содержит 200,000 строк, поэтому он должен быть загружен всего за несколько секунд. Если вы хотите работать с большим набором данных, ознакомьтесь с разделом больших наборов данных репозитория ClickHouse/kafka-samples.

5. Создание движка таблицы Kafka

Ниже приведенный пример создает движок таблицы с той же схемой, что и таблица merge tree. Это не строго обязательно, так как у вас могут быть алиасы или эпhemerные колонки в целевой таблице. Однако настройки важны; обратите внимание на использование JSONEachRow в качестве типа данных для потребления JSON из темы Kafka. Значения github и clickhouse представляют собой имя темы и имена групп потребителей соответственно. Темы могут фактически представлять собой список значений.

Мы обсудим настройки движка и оптимизацию производительности ниже. На этом этапе простой выбор из таблицы github_queue должен прочитать несколько строк. Обратите внимание, что это переместит смещения потребителя вперед, предотвращая повторное чтение этих строк без сброса. Обратите внимание на лимит и обязательный параметр stream_like_engine_allow_direct_select.

6. Создание материализованного представления

Материализованное представление соединит две ранее созданные таблицы, читая данные из движка таблицы Kafka и вставляя их в целевую таблицу merge tree. Мы можем сделать несколько преобразований данных. Мы сделаем простое чтение и вставку. Использование * предполагает, что имена колонок идентичны (регистр имеет значение).

На момент создания материализованное представление подключается к движку Kafka и начинает читать, вставляя строки в целевую таблицу. Этот процесс будет продолжаться бесконечно, с последующими вставками сообщений в Kafka, которые будут потребляться. Не стесняйтесь повторно запускать сценарий вставки, чтобы вставить дополнительные сообщения в Kafka.

7. Подтверждение вставки строк

Подтвердите, что данные существуют в целевой таблице:

Вы должны увидеть 200,000 строк:

Распространенные операции

Остановка и перезапуск потребления сообщений

Чтобы остановить потребление сообщений, вы можете отсоединить таблицу движка Kafka:

Это не повлияет на смещения группы потребителей. Чтобы перезапустить потребление и продолжить с предыдущего смещения, повторно присоедините таблицу.

Добавление метаданных Kafka

Полезно отслеживать метаданные из оригинальных сообщений Kafka после его загрузки в ClickHouse. Например, нам может понадобиться знать, сколько определенной темы или партиции мы потребили. Для этой цели движок таблицы Kafka предоставляет несколько виртуальных колонок. Эти колонки могут быть сохранены как колонки в нашей целевой таблице путем изменения нашей схемы и выражения выбора материализованного представления.

Сначала мы выполняем операцию остановки, описанную выше, перед добавлением колонок в нашу целевую таблицу.

Ниже мы добавляем информационные колонки, чтобы указать исходную тему и партицию, из которой произошла строка.

Далее мы должны убедиться, что виртуальные колонки сопоставлены по мере необходимости. Виртуальные колонки имеют префикс _. Полный список виртуальных колонок можно найти здесь.

Чтобы обновить нашу таблицу с виртуальными колонками, нам нужно будет удалить материализованное представление, повторно присоединить таблицу движка Kafka и снова создать материализованное представление.

Новые потребленные строки должны иметь метаданные.

Результат выглядит следующим образом:

actor_loginevent_typecreated_attopicpartition
IgorMinarCommitCommentEvent2011-02-12 02:22:00github0
queeupCommitCommentEvent2011-02-12 02:23:23github0
IgorMinarCommitCommentEvent2011-02-12 02:23:24github0
IgorMinarCommitCommentEvent2011-02-12 02:24:50github0
IgorMinarCommitCommentEvent2011-02-12 02:25:20github0
dapiCommitCommentEvent2011-02-12 06:18:36github0
sourcerebelsCommitCommentEvent2011-02-12 06:34:10github0
jamierumbelowCommitCommentEvent2011-02-12 12:21:40github0
jpnCommitCommentEvent2011-02-12 12:24:31github0
OxoniumCommitCommentEvent2011-02-12 12:31:28github0
Изменение настроек движка Kafka

Мы рекомендуем удалить таблицу движка Kafka и воссоздать ее с новыми настройками. Материализованное представление не нужно изменять в ходе этого процесса - потребление сообщений возобновится после воссоздания таблицы движка Kafka.

Отладка проблем

Ошибки, такие как проблемы с аутентификацией, не сообщаются в ответах на DDL движка Kafka. Для диагностики проблем мы рекомендуем использовать основной файл журнала ClickHouse clickhouse-server.err.log. Дополнительное журнальное отслеживание для библиотек клиента Kafka librdkafka можно включить через конфигурацию.

Обработка неправильно сформированных сообщений

Kafka часто используется как "свалка" для данных. Это приводит к темам с смешанными форматами сообщений и неконсистентными именами полей. Избегайте этого и используйте функции Kafka, такие как Kafka Streams или ksqlDB, чтобы убедиться, что сообщения хорошо сформированы и согласованы перед вставкой в Kafka. Если эти варианты невозможны, ClickHouse имеет некоторые функции, которые могут помочь.

  • Рассматривайте поле сообщения как строки. В выражении материализованного представления могут быть использованы функции для очистки и преобразования, если это потребуется. Это не должно представлять собой производственное решение, но может помочь при разовой загрузке.
  • Если вы потребляете JSON из темы, используя формат JSONEachRow, используйте настройку input_format_skip_unknown_fields. При записи данных, по умолчанию, ClickHouse выдает исключение, если входные данные содержат колонки, которые не существуют в целевой таблице. Однако, если этот параметр включен, эти лишние колонки будут игнорироваться. Опять же, это не решение для производственного уровня и может запутать других.
  • Рассмотрите настройку kafka_skip_broken_messages. Это требует от пользователя указать уровень терпимости на блок для неправильно сформированных сообщений - учитываемых в контексте kafka_max_block_size. Если это терпимость превышена (измеренная в абсолютных сообщениях), обычное поведение исключений вернется, и другие сообщения будут пропущены.
Семантика доставки и проблемы с дубликатами

Движок таблицы Kafka имеет семантику хотя бы раз. Дубликаты возможны в нескольких известных редких обстоятельствах. Например, сообщения могут быть прочитаны из Kafka и успешно вставлены в ClickHouse. Прежде чем новое смещение может быть зафиксировано, соединение с Kafka потеряно. В этой ситуации требуется повторная попытка блока. Блок можно де-дубликатировать, используя распределенную таблицу или ReplicatedMergeTree в качестве целевой таблицы. Хотя это снижает вероятность дублирования строк, это зависит от одинаковых блоков. События, такие как перераспределение Kafka, могут аннулировать это предположение, вызывая дубликаты в редких обстоятельствах.

Вставки на основе кворума

Вам могут понадобиться вставки на основе кворума в случаях, когда требуются более высокие гарантии доставки в ClickHouse. Это не может быть установлено для материализованного представления или целевой таблицы. Однако это может быть установлено для пользовательских профилей, например:

ClickHouse в Kafka

Хотя это и редкий случай, данные ClickHouse также могут быть сохранены в Kafka. Например, мы вставим строки вручную в движок таблицы Kafka. Эти данные будут прочитаны тем же движком Kafka, чье материализованное представление разместит данные в таблице Merge Tree. Наконец, мы продемонстрируем применение материализованных представлений для вставок в Kafka для считывания таблиц из существующих исходных таблиц.

Шаги

Наша первоначальная цель лучше всего проиллюстрирована:

Движок таблицы Kafka с вставками

Мы предполагаем, что у вас созданы таблицы и представления в соответствии с шагами для Kafka в ClickHouse и что тема была полностью потреблена.

1. Прямое вставление строк

Сначала подтвердите количество строк в целевой таблице.

У вас должно быть 200,000 строк:

Теперь вставляем строки из целевой таблицы GitHub обратно в движок таблицы Kafka github_queue. Обратите внимание, как мы используем формат JSONEachRow и ограничиваем выборку до 100.

Пересчитайте строки в GitHub, чтобы подтвердить, что их количество увеличилось на 100. Как показано на приведенной выше схеме, строки были вставлены в Kafka через движок таблицы Kafka, прежде чем быть повторно прочитанными тем же движком и вставленными в целевую таблицу GitHub нашим материализованным представлением!

Вы должны увидеть 100 дополнительных строк:

2. Использование материализованных представлений

Мы можем использовать материализованные представления для отправки сообщений в движок Kafka (и в тему), когда документы вставляются в таблицу. Когда строки вставляются в таблицу GitHub, материализованное представление срабатывает, что приводит к вставке строк обратно в движок Kafka и в новую тему. Опять же, это лучше всего проиллюстрировано:

Движок таблицы Kafka с вставками

Создайте новую тему Kafka github_out или аналогичную. Убедитесь, что движок таблицы Kafka github_out_queue указывает на эту тему.

Теперь создайте новое материализованное представление github_out_mv, чтобы указать на таблицу GitHub, вставляя строки в вышеуказанный движок, когда оно срабатывает. Добавление новых строк в таблицу GitHub приведет к тому, что они будут отправлены в нашу новую тему Kafka.

Если вы вставите в оригинальную тему github, созданную в рамках Kafka в ClickHouse, документы волшебным образом появятся в теме "github_clickhouse". Подтвердите это с помощью встроенных инструментов Kafka. Например, ниже мы вставляем 100 строк в тему github, используя kcat для темы, размещенной в Confluent Cloud:

Чтение по теме github_out должно подтвердить доставку сообщений.

Хотя это и сложный пример, он иллюстрирует мощь материализованных представлений при использовании в сочетании с движком Kafka.

Кластеры и производительность

Работа с кластерами ClickHouse

Через группы потребителей Kafka несколько экземпляров ClickHouse могут потенциально читать из одной и той же темы. Каждый потребитель будет назначен партиции темы в соотношении 1:1. При масштабировании потребления ClickHouse с помощью движка таблицы Kafka необходимо учитывать, что общее количество потребителей внутри кластера не может превышать количество партиций в теме. Поэтому убедитесь, что партиционирование заранее правильно настроено для темы.

Несколько экземпляров ClickHouse могут быть настроены для чтения из темы, используя один и тот же идентификатор группы потребителей - указанный во время создания движка таблицы Kafka. Таким образом, каждый экземпляр будет читать из одной или нескольких партиций, вставляя сегменты в свою локальную целевую таблицу. Эти целевые таблицы, в свою очередь, могут быть настроены на использование ReplicatedMergeTree для обработки дублирования данных. Этот подход позволяет масштабировать чтение из Kafka с кластером ClickHouse, при условии, что имеется достаточное количество партиций Kafka.

Движок таблицы Kafka с вставками

Настройка производительности

Обратите внимание на следующее, когда вы хотите увеличить производительность пропускной способности таблицы движка Kafka:

  • Производительность будет варьироваться в зависимости от размера сообщения, формата и типов целевых таблиц. 100k строк/сек на одном движке таблицы следует считать достижимым. По умолчанию сообщения читаются партиями, контролируемыми параметром kafka_max_block_size. По умолчанию этот параметр установлен на max_insert_block_size, по умолчанию равен 1,048,576. Если сообщения не чрезвычайно большие, этот параметр почти всегда следует увеличивать. Значения от 500k до 1M не являются редкостью. Проведите тестирование и оцените влияние на производительность пропускной способности.
  • Число потребителей для таблицы движка можно увеличить с помощью kafka_num_consumers. Однако, по умолчанию, вставки будут линейно выполняться в одном потоке, если kafka_thread_per_consumer не изменен с значения по умолчанию 1. Установите это значение в 1, чтобы гарантировать, что сбросы выполняются параллельно. Обратите внимание, что создание таблицы движка Kafka с N потребителями (и kafka_thread_per_consumer=1) логически эквивалентно созданию N движков Kafka, каждый из которых имеет материализованное представление и kafka_thread_per_consumer=0.
  • Увеличение числа потребителей не является бесплатной операцией. Каждый потребитель поддерживает свои собственные буферы и потоки, увеличивая накладные расходы на сервер. Обращайте внимание на накладные расходы потребителей и масштабируйте линейно сначала по вашему кластеру, если это возможно.
  • Если пропускная способность сообщений Kafka варьируется, а задержки допустимы, рассмотрите возможность увеличения stream_flush_interval_ms, чтобы гарантировать, что большие блоки сбрасываются.
  • background_message_broker_schedule_pool_size устанавливает количество потоков, выполняющих фоновые задачи. Эти потоки используются для потоковой передачи Kafka. Этот параметр применяется при старте сервера ClickHouse и не может быть изменен в пользовательской сессии, по умолчанию равен 16. Если в журналах наблюдаются таймауты, может быть целесообразно увеличить это значение.
  • Для общения с Kafka используется библиотека librdkafka, которая сама создает потоки. Большое количество таблиц Kafka или потребителей может привести к большому количеству переключений контекста. Либо распределите эту нагрузку по кластеру, только реплицируя целевые таблицы, если возможно, либо рассмотрите использование движка таблицы для чтения из нескольких тем - поддерживается список значений. Несколько материализованных представлений могут быть прочитаны из одной таблицы, каждое фильтруя данные от конкретной темы.

Любые изменения настроек должны быть протестированы. Мы рекомендуем отслеживать задержки потребителей Kafka, чтобы убедиться, что вы должным образом масштабированы.

Дополнительные настройки

Помимо обсужденных выше настроек, следующие могут представлять интерес:

  • Kafka_max_wait_ms - Время ожидания в миллисекундах для чтения сообщений из Kafka перед повторной попыткой. Устанавливается на уровне пользовательского профиля и по умолчанию составляет 5000.

Все настройки из основной библиотеки librdkafka также могут быть размещены в конфигурационных файлах ClickHouse внутри элемента kafka - имена настроек должны быть XML-элементами, с заменой точек на подчеркивания например.

Это экспертные настройки, и мы рекомендуем вам обратиться к документации Kafka для более глубокого объяснения.