Перейти к основному содержанию
Перейти к основному содержанию

Коннектор Confluent HTTP Sink

Коннектор HTTP Sink не зависит от формата данных, поэтому не требует схемы Kafka и при этом поддерживает специфичные для ClickHouse типы данных, такие как Map и Array. Эта дополнительная гибкость приводит к небольшому усложнению конфигурации.

Ниже описана простая установка, считывающая сообщения из одного топика Kafka и вставляющая строки в таблицу ClickHouse.

Примечание

HTTP Connector распространяется по лицензии Confluent Enterprise.

Быстрый старт

1. Соберите параметры подключения

To connect to ClickHouse with HTTP(S) you need this information:

Parameter(s)Description
HOST and PORTTypically, the port is 8443 when using TLS or 8123 when not using TLS.
DATABASE NAMEOut of the box, there is a database named default, use the name of the database that you want to connect to.
USERNAME and PASSWORDOut of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS. Connection details are displayed in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

2. Запустите Kafka Connect и коннектор HTTP Sink

У вас есть два варианта:

  • Самостоятельное развертывание: Загрузите пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, приведённым здесь. Если вы используете метод установки через confluent-hub, ваши локальные файлы конфигурации будут обновлены.

  • Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Для этого требуется, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.

Примечание

В следующих примерах используется Confluent Cloud.

3. Создайте целевую таблицу в ClickHouse

Перед проверкой соединения создадим тестовую таблицу в ClickHouse Cloud, которая будет получать данные из Kafka:

CREATE TABLE default.my_table
(
    `side` String,
    `quantity` Int32,
    `symbol` String,
    `price` Int32,
    `account` String,
    `userid` String
)
ORDER BY tuple()

4. Настройте HTTP Sink

Создайте топик Kafka и экземпляр HTTP Sink Connector:

Интерфейс Confluent Cloud, показывающий, как создать коннектор HTTP Sink

Настройте HTTP Sink Connector:

  • Укажите имя созданного топика
  • Authentication
    • HTTP Url — URL-адрес ClickHouse Cloud с указанным запросом INSERT <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Примечание: запрос должен быть закодирован.
    • Endpoint Authentication type — BASIC
    • Auth username — имя пользователя ClickHouse
    • Auth password — пароль ClickHouse
Примечание

С этим HTTP Url легко допустить ошибку. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

Интерфейс Confluent Cloud, показывающий настройки аутентификации для коннектора HTTP Sink

  • Configuration
    • Input Kafka record value format зависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. В дальнейших настройках мы предполагаем формат JSON.
    • В разделе advanced configurations:
      • HTTP Request Method — установите POST
      • Request Body Format — json
      • Batch batch size — согласно рекомендациям ClickHouse установите значение не менее 1000.
      • Batch json as array — true
      • Retry on HTTP codes — 400-500, но адаптируйте при необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.
      • Maximum Reties — значение по умолчанию (10) подходит, но вы можете скорректировать его для более надежных повторных попыток.
Интерфейс Confluent Cloud, показывающий расширенные параметры конфигурации для коннектора HTTP Sink

5. Тестирование подключения

Создайте сообщение в топике, настроенном для вашего HTTP Sink

Интерфейс Confluent Cloud, показывающий, как создать тестовое сообщение в топике Kafka

и убедитесь, что созданное сообщение было записано в ваш экземпляр ClickHouse.

Устранение неполадок

HTTP Sink не объединяет сообщения в батчи

Из документации по Sink:

Коннектор HTTP Sink не объединяет запросы для сообщений, содержащих значения заголовков Kafka, которые различаются.

  1. Убедитесь, что все ваши записи в Kafka имеют один и тот же ключ.
  2. Когда вы добавляете параметры к URL HTTP API, каждая запись может приводить к формированию уникального URL-адреса. По этой причине пакетная отправка отключена при использовании дополнительных параметров URL.

400 bad request

CANNOT_PARSE_QUOTED_STRING

