跳到主要内容
跳到主要内容

Confluent HTTP Sink Connector

HTTP Sink Connector 是数据类型无关的,因此不需要 Kafka 模式,并且支持 ClickHouse 特有的数据类型,如 Maps 和 Arrays。这种额外的灵活性带来了轻微的配置复杂性增加。

下面我们将描述一个简单的安装,拉取来自单个 Kafka 主题的消息并插入行到 ClickHouse 表中。

备注

HTTP Connector 根据 Confluent Enterprise License 分发。

快速启动步骤

1. 收集连接详细信息

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

2. 运行 Kafka Connect 和 HTTP Sink Connector

你有两个选择:

  • 自管理: 下载 Confluent 包并在本地安装。按照 这里 文档中的安装指南进行操作。如果你使用 confluent-hub 安装方法,你的本地配置文件将会更新。

  • Confluent Cloud: 对于使用 Confluent Cloud 进行 Kafka 托管的用户,HTTP Sink 的完全托管版本可供使用。这要求你的 ClickHouse 环境能够从 Confluent Cloud 访问。

备注

下面的示例使用的是 Confluent Cloud。

3. 在 ClickHouse 中创建目标表

在连接性测试之前,让我们先在 ClickHouse Cloud 中创建一个测试表,该表将接收来自 Kafka 的数据:

4. 配置 HTTP Sink

创建 Kafka 主题和 HTTP Sink Connector 实例:


配置 HTTP Sink Connector:

  • 提供你创建的主题名称
  • 认证
    • HTTP Url - ClickHouse Cloud URL,指定一个 INSERT 查询 <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow注意:查询必须被编码。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouse 用户名
    • Auth password - ClickHouse 密码
备注

这个 HTTP Url 易出错。确保转义精确,以避免问题。


  • 配置
    • Input Kafka record value format - 根据你的源数据而定,但大多数情况下为 JSON 或 Avro。我们假设以下设置为 JSON
    • advanced configurations 部分:
      • HTTP Request Method - 设置为 POST
      • Request Body Format - json
      • Batch batch size - 根据 ClickHouse 推荐,将其设置为 至少 1000
      • Batch json as array - true
      • Retry on HTTP codes - 400-500,但根据需要进行调整,例如如果你在 ClickHouse 前面有 HTTP 代理,这可能会改变。
      • Maximum Reties - 默认值(10)是合适的,但可以根据需要调整以实现更强大的重试。

5. 测试连接性

在你配置的 HTTP Sink 的主题中创建一条消息


并验证创建的消息已写入你的 ClickHouse 实例。

故障排除

HTTP Sink 没有批处理消息

来自 Sink documentation:

如果包含不同的 Kafka 头值,HTTP Sink connector 不会批处理请求。

  1. 验证你的 Kafka 记录是否具有相同的键。
  2. 当你向 HTTP API URL 添加参数时,每个记录可能导致唯一的 URL。因此,当使用额外的 URL 参数时,批处理被禁用。

400 Bad Request

CANNOT_PARSE_QUOTED_STRING

如果 HTTP Sink 在将 JSON 对象插入 String 列时失败,并出现以下消息:

在 URL 中设置 input_format_json_read_objects_as_strings=1 设置,作为编码字符串 SETTINGS%20input_format_json_read_objects_as_strings%3D1

加载 GitHub 数据集(可选)

请注意,示例保留了 GitHub 数据集的 Array 字段。我们假设你在示例中有一个空的 github 主题,并使用 kcat 向 Kafka 插入消息。

1. 准备配置

按照 这些说明 设置与您的安装类型相关的 Connect,并注意独立模式和分布式集群之间的差异。如果使用 Confluent Cloud,分布式设置是相关的。

最重要的参数是 http.api.url。ClickHouse 的 HTTP 接口 要求你将 INSERT 语句作为 URL 中的参数进行编码。这必须包括格式(在本例中为 JSONEachRow)和目标数据库。格式必须与 Kafka 数据一致,该数据将在 HTTP 有效负载中转换为字符串。这些参数必须进行 URL 转义。以下是 GitHub 数据集的示例格式(假设你在本地运行 ClickHouse):

以下附加参数与 ClickHouse 的 HTTP Sink 相关。完整参数列表可以在 这里 找到:

  • request.method - 设置为 POST
  • retry.on.status.codes - 设置为 400-500 以在任何错误代码上重试。根据数据中的预期错误调整。
  • request.body.format - 在大多数情况下,这将是 JSON。
  • auth.type - 如果你使用 ClickHouse 的安全性,设置为 BASIC。目前不支持其他 ClickHouse 兼容的认证机制。
  • ssl.enabled - 如果使用 SSL,则设置为 true。
  • connection.user - ClickHouse 的用户名。
  • connection.password - ClickHouse 的密码。
  • batch.max.size - 在一次批处理中发送的行数。确保设置为适当大的数字。根据 ClickHouse 的 建议,建议使用 1000 作为最小值。
  • tasks.max - HTTP Sink connector 支持运行一个或多个任务。这可以用于提高性能。与批处理大小一起,这是提高性能的主要手段。
  • key.converter - 根据你的键的类型设置。
  • value.converter - 根据主题中的数据类型设置。这些数据不需要模式。这里的格式必须与参数 http.api.url 中指定的 FORMAT 一致。使用 JSON 和 org.apache.kafka.connect.json.JsonConverter 转换器是最简单的。通过转换器 org.apache.kafka.connect.storage.StringConverter 将值视为字符串也是可能的 - 尽管这将要求用户在插入语句中使用函数提取值。如果使用 io.confluent.connect.avro.AvroConverter 转换器,ClickHouse也支持 Avro格式

完整的设置列表,包括如何配置代理、重试和高级 SSL,可以在 这里 找到。

GitHub 示例数据的完整配置文件可以在 这里 找到,假设 Connect 在独立模式下运行,且 Kafka 托管在 Confluent Cloud。

2. 创建 ClickHouse 表

确保已创建该表。下面是使用标准 MergeTree 的最小 GitHub 数据集的示例。

3. 向 Kafka 添加数据

向 Kafka 插入消息。下面我们使用 kcat 插入 10,000 条消息。

简单读取目标表 "Github" 应确认数据已插入。