跳到主要内容
跳到主要内容

Confluent HTTP Sink Connector

HTTP Sink Connector 与数据类型无关,因此不需要 Kafka 模式,同时支持 ClickHouse 特有的数据类型,如 Maps 和 Arrays。这种额外的灵活性使得配置复杂度略有增加。

下面我们描述了一个简单的安装过程,从单个 Kafka 主题提取消息并将行插入到 ClickHouse 表中。

备注

HTTP Connector 在 Confluent 企业许可证 下分发。

快速启动步骤

1. 收集连接详情

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • HOST 和 PORT:通常,当使用 TLS 时,端口为 8443;当不使用 TLS 时,端口为 8123。

  • 数据库名称:开箱即用时,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名和密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务详细信息可在 ClickHouse Cloud 控制台中找到。 选择您要连接的服务并点击 Connect

ClickHouse Cloud 服务连接按钮

选择 HTTPS,详细信息会在示例 curl 命令中提供。

ClickHouse Cloud HTTPS 连接详细信息

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

2. 运行 Kafka Connect 和 HTTP Sink Connector

您有两个选择:

  • 自管理: 下载 Confluent 包并在本地安装。请按照文档 这里 中的安装说明安装连接器。 如果您使用 confluent-hub 安装方法,会更新您的本地配置文件。

  • Confluent Cloud: 对于使用 Confluent Cloud 托管 Kafka 的用户,提供了完全托管的 HTTP Sink 版本。这要求您的 ClickHouse 环境可以从 Confluent Cloud 访问。

备注

以下示例使用的是 Confluent Cloud。

3. 在 ClickHouse 中创建目标表

在进行连接测试之前,让我们先在 ClickHouse Cloud 中创建一个测试表,该表将接收来自 Kafka 的数据:

4. 配置 HTTP Sink

创建 Kafka 主题和 HTTP Sink Connector 实例:

Create HTTP Sink

配置 HTTP Sink Connector:

  • 提供您创建的主题名称
  • 认证
    • HTTP Url - ClickHouse Cloud URL,指定 INSERT 查询 <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow注意:查询必须经过编码。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouse 用户名
    • Auth password - ClickHouse 密码
备注

这个 HTTP Url 容易出错。确保转义准确,以免出现问题。

Auth options for Confluent HTTP Sink
  • 配置
    • Input Kafka record value format 取决于您的源数据,但在大多数情况下为 JSON 或 Avro。我们在以下设置中假设为 JSON
    • advanced configurations 部分:
      • HTTP Request Method - 设置为 POST
      • Request Body Format - json
      • Batch batch size - 根据 ClickHouse 的建议,设置为 至少 1000
      • Batch json as array - true
      • Retry on HTTP codes - 400-500,但根据需要进行调整,例如,如果您在 ClickHouse 前面有 HTTP 代理,可能会改变。
      • Maximum Reties - 默认值(10)是合适的,但可以根据需要进行调整。
Advanced options for Confluent HTTP Sink

5. 测试连接性

在由您的 HTTP Sink 配置的主题中创建一条消息

Create a message in the topic

并确认创建的消息已写入您的 ClickHouse 实例。

故障排除

HTTP Sink 不批量处理消息

来自 Sink 文档:

HTTP Sink Connector 不会对包含不同 Kafka 头值的消息进行批量请求。

  1. 验证您的 Kafka 记录是否具有相同的键。
  2. 当您向 HTTP API URL 添加参数时,每条记录可能会导致唯一的 URL。因此,在使用额外的 URL 参数时,批量处理将被禁用。

400 Bad Request

CANNOT_PARSE_QUOTED_STRING

如果 HTTP Sink 在将 JSON 对象插入到 String 列时失败并显示以下消息:

在 URL 中设置 input_format_json_read_objects_as_strings=1 设置为编码字符串 SETTINGS%20input_format_json_read_objects_as_strings%3D1

加载 GitHub 数据集 (可选)

请注意,此示例保留了 GitHub 数据集中 Array 字段。我们假定您在示例中有一个空的 github 主题,并使用 kcat 将消息插入 Kafka。

1. 准备配置

按照 这些说明 设置与您的安装类型相关的 Connect,注意独立式和分布式集群之间的差异。如果使用 Confluent Cloud,分布式设置是相关的。

最重要的参数是 http.api.url。ClickHouse 的 HTTP 接口 要求您将 INSERT 语句作为 URL 中的参数进行编码。这必须包括格式(在此情况下为 JSONEachRow)和目标数据库。格式必须与将转换为 HTTP 有效负载中的字符串的 Kafka 数据一致。这些参数必须进行 URL 转义。假设您在本地运行 ClickHouse,以下是 GitHub 数据集的格式示例:

与使用 HTTP Sink 连接 ClickHouse 相关的其他参数如下所示。完整的参数列表可以在 这里 找到:

  • request.method - 设置为 POST
  • retry.on.status.codes - 设置为 400-500,以便在出现错误代码时重试。根据数据中 expected errors 进行精炼。
  • request.body.format - 在大多数情况下,这将是 JSON。
  • auth.type - 如果您对 ClickHouse 进行安全处理,请设置为 BASIC。当前不支持其他与 ClickHouse 兼容的身份验证机制。
  • ssl.enabled - 如果使用 SSL,请设置为 true。
  • connection.user - ClickHouse 的用户名。
  • connection.password - ClickHouse 的密码。
  • batch.max.size - 一次批量发送的行数。确保设置为适当的大数字。根据 ClickHouse 的 建议,值为 1000 应视为最低值。
  • tasks.max - HTTP Sink Connector 支持运行一个或多个任务。这可以用于提高性能。连同批量大小,这代表了您提高性能的主要手段。
  • key.converter - 根据您的键类型进行设置。
  • value.converter - 根据您主题上的数据类型进行设置。这些数据不需要模式。这里的格式必须与参数 http.api.url 中指定的格式一致。最简单的是使用 JSON 和 org.apache.kafka.connect.json.JsonConverter 转换器。通过转换器 org.apache.kafka.connect.storage.StringConverter 将值视为字符串也是可能的 - 尽管这将要求用户使用函数在插入语句中提取值。如果使用 io.confluent.connect.avro.AvroConverter 转换器,ClickHouse 也支持 Avro 格式

完整的设置列表,包括如何配置代理、重试和高级 SSL,可以在 这里 找到。

GitHub 示例数据的配置文件可以在 这里 找到,假设 Connect 以独立模式运行,Kafka 托管在 Confluent Cloud。

2. 创建 ClickHouse 表

确保表已创建。下面是使用标准 MergeTree 的最小 github 数据集示例。

3. 将数据添加到 Kafka

将消息插入 Kafka。下面我们使用 kcat 插入 10,000 条消息。

对目标表 "Github" 的简单读取应确认数据的插入。