Confluent HTTP Sink Connector
HTTP Sink Connector не зависит от типа данных и, соответственно, не требует схемы Kafka, поддерживая при этом специфические для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость подразумевает небольшое усложнение конфигурации.
Ниже мы описываем простую установку, извлекая сообщения из одной темы Kafka и вставляя строки в таблицу ClickHouse.
HTTP Connector распространяется по лицензии Confluent Enterprise License.
Шаги быстрого старта
1. Соберите ваши данные о подключении
Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:
-
ХОСТ и ПОРТ: обычно порт 8443 при использовании TLS или 8123 при его отсутствии.
-
НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию есть база данных с именем
default
, используйте имя базы данных, к которой вы хотите подключиться. -
ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя
default
. Используйте имя пользователя, подходящее для вашего случая использования.
Данные для вашей службы ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите службу, к которой вы будете подключаться, и нажмите Подключиться:

Выберите HTTPS, и детали будут доступны в примере команды curl
.

Если вы используете self-managed ClickHouse, детали подключения задаются вашим администратором ClickHouse.
2. Запустите Kafka Connect и HTTP Sink Connector
У вас есть два варианта:
-
Self-managed: Скачайте пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, как указано здесь. Если вы используете метод установки confluent-hub, ваши локальные конфигурационные файлы будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для размещения Kafka. Это требует, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
В следующих примерах используется Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Прежде чем проводить тест на подключение, начнем с создания тестовой таблицы в ClickHouse Cloud, которая будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте тему Kafka и экземпляр HTTP Sink Connector:

Настройте HTTP Sink Connector:
- Укажите имя темы, которую вы создали
- Аутентификация
HTTP Url
- URL ClickHouse Cloud с указанием запросаINSERT
<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
. Примечание: запрос должен быть закодирован.Endpoint Authentication type
- BASICAuth username
- Имя пользователя ClickHouseAuth password
- Пароль ClickHouse
Этот HTTP Url подвержен ошибкам. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

- Конфигурация
Input Kafka record value format
зависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. Мы предполагаемJSON
в следующих настройках.- В разделе
advanced configurations
:HTTP Request Method
- Установите на POSTRequest Body Format
- jsonBatch batch size
- В соответствии с рекомендациями ClickHouse, установите это значение на по меньшей мере 1000.Batch json as array
- trueRetry on HTTP codes
- 400-500, но адаптируйте по мере необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Maximum Reties
- значение по умолчанию (10) подходит, но вы можете скорректировать для более надежного повторного запроса.

5. Тестирование подключения
Создайте сообщение в теме, настроенной вашим HTTP Sink

и убедитесь, что созданное сообщение было записано в вашу инстанцию ClickHouse.
Устранение неполадок
HTTP Sink не выполняет пакетную обработку сообщений
HTTP Sink connector не объединяет запросы для сообщений, содержащих различные значения заголовков Kafka.
- Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
- Когда вы добавляете параметры к HTTP API URL, каждая запись может привести к уникальному URL. По этой причине пакетная обработка отключена при использовании дополнительных URL-параметров.
400 Bad Request
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink терпит неудачу с сообщением при вставке JSON-объекта в String
колонку:
Установите параметр input_format_json_read_objects_as_strings=1
в URL в виде закодированной строки SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузите набор данных GitHub (по желанию)
Обратите внимание, что этот пример сохраняет поля Array набора данных GitHub. Мы предполагаем, что у вас есть пустая тема github в примерах и используем kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям для настройки Connect, относящейся к вашему типу установки,обращая внимание на отличия между автономным и распределенным кластером. Если вы используете Confluent Cloud, распределенная настройка актуальна.
Наиболее важный параметр - это http.api.url
. HTTP-интерфейс для ClickHouse требует, чтобы вы кодировали запрос INSERT как параметр в URL. Это должно включать формат (в данном случае JSONEachRow
) и целевую базу данных. Формат должен соответствовать данным Kafka, которые будут преобразованы в строку в HTTP-теле. Эти параметры должны быть URL-экранированы. Пример такого формата для набора данных GitHub (предполагая, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры имеют значение при использовании HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method
- Установите на POSTretry.on.status.codes
- Установите на 400-500 для повторной попытки при любых кодах ошибок. Уточните в зависимости от ожидаемых ошибок в данных.request.body.format
- В большинстве случаев это будет JSON.auth.type
- Установите на BASIC, если вы используете безопасность с ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.ssl.enabled
- установите на true, если используете SSL.connection.user
- имя пользователя для ClickHouse.connection.password
- пароль для ClickHouse.batch.max.size
- Количество строк для отправки в одном пакете. Убедитесь, что это установлено на достаточное большое число. В соответствии с рекомендациями ClickHouse рекомендации значение 1000 следует считать минимальным.tasks.max
- HTTP Sink connector поддерживает выполнение одной или нескольких задач. Это может использоваться для увеличения производительности. Вместе с размером пакета это представляет собой ваш основной способ улучшения производительности.key.converter
- установите в зависимости от типов ваших ключей.value.converter
- установите в зависимости от типа данных в вашей теме. Эти данные не требуют схемы. Формат здесь должен соответствовать FORMAT, указанному в параметреhttp.api.url
. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно обращаться с значением как со строкой через конвертер org.apache.kafka.connect.storage.StringConverter - хотя это потребует от пользователя извлечения значения в выражении вставки с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая то, как настроить прокси, повторные попытки и расширенный SSL, можно найти здесь.
Примеры конфигурационных файлов для данных выборки GitHub можно найти здесь, при условии, что Connect работает в автономном режиме, а Kafka размещена в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица создана. Пример для минимального набора данных GitHub с использованием стандартного MergeTree показан ниже.
3. Добавьте данные в Kafka
Вставьте сообщения в Kafka. Ниже мы используем kcat для вставки 10 тысяч сообщений.
Простое чтение из целевой таблицы "Github" должно подтвердить вставку данных.