Confluent HTTP Sink 连接器
HTTP Sink 连接器与数据类型无关,因此不需要Kafka模式,并且支持 ClickHouse 特有的数据类型,如 Maps 和 Arrays。这种额外的灵活性导致配置复杂性稍微增加。
下面我们描述了一个简单的安装过程,从一个 Kafka 主题提取消息并将行插入到 ClickHouse 表中。
HTTP 连接器基于 Confluent 企业许可证 分发。
快速开始步骤
1. 收集连接详细信息
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
-
主机和端口:通常,当使用 TLS 时端口为 8443,当不使用 TLS 时端口为 8123。
-
数据库名称:开箱即用时,有一个名为
default
的数据库,请使用您要连接的数据库名称。 -
用户名和密码:开箱即用时,用户名为
default
。请使用适合您用例的用户名。
您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中获得。 选择您要连接的服务并点击 连接:

选择 HTTPS,详细信息可在示例的 curl
命令中获得。

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。
2. 运行 Kafka Connect 和 HTTP Sink 连接器
您有两个选项:
-
自管理: 下载 Confluent 包并在本地安装。按照 此处 记录的安装说明安装连接器。 如果您使用 confluent-hub 安装方法,您的本地配置文件将会更新。
-
Confluent Cloud: 对于使用 Confluent Cloud 进行 Kafka 托管的用户,提供 HTTP Sink 的完全托管版本。这要求您的 ClickHouse 环境可以从 Confluent Cloud 访问。
以下示例使用 Confluent Cloud。
3. 在 ClickHouse 中创建目标表
在连接性测试之前,首先在 ClickHouse Cloud 中创建一个测试表,该表将接收来自 Kafka 的数据:
4. 配置 HTTP Sink
创建一个 Kafka 主题和一个 HTTP Sink 连接器实例:

配置 HTTP Sink 连接器:
- 提供您创建的主题名称
- 身份验证
HTTP Url
- ClickHouse Cloud URL,指定INSERT
查询<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
。 注意:查询必须被编码。Endpoint Authentication type
- BASICAuth username
- ClickHouse 用户名Auth password
- ClickHouse 密码
这个 HTTP Url 很容易出错。确保转义准确,以避免出现问题。

- 配置
Input Kafka record value format
取决于您的源数据,但在大多数情况下为 JSON 或 Avro。我们假设在以下设置中为JSON
。- 在
advanced configurations
部分:HTTP Request Method
- 设置为 POSTRequest Body Format
- jsonBatch batch size
- 根据 ClickHouse 的建议,将此设置为 至少 1000。Batch json as array
- trueRetry on HTTP codes
- 400-500,但根据需要进行调整,例如,如果您在 ClickHouse 前面有 HTTP 代理,这可能会有所更改。Maximum Reties
- 默认值(10)是合适的,但可以根据需要进行调整以实现更强大的重试。

5. 测试连接性
在由您的 HTTP Sink 配置的主题中创建一条消息

并验证已创建的消息是否已写入您的 ClickHouse 实例。
故障排除
HTTP Sink 不会对消息进行批处理
来自 Sink 文档:
HTTP Sink 连接器不会对包含不同 Kafka 头值的消息进行批处理。
- 验证您的 Kafka 记录是否具有相同的键。
- 当您向 HTTP API URL 添加参数时,每条记录可能导致一个唯一的 URL。因此,当使用附加 URL 参数时,禁用批处理。
400 错误请求
CANNOT_PARSE_QUOTED_STRING
如果在将 JSON 对象插入到 String
列时,HTTP Sink 失败并出现以下消息:
在 URL 中设置 input_format_json_read_objects_as_strings=1
设置,作为编码字符串 SETTINGS%20input_format_json_read_objects_as_strings%3D1
加载 GitHub 数据集(可选)
请注意,此示例保留 Github 数据集的 Array 字段。我们假设您在示例中有一个空的 github 主题,并使用 kcat 将消息插入到 Kafka。
1. 准备配置
请按照 这些说明 设置与您的安装类型相关的 Connect,注意独立和分布式集群之间的差异。如果使用 Confluent Cloud,则相关的是分布式设置。
最重要的参数是 http.api.url
。ClickHouse 的 HTTP 接口 要求您将 INSERT 语句作为 URL 中的参数进行编码。这必须包括格式(在此案例中为 JSONEachRow
)和目标数据库。格式必须与 Kafka 数据一致,该数据将在 HTTP 负载中转换为字符串。这些参数必须进行 URL 转义。以下是 Github 数据集的这种格式示例(假设您在本地运行 ClickHouse):
使用 HTTP Sink 访问 ClickHouse 相关的其他参数如下。完整参数列表可以在 此处 找到:
request.method
- 设置为 POSTretry.on.status.codes
- 设置为 400-500 以在出现任何错误代码时重试。根据数据中预期的错误进行调整。request.body.format
- 在大多数情况下,这将是 JSON。auth.type
- 如果与 ClickHouse 一起使用安全性,则设置为 BASIC。目前不支持其他与 ClickHouse 兼容的身份验证机制。ssl.enabled
- 如果使用 SSL,则设置为 true。connection.user
- ClickHouse 的用户名。connection.password
- ClickHouse 的密码。batch.max.size
- 单个批次中要发送的行数。确保将此设置为适当大的数字。根据 ClickHouse 的 建议,应考虑设置为 1000。tasks.max
- HTTP Sink 连接器支持运行一个或多个任务。这可以用于提高性能。结合批量大小,这代表您改善性能的主要手段。key.converter
- 根据键的类型进行设置。value.converter
- 根据主题上的数据类型进行设置。这些数据不需要模式。这里的格式必须与参数http.api.url
中指定的 FORMAT 一致。这里最简单的是使用 JSON 和 org.apache.kafka.connect.json.JsonConverter 转换器。通过转换器 org.apache.kafka.connect.storage.StringConverter 将值视为字符串也是可能的 - 尽管这将要求用户在插入语句中使用函数提取值。如果使用 io.confluent.connect.avro.AvroConverter 转换器,ClickHouse 也支持 Avro 格式。
完整的设置列表,包括如何配置代理、重试和高级 SSL,可以在 此处 找到。
Github 示例数据的示例配置文件可以在 此处 找到,假设 Connect 在独立模式下运行,Kafka 托管在 Confluent Cloud 中。
2. 创建 ClickHouse 表
确保表已创建。下面是一个使用标准 MergeTree 的最小 github 数据集例子。
3. 将数据添加到 Kafka
将消息插入到 Kafka。下面我们使用 kcat 插入 10k 条消息。
对目标表 "Github" 进行简单读取,应该确认数据的插入。