Коннектор Confluent HTTP Sink
Коннектор HTTP Sink не зависит от формата данных, поэтому не требует схемы Kafka и при этом поддерживает специфичные для ClickHouse типы данных, такие как Map и Array. Эта дополнительная гибкость приводит к небольшому усложнению конфигурации.
Ниже описана простая установка, считывающая сообщения из одного топика Kafka и вставляющая строки в таблицу ClickHouse.
HTTP Connector распространяется по лицензии Confluent Enterprise.
Быстрый старт
1. Соберите параметры подключения
To connect to ClickHouse with HTTP(S) you need this information:
| Parameter(s) | Description |
|---|---|
HOST and PORT | Typically, the port is 8443 when using TLS or 8123 when not using TLS. |
DATABASE NAME | Out of the box, there is a database named default, use the name of the database that you want to connect to. |
USERNAME and PASSWORD | Out of the box, the username is default. Use the username appropriate for your use case. |
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

Choose HTTPS. Connection details are displayed in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
2. Запустите Kafka Connect и коннектор HTTP Sink
У вас есть два варианта:
-
Самостоятельное развертывание: Загрузите пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, приведённым здесь. Если вы используете метод установки через confluent-hub, ваши локальные файлы конфигурации будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Для этого требуется, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
В следующих примерах используется Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Перед проверкой соединения создадим тестовую таблицу в ClickHouse Cloud, которая будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте топик Kafka и экземпляр HTTP Sink Connector:

Настройте HTTP Sink Connector:
- Укажите имя созданного топика
- Authentication
HTTP Url— URL-адрес ClickHouse Cloud с указанным запросомINSERT<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Примечание: запрос должен быть закодирован.Endpoint Authentication type— BASICAuth username— имя пользователя ClickHouseAuth password— пароль ClickHouse
С этим HTTP Url легко допустить ошибку. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

- Configuration
Input Kafka record value formatзависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. В дальнейших настройках мы предполагаем форматJSON.- В разделе
advanced configurations:HTTP Request Method— установите POSTRequest Body Format— jsonBatch batch size— согласно рекомендациям ClickHouse установите значение не менее 1000.Batch json as array— trueRetry on HTTP codes— 400-500, но адаптируйте при необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Maximum Reties— значение по умолчанию (10) подходит, но вы можете скорректировать его для более надежных повторных попыток.

5. Тестирование подключения
Создайте сообщение в топике, настроенном для вашего HTTP Sink

и убедитесь, что созданное сообщение было записано в ваш экземпляр ClickHouse.
Устранение неполадок
HTTP Sink не объединяет сообщения в батчи
Коннектор HTTP Sink не объединяет запросы для сообщений, содержащих значения заголовков Kafka, которые различаются.
- Убедитесь, что все ваши записи в Kafka имеют один и тот же ключ.
- Когда вы добавляете параметры к URL HTTP API, каждая запись может приводить к формированию уникального URL-адреса. По этой причине пакетная отправка отключена при использовании дополнительных параметров URL.
400 bad request
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink завершает работу с ошибкой со следующим сообщением при вставке JSON-объекта в столбец типа String:
Установите настройку input_format_json_read_objects_as_strings=1 в URL как URL‑кодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузка набора данных GitHub (необязательно)
Обратите внимание, что в этом примере сохраняются поля типа Array набора данных Github. Мы предполагаем, что у вас есть пустой топик github в примерах и что вы используете kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям по настройке Connect в соответствии с типом вашей установки, обращая внимание на различия между автономным и распределённым кластерами. Если вы используете Confluent Cloud, вам подходит распределённая схема.
Наиболее важным параметром является http.api.url. HTTP‑интерфейс для ClickHouse требует, чтобы вы закодировали выражение INSERT как параметр в URL. Оно должно включать формат (в данном случае JSONEachRow) и целевую базу данных. Формат должен соответствовать формату данных в Kafka, которые будут преобразованы в строку в теле HTTP‑запроса. Эти параметры должны быть URL‑кодированы. Пример такого формата для набора данных Github (предполагая, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры относятся к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method- Установите в POST.retry.on.status.codes- Установите в 400–500 для повторных попыток при любых кодах ошибок. Уточните значение на основе ожидаемых ошибок в данных.request.body.format- В большинстве случаев это будет JSON.auth.type- Установите в BASIC, если вы используете базовую аутентификацию в ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.ssl.enabled- установите в true при использовании SSL.connection.user- имя пользователя для ClickHouse.connection.password- пароль для ClickHouse.batch.max.size- Количество строк, отправляемых в одном пакете. Убедитесь, что это значение достаточно велико. Согласно рекомендациям ClickHouse, значение 1000 следует считать минимальным.tasks.max- Коннектор HTTP Sink поддерживает выполнение одной или нескольких задач. Это можно использовать для повышения производительности. Совместно с размером пакета это ваши основные средства улучшения производительности.key.converter- задайте в соответствии с типами ваших ключей.value.converter- задайте исходя из типа данных в вашем топике. Для этих данных не требуется схема. Формат здесь должен быть согласован с FORMAT, указанным в параметреhttp.api.url. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку с помощью конвертера org.apache.kafka.connect.storage.StringConverter, однако это потребует от пользователя извлечения значения в операторе INSERT с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая конфигурацию прокси, повторные попытки и расширенный SSL, можно найти здесь.
Примеры конфигурационных файлов для тестовых данных GitHub можно найти здесь, при условии, что Kafka Connect запущен в автономном режиме (standalone), а Kafka размещена в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица создана. Ниже приведён пример минимального набора данных GitHub, использующего стандартный движок MergeTree.
3. Добавьте данные в Kafka
Отправьте сообщения в Kafka. Ниже мы используем kcat для отправки 10 000 сообщений.
Простое чтение целевой таблицы «Github» должно подтвердить, что данные были вставлены.