JDBC 连接器
此连接器仅在您的数据简单且由基本数据类型(例如,int)组成时使用。ClickHouse 特定类型,如映射,不支持。
在我们的示例中,我们利用了 Confluent 版本的 Kafka Connect。
下面我们描述了一个简单的安装过程,从单个 Kafka 主题中提取消息并将行插入到 ClickHouse 表中。我们推荐使用 Confluent Cloud,该服务为没有 Kafka 环境的用户提供慷慨的免费套餐。
请注意,JDBC 连接器需要一个模式(您不能使用普通的 JSON 或 CSV)。虽然模式可以在每个消息中编码,但强烈建议使用 Confluent 模式注册表以避免相关开销。提供的插入脚本会自动推断消息的模式并将其插入到注册表中,因此此脚本可以重用于其他数据集。Kafka 的键被假定为字符串。有关 Kafka 模式的更多详细信息,请参见 此处。
许可证
JDBC 连接器根据 Confluent Community License 分发。
步骤
收集您的连接详细信息
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
-
HOST 和 PORT:通常,当使用 TLS 时,端口为 8443;当不使用 TLS 时,端口为 8123。
-
数据库名称:开箱即用时,有一个名为
default
的数据库,请使用您要连接的数据库名称。 -
用户名和密码:开箱即用时,用户名为
default
。请使用适合您用例的用户名。
您的 ClickHouse Cloud 服务详细信息可在 ClickHouse Cloud 控制台中找到。 选择您要连接的服务并点击 Connect:

选择 HTTPS,详细信息会在示例 curl
命令中提供。

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。
1. 安装 Kafka Connect 和连接器
我们假设您已下载 Confluent 包并将其安装在本地。按照 这里 文档中的安装说明安装连接器。
如果您使用 confluent-hub 安装方法,本地配置文件将被更新。
为了从 Kafka 向 ClickHouse 发送数据,我们使用连接器的接收器组件。
2. 下载并安装 JDBC 驱动程序
从 这里 下载并安装 ClickHouse JDBC 驱动程序 clickhouse-jdbc-<version>-shaded.jar
。按照 这里 的详细信息将其安装到 Kafka Connect 中。其他驱动程序可能有效,但尚未经过测试。
常见问题:文档建议将 jar 复制到 share/java/kafka-connect-jdbc/
。如果您在 Connect 中遇到找不到驱动程序的问题,请将驱动程序复制到 share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
。或者修改 plugin.path
以包括驱动程序 - 详见下文。
3. 准备配置
按照 这些说明 设置与您的安装类型相关的 Connect,注意独立和分布式集群之间的差异。如果使用 Confluent Cloud,则相关于分布式设置。
以下参数与将 JDBC 连接器与 ClickHouse 一起使用相关。完整参数列表可在 这里 查找:
_connection.url_
- 其形式应为jdbc:clickhouse://<clickhouse host>:<clickhouse http port>/<target database>
connection.user
- 对目标数据库具有写入权限的用户table.name.format
- ClickHouse 表以插入数据。这个表必须存在。batch.size
- 单个批次中要发送的行数。确保此项设置为适当大的数字。根据 ClickHouse 的 建议,值 1000 应视为最小值。tasks.max
- JDBC Sink 连接器支持运行一个或多个任务。可以用来提高性能。结合批量大小,这是提高性能的主要手段。value.converter.schemas.enable
- 如果使用模式注册表,设置为 false;如果您在消息中嵌入模式,则设置为 true。value.converter
- 根据您的数据类型进行设置,例如对于 JSON,使用io.confluent.connect.json.JsonSchemaConverter
。key.converter
- 设置为org.apache.kafka.connect.storage.StringConverter
。我们使用字符串键。pk.mode
- 对 ClickHouse 不相关。设置为 none。auto.create
- 不受支持,必须为 false。auto.evolve
- 我们建议此设置为 false,尽管可能在未来支持。insert.mode
- 设置为 "insert"。当前不支持其他模式。key.converter
- 根据您的键类型进行设置。value.converter
- 根据您主题中的数据类型进行设置。此数据必须具有受支持的模式 - JSON、Avro 或 Protobuf 格式。
如果使用我们的示例数据集进行测试,请确保设置以下内容:
value.converter.schemas.enable
- 设置为 false,因为我们使用模式注册表。如果您在每条消息中嵌入模式,则设置为 true。key.converter
- 设置为 "org.apache.kafka.connect.storage.StringConverter"。我们使用字符串键。value.converter
- 设置为 "io.confluent.connect.json.JsonSchemaConverter"。value.converter.schema.registry.url
- 设置为模式服务器的 URL 及模式服务器的凭据,通过参数value.converter.schema.registry.basic.auth.user.info
传递。
关于 Github 样本数据的示例配置文件,可以在 这里 找到,假设 Connect 以独立模式运行,并且 Kafka 托管在 Confluent Cloud 中。
4. 创建 ClickHouse 表
确保表已经创建,如果之前存在,请将其删除。下面是与缩小的 Github 数据集兼容的一个示例。注意缺少当前不支持的任何 Array 或 Map 类型:
5. 启动 Kafka Connect
在 独立 或 分布式 模式下启动 Kafka Connect。
6. 向 Kafka 添加数据
使用提供的 脚本和配置 向 Kafka 插入消息。您需要修改 github.config 以包含您的 Kafka 凭据。该脚本目前配置为与 Confluent Cloud 一起使用。
该脚本可用于将任何 ndjson 文件插入到 Kafka 主题。它将尝试为您自动推断模式。提供的示例配置将仅插入 10k 消息 - 如果需要,请 在此处修改。此配置在将数据插入 Kafka 期间还会移除数据集中任何不兼容的 Array 字段。
这是 JDBC 连接器将消息转换为 INSERT 语句所必需的。如果您使用自己的数据,请确保在每条消息中插入模式(将 _value.converter.schemas.enable _ 设置为 true)或确保您的客户端发布的消息引用注册表中的模式。
Kafka Connect 应开始消费消息并将行插入到 ClickHouse。请注意,有关 "[JDBC 兼容模式] 不支持事务。" 的警告是预期的,可以忽略。
对目标表 "Github" 的简单读取应确认数据插入。