Перейти к основному содержимому
Перейти к основному содержимому

Confluent HTTP Sink Connector

HTTP Sink Connector не зависит от типа данных и, соответственно, не требует схемы Kafka, поддерживая при этом специфические для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость подразумевает небольшое усложнение конфигурации.

Ниже мы описываем простую установку, извлекая сообщения из одной темы Kafka и вставляя строки в таблицу ClickHouse.

примечание

HTTP Connector распространяется по лицензии Confluent Enterprise License.

Шаги быстрого старта

1. Соберите ваши данные о подключении

Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:

  • ХОСТ и ПОРТ: обычно порт 8443 при использовании TLS или 8123 при его отсутствии.

  • НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию есть база данных с именем default, используйте имя базы данных, к которой вы хотите подключиться.

  • ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя default. Используйте имя пользователя, подходящее для вашего случая использования.

Данные для вашей службы ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите службу, к которой вы будете подключаться, и нажмите Подключиться:

Кнопка подключения к службе ClickHouse Cloud

Выберите HTTPS, и детали будут доступны в примере команды curl.

Детали подключения ClickHouse Cloud через HTTPS

Если вы используете self-managed ClickHouse, детали подключения задаются вашим администратором ClickHouse.

2. Запустите Kafka Connect и HTTP Sink Connector

У вас есть два варианта:

  • Self-managed: Скачайте пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, как указано здесь. Если вы используете метод установки confluent-hub, ваши локальные конфигурационные файлы будут обновлены.

  • Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для размещения Kafka. Это требует, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.

примечание

В следующих примерах используется Confluent Cloud.

3. Создайте целевую таблицу в ClickHouse

Прежде чем проводить тест на подключение, начнем с создания тестовой таблицы в ClickHouse Cloud, которая будет получать данные из Kafka:

4. Настройте HTTP Sink

Создайте тему Kafka и экземпляр HTTP Sink Connector:

Создать HTTP Sink

Настройте HTTP Sink Connector:

  • Укажите имя темы, которую вы создали
  • Аутентификация
    • HTTP Url - URL ClickHouse Cloud с указанием запроса INSERT <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Примечание: запрос должен быть закодирован.
    • Endpoint Authentication type - BASIC
    • Auth username - Имя пользователя ClickHouse
    • Auth password - Пароль ClickHouse
примечание

Этот HTTP Url подвержен ошибкам. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

Опции аутентификации для Confluent HTTP Sink
  • Конфигурация
    • Input Kafka record value format зависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. Мы предполагаем JSON в следующих настройках.
    • В разделе advanced configurations:
      • HTTP Request Method - Установите на POST
      • Request Body Format - json
      • Batch batch size - В соответствии с рекомендациями ClickHouse, установите это значение на по меньшей мере 1000.
      • Batch json as array - true
      • Retry on HTTP codes - 400-500, но адаптируйте по мере необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.
      • Maximum Reties - значение по умолчанию (10) подходит, но вы можете скорректировать для более надежного повторного запроса.
Расширенные опции для Confluent HTTP Sink

5. Тестирование подключения

Создайте сообщение в теме, настроенной вашим HTTP Sink

Создать сообщение в теме

и убедитесь, что созданное сообщение было записано в вашу инстанцию ClickHouse.

Устранение неполадок

HTTP Sink не выполняет пакетную обработку сообщений

Из документации Sink:

HTTP Sink connector не объединяет запросы для сообщений, содержащих различные значения заголовков Kafka.

  1. Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
  2. Когда вы добавляете параметры к HTTP API URL, каждая запись может привести к уникальному URL. По этой причине пакетная обработка отключена при использовании дополнительных URL-параметров.

400 Bad Request

CANNOT_PARSE_QUOTED_STRING

Если HTTP Sink терпит неудачу с сообщением при вставке JSON-объекта в String колонку:

Установите параметр input_format_json_read_objects_as_strings=1 в URL в виде закодированной строки SETTINGS%20input_format_json_read_objects_as_strings%3D1

Загрузите набор данных GitHub (по желанию)

Обратите внимание, что этот пример сохраняет поля Array набора данных GitHub. Мы предполагаем, что у вас есть пустая тема github в примерах и используем kcat для вставки сообщений в Kafka.

1. Подготовьте конфигурацию

Следуйте этим инструкциям для настройки Connect, относящейся к вашему типу установки,обращая внимание на отличия между автономным и распределенным кластером. Если вы используете Confluent Cloud, распределенная настройка актуальна.

Наиболее важный параметр - это http.api.url. HTTP-интерфейс для ClickHouse требует, чтобы вы кодировали запрос INSERT как параметр в URL. Это должно включать формат (в данном случае JSONEachRow) и целевую базу данных. Формат должен соответствовать данным Kafka, которые будут преобразованы в строку в HTTP-теле. Эти параметры должны быть URL-экранированы. Пример такого формата для набора данных GitHub (предполагая, что вы запускаете ClickHouse локально) показан ниже:

Следующие дополнительные параметры имеют значение при использовании HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:

  • request.method - Установите на POST
  • retry.on.status.codes - Установите на 400-500 для повторной попытки при любых кодах ошибок. Уточните в зависимости от ожидаемых ошибок в данных.
  • request.body.format - В большинстве случаев это будет JSON.
  • auth.type - Установите на BASIC, если вы используете безопасность с ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.
  • ssl.enabled - установите на true, если используете SSL.
  • connection.user - имя пользователя для ClickHouse.
  • connection.password - пароль для ClickHouse.
  • batch.max.size - Количество строк для отправки в одном пакете. Убедитесь, что это установлено на достаточное большое число. В соответствии с рекомендациями ClickHouse рекомендации значение 1000 следует считать минимальным.
  • tasks.max - HTTP Sink connector поддерживает выполнение одной или нескольких задач. Это может использоваться для увеличения производительности. Вместе с размером пакета это представляет собой ваш основной способ улучшения производительности.
  • key.converter - установите в зависимости от типов ваших ключей.
  • value.converter - установите в зависимости от типа данных в вашей теме. Эти данные не требуют схемы. Формат здесь должен соответствовать FORMAT, указанному в параметре http.api.url. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно обращаться с значением как со строкой через конвертер org.apache.kafka.connect.storage.StringConverter - хотя это потребует от пользователя извлечения значения в выражении вставки с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.

Полный список настроек, включая то, как настроить прокси, повторные попытки и расширенный SSL, можно найти здесь.

Примеры конфигурационных файлов для данных выборки GitHub можно найти здесь, при условии, что Connect работает в автономном режиме, а Kafka размещена в Confluent Cloud.

2. Создайте таблицу ClickHouse

Убедитесь, что таблица создана. Пример для минимального набора данных GitHub с использованием стандартного MergeTree показан ниже.

3. Добавьте данные в Kafka

Вставьте сообщения в Kafka. Ниже мы используем kcat для вставки 10 тысяч сообщений.

Простое чтение из целевой таблицы "Github" должно подтвердить вставку данных.