Перейти к основному содержимому
Перейти к основному содержимому

ClickHouse Kafka Connect Sink

примечание

Если вам нужна помощь, пожалуйста, создайте проблему в репозитории или задайте вопрос в публичном Slack ClickHouse.

ClickHouse Kafka Connect Sink - это коннектор Kafka, который передает данные из темы Kafka в таблицу ClickHouse.

License

Коннектор Kafka Sink распространяется под Лицензией Apache 2.0

Requirements for the environment

В окружении должна быть установлена фреймворк Kafka Connect версии 2.7 или более поздней.

Version compatibility matrix

Версия ClickHouse Kafka ConnectВерсия ClickHouseKafka ConnectПлатформа Confluent
1.0.0> 23.3> 2.7> 6.1

Main Features

  • Поставляется с встроенной семантикой exactly-once. Она поддерживается новой функцией ядра ClickHouse, названной KeeperMap (используется как состояние хранения коннектора) и позволяет создать минималистичную архитектуру.
  • Поддержка сторонних хранилищ состояния: в данный момент по умолчанию используется In-memory, но может использовать KeeperMap (Redis будет добавлен в ближайшее время).
  • Интеграция с основным ядром: создано, поддерживается и обслуживается ClickHouse.
  • Непрерывное тестирование с ClickHouse Cloud.
  • Вставки данных с объявленной схемой и без схемы.
  • Поддержка всех типов данных ClickHouse.

Installation instructions

Gather your connection details

Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:

  • ХОСТ и ПОРТ: обычно порт 8443 при использовании TLS или 8123 при его отсутствии.

  • НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию есть база данных с именем default, используйте имя базы данных, к которой вы хотите подключиться.

  • ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя default. Используйте имя пользователя, подходящее для вашего случая использования.

Данные для вашей службы ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите службу, к которой вы будете подключаться, и нажмите Подключиться:

Кнопка подключения к службе ClickHouse Cloud

Выберите HTTPS, и детали будут доступны в примере команды curl.

Детали подключения ClickHouse Cloud через HTTPS

Если вы используете self-managed ClickHouse, детали подключения задаются вашим администратором ClickHouse.

General Installation Instructions

Коннектор распространяется в виде одного JAR-файла, содержащего все классы, необходимые для работы плагина.

Чтобы установить плагин, выполните следующие шаги:

  • Скачайте zip-архив, содержащий файл Connector JAR, со страницы Releases репозитория ClickHouse Kafka Connect Sink.
  • Извлеките содержимое ZIP-файла и скопируйте его в нужное место.
  • Добавьте путь с директорией плагина к настройке plugin.path в вашем файле свойств Connect, чтобы позволить платформе Confluent найти плагин.
  • Укажите имя темы, имя хоста экземпляра ClickHouse и пароль в конфигурации.
  • Перезапустите платформу Confluent.
  • Если вы используете платформу Confluent, войдите в интерфейс управления Confluent Control Center, чтобы убедиться, что ClickHouse Sink доступен в списке доступных коннекторов.

Configuration options

Чтобы подключить ClickHouse Sink к серверу ClickHouse, вам необходимо предоставить:

  • данные подключения: имя хоста (обязательно) и порт (необязательно)
  • учетные данные пользователя: пароль (обязательно) и имя пользователя (необязательно)
  • класс коннектора: com.clickhouse.kafka.connect.ClickHouseSinkConnector (обязательно)
  • topics или topics.regex: темы Kafka для отслеживания - имена тем должны соответствовать именам таблиц (обязательно)
  • конвертеры ключа и значения: настраиваются в зависимости от типа данных в вашей теме. Обязательно, если еще не определены в конфигурации рабочего процесса.

Полная таблица параметров конфигурации:

