跳到主要内容
跳到主要内容

JDBC 连接器

备注

仅当您的数据简单并且由基本数据类型组成(例如:int)时,才应使用此连接器。ClickHouse 特定类型,例如 maps,不受支持。

在我们的示例中,我们使用 Confluent 分发版的 Kafka Connect。

下面我们描述了一种简单的安装方法,从单个 Kafka 主题获取消息并将行插入 ClickHouse 表中。我们推荐使用 Confluent Cloud,它为没有 Kafka 环境的用户提供了慷慨的免费套餐。

请注意,JDBC 连接器需要架构(您不能使用普通的 JSON 或 CSV 与 JDBC 连接器)。虽然可以在每条消息中编码架构,但强烈建议使用 Confluent 架构注册表 这里y 以避免相关开销。提供的插入脚本会自动从消息中推断出架构并将其插入注册表 - 因此,该脚本可以重用于其他数据集。Kafka 的键假定为字符串。有关 Kafka 架构的更多详细信息,请参见 这里

许可证

JDBC 连接器根据 Confluent Community License 分发。

步骤

收集连接详细信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • HOST 和 PORT:通常情况下,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:默认情况下,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名和密码:默认情况下,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中找到。选择您要连接的服务并点击 Connect

选择 HTTPS,并且详细信息可以在一个示例 curl 命令中找到。

如果您使用自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

1. 安装 Kafka Connect 和连接器

我们假设您已下载了 Confluent 包并在本地安装。请按照 这里 中记录的安装说明安装连接器。

如果您使用 confluent-hub 安装方法,您的本地配置文件将被更新。

为了从 Kafka 向 ClickHouse 发送数据,我们使用连接器的 Sink 组件。

2. 下载和安装 JDBC 驱动程序

这里 下载并安装 ClickHouse JDBC 驱动程序 clickhouse-jdbc-<version>-shaded.jar。根据 这里 的信息将其安装到 Kafka Connect 中。其他驱动程序可能有效,但未经过测试。

备注

常见问题:文档建议将 jar 复制到 share/java/kafka-connect-jdbc/。如果您在 Connect 找到驱动程序时遇到问题,请将驱动程序复制到 share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/。或者修改 plugin.path 以包含驱动程序 - 见下文。

3. 准备配置

按照 这些说明 配置与您的安装类型相关的 Connect,注意独立和分布式集群之间的差异。如果使用 Confluent Cloud,分布式设置是相关的。

以下参数与使用 JDBC 连接器连接 ClickHouse 相关。完整参数列表可以在 这里 找到:

  • _connection.url_ - 形式应为 jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>
  • connection.user - 具有目标数据库写入权限的用户
  • table.name.format- ClickHouse 表以插入数据。此表必须存在。
  • batch.size - 一次批量发送的行数。确保设置为适当大的数字。根据 ClickHouse 的 建议,值为 1000 应视为最低值。
  • tasks.max - JDBC Sink 连接器支持运行一个或多个任务。这可以用来提高性能。连同批量大小,这表示您改善性能的主要手段。
  • value.converter.schemas.enable - 如果使用架构注册表,设置为 false;如果您将架构嵌入到消息中,设置为 true。
  • value.converter - 根据您的数据类型进行设置,例如对于 JSON,io.confluent.connect.json.JsonSchemaConverter
  • key.converter - 设置为 org.apache.kafka.connect.storage.StringConverter。我们使用字符串键。
  • pk.mode - 与 ClickHouse 无关。设置为 none。
  • auto.create - 不支持,必须设置为 false。
  • auto.evolve - 我们建议将此设置为 false,尽管将来可能会受到支持。
  • insert.mode - 设置为 "insert"。其他模式当前不受支持。
  • key.converter - 根据您的键类型进行设置。
  • value.converter - 根据主题中的数据类型进行设置。该数据必须有支持的架构 - JSON,Avro 或 Protobuf 格式。

如果使用我们的示例数据集进行测试,请确保以下设置:

  • value.converter.schemas.enable - 设置为 false,因为我们使用架构注册表。如果您将架构嵌入到每条消息中,则设置为 true。
  • key.converter - 设置为 "org.apache.kafka.connect.storage.StringConverter"。我们使用字符串键。
  • value.converter - 设置为 "io.confluent.connect.json.JsonSchemaConverter"。
  • value.converter.schema.registry.url - 设置为架构服务器的 URL,并通过参数 value.converter.schema.registry.basic.auth.user.info 提供架构服务器的凭据。

可以在 这里 找到 Github 示例数据的示例配置文件,假设 Connect 以独立模式运行,Kafka 托管在 Confluent Cloud 中。

4. 创建 ClickHouse 表

确保表已创建,如果之前的示例中已经存在,则删除该表。下面是兼容减少的 Github 数据集的示例。请注意缺少当前不支持的任何 Array 或 Map 类型:

5. 启动 Kafka Connect

独立分布式 模式启动 Kafka Connect。

6. 向 Kafka 添加数据

使用提供的 脚本和配置 向 Kafka 插入消息。您需要修改 github.config 以包含您的 Kafka 凭据。该脚本当前配置为与 Confluent Cloud 一起使用。

该脚本可用于将任何 ndjson 文件插入 Kafka 主题。这将尝试为您自动推断架构。提供的示例配置将只插入 1 万条消息 - 如有需要,可以 在此修改。该配置还在插入数据到 Kafka 时从数据集中删除任何不兼容的 Array 字段。

这是 JDBC 连接器将消息转换为 INSERT 语句所必需的。如果您使用自己的数据,请确保您要么在每条消息中插入架构(将 _value.converter.schemas.enable _ 设置为 true),要么确保您的客户端发布引用到注册表的架构的消息。

Kafka Connect 应该开始消费消息并将行插入 ClickHouse。请注意关于 "[JDBC 兼容模式] 不支持事务。" 的警告是预期的,可以忽略。

对目标表 "Github" 的简单读取应该确认数据插入。