Если HTTP Sink завершает работу с ошибкой со следующим сообщением при вставке JSON-объекта в столбец типа String:

Код: 26. DB::ParsingException: Невозможно разобрать JSON строку: ожидается открывающая кавычка: (при чтении значения ключа key_name): При выполнении JSONEachRowRowInputFormat: (в строке 1). (CANNOT_PARSE_QUOTED_STRING)

Установите настройку input_format_json_read_objects_as_strings=1 в URL как URL‑кодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1

Загрузка набора данных GitHub (необязательно)

Обратите внимание, что в этом примере сохраняются поля типа Array набора данных Github. Мы предполагаем, что у вас есть пустой топик github в примерах и что вы используете kcat для вставки сообщений в Kafka.

1. Подготовьте конфигурацию

Следуйте этим инструкциям по настройке Connect в соответствии с типом вашей установки, обращая внимание на различия между автономным и распределённым кластерами. Если вы используете Confluent Cloud, вам подходит распределённая схема.

Наиболее важным параметром является http.api.url. HTTP‑интерфейс для ClickHouse требует, чтобы вы закодировали выражение INSERT как параметр в URL. Оно должно включать формат (в данном случае JSONEachRow) и целевую базу данных. Формат должен соответствовать формату данных в Kafka, которые будут преобразованы в строку в теле HTTP‑запроса. Эти параметры должны быть URL‑кодированы. Пример такого формата для набора данных Github (предполагая, что вы запускаете ClickHouse локально) показан ниже:

<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow

http://localhost:8123?query=INSERT%20INTO%20default.github%20FORMAT%20JSONEachRow

Следующие дополнительные параметры относятся к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:

  • request.method - Установите в POST.
  • retry.on.status.codes - Установите в 400–500 для повторных попыток при любых кодах ошибок. Уточните значение на основе ожидаемых ошибок в данных.
  • request.body.format - В большинстве случаев это будет JSON.
  • auth.type - Установите в BASIC, если вы используете базовую аутентификацию в ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.
  • ssl.enabled - установите в true при использовании SSL.
  • connection.user - имя пользователя для ClickHouse.
  • connection.password - пароль для ClickHouse.
  • batch.max.size - Количество строк, отправляемых в одном пакете. Убедитесь, что это значение достаточно велико. Согласно рекомендациям ClickHouse, значение 1000 следует считать минимальным.
  • tasks.max - Коннектор HTTP Sink поддерживает выполнение одной или нескольких задач. Это можно использовать для повышения производительности. Совместно с размером пакета это ваши основные средства улучшения производительности.
  • key.converter - задайте в соответствии с типами ваших ключей.
  • value.converter - задайте исходя из типа данных в вашем топике. Для этих данных не требуется схема. Формат здесь должен быть согласован с FORMAT, указанным в параметре http.api.url. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку с помощью конвертера org.apache.kafka.connect.storage.StringConverter, однако это потребует от пользователя извлечения значения в операторе INSERT с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.

Полный список настроек, включая конфигурацию прокси, повторные попытки и расширенный SSL, можно найти здесь.

Примеры конфигурационных файлов для тестовых данных GitHub можно найти здесь, при условии, что Kafka Connect запущен в автономном режиме (standalone), а Kafka размещена в Confluent Cloud.

2. Создайте таблицу ClickHouse

Убедитесь, что таблица создана. Ниже приведён пример минимального набора данных GitHub, использующего стандартный движок MergeTree.

CREATE TABLE github
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    labels Array(LowCardinality(String)),
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    assignees Array(LowCardinality(String)),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    requested_reviewers Array(LowCardinality(String)),
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

3. Добавьте данные в Kafka

Отправьте сообщения в Kafka. Ниже мы используем kcat для отправки 10 000 сообщений.

head -n 10000 github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username>  -X sasl.password=<password> -t github

Простое чтение целевой таблицы «Github» должно подтвердить, что данные были вставлены.

SELECT count() FROM default.github;

| count\(\) |
| :--- |
| 10000 |