跳到主要内容
跳到主要内容

JDBC 连接器

备注

此连接器仅在您的数据简单且由基本数据类型(例如,int)组成时使用。ClickHouse 特定类型,如映射,不支持。

在我们的示例中,我们利用了 Confluent 版本的 Kafka Connect。

下面我们描述了一个简单的安装过程,从单个 Kafka 主题中提取消息并将行插入到 ClickHouse 表中。我们推荐使用 Confluent Cloud,该服务为没有 Kafka 环境的用户提供慷慨的免费套餐。

请注意,JDBC 连接器需要一个模式(您不能使用普通的 JSON 或 CSV)。虽然模式可以在每个消息中编码,但强烈建议使用 Confluent 模式注册表以避免相关开销。提供的插入脚本会自动推断消息的模式并将其插入到注册表中,因此此脚本可以重用于其他数据集。Kafka 的键被假定为字符串。有关 Kafka 模式的更多详细信息,请参见 此处

许可证

JDBC 连接器根据 Confluent Community License 分发。

步骤

收集您的连接详细信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • HOST 和 PORT:通常,当使用 TLS 时,端口为 8443;当不使用 TLS 时,端口为 8123。

  • 数据库名称:开箱即用时,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名和密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务详细信息可在 ClickHouse Cloud 控制台中找到。 选择您要连接的服务并点击 Connect

ClickHouse Cloud 服务连接按钮

选择 HTTPS,详细信息会在示例 curl 命令中提供。

ClickHouse Cloud HTTPS 连接详细信息

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

1. 安装 Kafka Connect 和连接器

我们假设您已下载 Confluent 包并将其安装在本地。按照 这里 文档中的安装说明安装连接器。

如果您使用 confluent-hub 安装方法,本地配置文件将被更新。

为了从 Kafka 向 ClickHouse 发送数据,我们使用连接器的接收器组件。

2. 下载并安装 JDBC 驱动程序

这里 下载并安装 ClickHouse JDBC 驱动程序 clickhouse-jdbc-<version>-shaded.jar。按照 这里 的详细信息将其安装到 Kafka Connect 中。其他驱动程序可能有效,但尚未经过测试。

备注

常见问题:文档建议将 jar 复制到 share/java/kafka-connect-jdbc/。如果您在 Connect 中遇到找不到驱动程序的问题,请将驱动程序复制到 share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/。或者修改 plugin.path 以包括驱动程序 - 详见下文。

3. 准备配置

按照 这些说明 设置与您的安装类型相关的 Connect,注意独立和分布式集群之间的差异。如果使用 Confluent Cloud,则相关于分布式设置。

以下参数与将 JDBC 连接器与 ClickHouse 一起使用相关。完整参数列表可在 这里 查找:

  • _connection.url_ - 其形式应为 jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>
  • connection.user - 对目标数据库具有写入权限的用户
  • table.name.format - ClickHouse 表以插入数据。这个表必须存在。
  • batch.size - 单个批次中要发送的行数。确保此项设置为适当大的数字。根据 ClickHouse 的 建议,值 1000 应视为最小值。
  • tasks.max - JDBC Sink 连接器支持运行一个或多个任务。可以用来提高性能。结合批量大小,这是提高性能的主要手段。
  • value.converter.schemas.enable - 如果使用模式注册表,设置为 false;如果您在消息中嵌入模式,则设置为 true。
  • value.converter - 根据您的数据类型进行设置,例如对于 JSON,使用 io.confluent.connect.json.JsonSchemaConverter
  • key.converter - 设置为 org.apache.kafka.connect.storage.StringConverter。我们使用字符串键。
  • pk.mode - 对 ClickHouse 不相关。设置为 none。
  • auto.create - 不受支持,必须为 false。
  • auto.evolve - 我们建议此设置为 false,尽管可能在未来支持。
  • insert.mode - 设置为 "insert"。当前不支持其他模式。
  • key.converter - 根据您的键类型进行设置。
  • value.converter - 根据您主题中的数据类型进行设置。此数据必须具有受支持的模式 - JSON、Avro 或 Protobuf 格式。

如果使用我们的示例数据集进行测试,请确保设置以下内容:

  • value.converter.schemas.enable - 设置为 false,因为我们使用模式注册表。如果您在每条消息中嵌入模式,则设置为 true。
  • key.converter - 设置为 "org.apache.kafka.connect.storage.StringConverter"。我们使用字符串键。
  • value.converter - 设置为 "io.confluent.connect.json.JsonSchemaConverter"。
  • value.converter.schema.registry.url - 设置为模式服务器的 URL 及模式服务器的凭据,通过参数 value.converter.schema.registry.basic.auth.user.info 传递。

关于 Github 样本数据的示例配置文件,可以在 这里 找到,假设 Connect 以独立模式运行,并且 Kafka 托管在 Confluent Cloud 中。

4. 创建 ClickHouse 表

确保表已经创建,如果之前存在,请将其删除。下面是与缩小的 Github 数据集兼容的一个示例。注意缺少当前不支持的任何 Array 或 Map 类型:

5. 启动 Kafka Connect

独立分布式 模式下启动 Kafka Connect。

6. 向 Kafka 添加数据

使用提供的 脚本和配置 向 Kafka 插入消息。您需要修改 github.config 以包含您的 Kafka 凭据。该脚本目前配置为与 Confluent Cloud 一起使用。

该脚本可用于将任何 ndjson 文件插入到 Kafka 主题。它将尝试为您自动推断模式。提供的示例配置将仅插入 10k 消息 - 如果需要,请 在此处修改。此配置在将数据插入 Kafka 期间还会移除数据集中任何不兼容的 Array 字段。

这是 JDBC 连接器将消息转换为 INSERT 语句所必需的。如果您使用自己的数据,请确保在每条消息中插入模式(将 _value.converter.schemas.enable _ 设置为 true)或确保您的客户端发布的消息引用注册表中的模式。

Kafka Connect 应开始消费消息并将行插入到 ClickHouse。请注意,有关 "[JDBC 兼容模式] 不支持事务。" 的警告是预期的,可以忽略。

对目标表 "Github" 的简单读取应确认数据插入。