Название параметраОписаниеЗначение по умолчанию
hostname (обязательно)Имя хоста или IP-адрес сервераN/A
portПорт ClickHouse - по умолчанию 8443 (для HTTPS в облаке), но для HTTP (по умолчанию для собственных серверов) он должен составлять 81238443
sslВключить ssl-соединение с ClickHousetrue
jdbcConnectionPropertiesСвойства подключения при подключении к ClickHouse. Должен начинаться с ? и соединяться с помощью & между param=value""
usernameИмя пользователя базы данных ClickHousedefault
password (обязательно)Пароль базы данных ClickHouseN/A
databaseИмя базы данных ClickHousedefault
connector.class (обязательно)Класс коннектора (в явном виде установлен и сохранен как значение по умолчанию)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxКоличество задач коннектора"1"
errors.retry.timeoutТаймаут повторных попыток ClickHouse JDBC"60"
exactlyOnceВключение exactly Once"false"
topics (обязательно)Темы Kafka для отслеживания - имена тем должны соответствовать именам таблиц""
key.converter (обязательно* - см. Описание)Установить в зависимости от типов ваших ключей. Обязательно здесь, если вы передаете ключи (и не определены в конфигурации рабочего процесса)."org.apache.kafka.connect.storage.StringConverter"
value.converter (обязательно* - см. Описание)Установить в зависимости от типа данных в вашей теме. Поддерживаются: - JSON, String, Avro или Protobuf форматы. Обязательно здесь, если не определено в конфигурации рабочего процесса."org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableПоддержка схемы конвертера значения"false"
errors.toleranceОшибка толерантности коннектора. Поддерживаемые: none, all"none"
errors.deadletterqueue.topic.nameЕсли установлено (с errors.tolerance=all), будет использоваться DLQ для неудачных пакетов (см. Устранение неполадок)""
errors.deadletterqueue.context.headers.enableДобавляет дополнительные заголовки для DLQ""
clickhouseSettingsСписок настроек ClickHouse, разделенных запятой (например, "insert_quorum=2 и т. д...")""
topic2TableMapСписок, разделенный запятыми, который сопоставляет имена тем имена таблиц (например, "topic1=table1, topic2=table2 и т. д...")""
tableRefreshIntervalВремя (в секундах) для обновления кеша определения таблицы0
keeperOnClusterПозволяет конфигурировать параметр ON CLUSTER для собственных экземпляров (например, ON CLUSTER имясообщениевфайлеопределения) для таблицы connect_state exactly-once (см. Распределенные DDL Запросы""
bypassRowBinaryПозволяет отключить использование RowBinary и RowBinaryWithDefaults для данных на основе схемы (Avro, Protobuf и т. д.) - должен использоваться только в случае, если данные будут содержать пропущенные колонки, и Nullable/Default неприемлемы"false"
dateTimeFormatsФорматы даты и времени для анализа полей схемы DateTime64, разделенные ; (например, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss).""
tolerateStateMismatchПозволяет коннектору игнорировать записи "раньше", чем текущее смещение, хранящееся AFTER_PROCESSING (например, если смещение 5 отправляется, а последнее записанное смещение составляет 250)"false"

Target Tables

ClickHouse Connect Sink читает сообщения из тем Kafka и пишет их в соответствующие таблицы. ClickHouse Connect Sink записывает данные в существующие таблицы. Пожалуйста, убедитесь, что целевая таблица с соответствующей схемой была создана в ClickHouse перед началом вставки данных в нее.

Каждая тема требует выделенной целевой таблицы в ClickHouse. Имя целевой таблицы должно совпадать с именем источника темы.

Pre-processing

Если вам необходимо преобразовать исходящие сообщения перед их отправкой в ClickHouse Kafka Connect Sink, используйте Kafka Connect Transformations.

Supported Data types

С объявленной схемой:

Тип Kafka ConnectТип ClickHouseПоддерживаетсяПримитив
STRINGStringДа
INT8Int8Да
INT16Int16Да
INT32Int32Да
INT64Int64Да
FLOAT32Float32Да
FLOAT64Float64Да
BOOLEANBooleanДа
ARRAYArray(T)Нет
MAPMap(Primitive, T)Нет
STRUCTVariant(T1, T2, …)Нет
STRUCTTuple(a T1, b T2, …)Нет
STRUCTNested(a T1, b T2, …)Нет
BYTESStringНет
org.apache.kafka.connect.data.TimeInt64 / DateTime64Нет
org.apache.kafka.connect.data.TimestampInt32 / Date32Нет
org.apache.kafka.connect.data.DecimalDecimalНет

Без объявленной схемы:

Запись преобразуется в JSON и отправляется в ClickHouse как значение в формате JSONEachRow.

Configuration Recipes

Вот некоторые общие рецепты конфигурации, которые помогут вам быстро начать.

Basic Configuration

Самая базовая конфигурация, чтобы начать - предполагается, что вы запускаете Kafka Connect в распределенном режиме и у вас запущен сервер ClickHouse на localhost:8443 с включенным SSL, данные находятся в формате JSON без схемы.

Basic Configuration with Multiple Topics

Коннектор может потреблять данные из нескольких тем.

Basic Configuration with DLQ

Using with different data formats

Avro Schema Support
Protobuf Schema Support

Пожалуйста, обратите внимание: если вы столкнетесь с проблемами отсутствующих классов, не каждое окружение поставляется с конвертером protobuf, и вам может понадобиться альтернативная версия jar с зависимостями.

JSON Schema Support
String Support

Коннектор поддерживает конвертер строк в разных форматах ClickHouse: JSON, CSV и TSV.

Logging

Логирование автоматически предоставляется платформой Kafka Connect. Место назначения и формат журналирования могут быть настроены через файл конфигурации Kafka connect.

Если вы используете платформу Confluent, логи можно увидеть с помощью выполнения команды CLI:

Для дополнительных подробностей ознакомьтесь с официальным учебником.

Monitoring

ClickHouse Kafka Connect сообщает о метриках времени выполнения через Java Management Extensions (JMX). JMX включен в Kafka Connector по умолчанию.

Имя MBeanName ClickHouse Connect:

ClickHouse Kafka Connect сообщает о следующих метриках:

НазваниеТипОписание
receivedRecordslongОбщее количество полученных записей.
recordProcessingTimelongОбщее время в наносекундах, затраченное на группировку и преобразование записей в единую структуру.
taskProcessingTimelongОбщее время в наносекундах, затраченное на обработку и вставку данных в ClickHouse.

Limitations

  • Удаления не поддерживаются.
  • Размер пакета наследуется от свойств потребителя Kafka.
  • При использовании KeeperMap для exactly-once и изменении или обратной перемотке смещения вам нужно удалить содержимое из KeeperMap для данной темы. (См. руководство по устранению неполадок ниже для получения более подробной информации)

Tuning Performance

Если вы когда-либо думали: "Я хотел бы настроить размер пакета для коннектора sink", то этот раздел для вас.

Connect Fetch vs Connector Poll

Kafka Connect (фреймворк, на котором построен наш коннектор sink) будет получать сообщения из тем Kafka в фоновом режиме (независимо от коннектора).

Вы можете управлять этим процессом, используя fetch.min.bytes и fetch.max.bytes - в то время как fetch.min.bytes задает минимальное количество, необходимое перед тем, как фреймворк передаст значения коннектору (в пределах временного лимита, установленного fetch.max.wait.ms), fetch.max.bytes устанавливает верхний размерный лимит. Если вы хотите передавать более крупные пакеты коннектору, одним из вариантов может быть увеличение минимального получения или максимального ожидания для создания более крупных пакетов данных.

Эти полученные данные затем обрабатываются клиентом коннектора, опрашивающим сообщения, где количество для каждого опроса контролируется max.poll.records - имейте в виду, что получение является независимым от опроса!

При настройке этих параметров пользователи должны стремиться к тому, чтобы их размер получения производил несколько пакетов max.poll.records (и помнить, что параметры fetch.min.bytes и fetch.max.bytes представляют собой сжатые данные) - таким образом, каждая задача коннектора вставляет как можно больший пакет.

ClickHouse оптимизирован для более крупных пакетов, даже с небольшими задержками, а не для частых, но меньших пакетов - чем больше пакет, тем лучше.

Больше деталей можно найти в документации Confluent или в документации Kafka.

Multiple high throughput topics

Если ваш коннектор настроен на подписку на несколько тем, вы используете topics2TableMap для сопоставления тем с таблицами, и вы испытываете узкое место при вставке, что приводит к задержке потребителя, подумайте о создании одного коннектора для каждой темы вместо этого. Основная причина, по которой это происходит, заключается в том, что в данный момент пакеты вставляются в каждую таблицу последовательно.

Создание одного коннектора для каждой темы является обходным решением, которое гарантирует, что вы получите максимально возможную скорость вставки.

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

Это происходит, когда смещение, хранящееся в KeeperMap, отличается от смещения, хранящегося в Kafka, обычно когда тема была удалена или смещение было настроено вручную. Чтобы исправить это, вам нужно удалить старые значения, хранящиеся для данной темы + раздела.

ПРИМЕЧАНИЕ: Эта корректировка может иметь последствия в отношении exactly-once.

"What errors will the connector retry?"

В данный момент внимание сосредоточено на выявлении ошибок, которые являются временными и могут быть повторены, включая:

  • ClickHouseException - это общее исключение, которое может быть выброшено ClickHouse. Обычно оно выбрасывается, когда сервер перегружен, и следующие коды ошибок считаются особенно временными:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - Это исключение выбрасывается, когда истекает время ожидания сокета.
  • UnknownHostException - Это исключение выбрасывается, когда хост не может быть разрешен.
  • IOException - Это исключение выбрасывается, когда возникает проблема с сетью.

"All my data is blank/zeroes"

Вероятно, поля в ваших данных не совпадают с полями в таблице - это особенно часто встречается с CDC (и форматом Debezium). Одно из распространенных решений - добавить преобразование flatten в конфигурацию вашего коннектора:

Это преобразует ваши данные из вложенного JSON в плоский JSON (используя _ в качестве разделителя). Поля в таблице будут следовать формату "field1_field2_field3" (например, "before_id", "after_id" и т. д.).

"I want to use my Kafka keys in ClickHouse"

Ключи Kafka не хранятся в поле значения по умолчанию, но вы можете использовать преобразование KeyToValue, чтобы переместить ключ в поле значения (под именем нового поля _key):