跳到主要内容
跳到主要内容

ClickHouse Kafka Connect Sink

备注

如果您需要任何帮助,请 在仓库中提交问题 或在 ClickHouse公共Slack 中提出问题。

ClickHouse Kafka Connect Sink 是将数据从 Kafka 主题传递到 ClickHouse 表的 Kafka 连接器。

License

Kafka 连接器 Sink 根据 Apache 2.0 License 分发。

Requirements for the environment

在环境中应该安装版本 v2.7 或更高的 Kafka Connect 框架。

Version compatibility matrix

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Main Features

  • 以开箱即用的 exactly-once 语义发布。它由一个新的 ClickHouse 核心功能 KeeperMap 驱动(作为连接器的状态存储使用),并允许最小化的架构。
  • 支持第三方状态存储:当前默认使用内存,但可以使用 KeeperMap(Redis 将很快添加)。
  • 核心集成:由 ClickHouse 构建、维护和支持。
  • 持续针对 ClickHouse Cloud 进行测试。
  • 具有声明架构和无架构的数据插入。
  • 支持 ClickHouse 的所有数据类型。

Installation instructions

Gather your connection details

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

General Installation Instructions

连接器分发为一个包含所有必需类文件的单个 JAR 文件。

要安装插件,请按照以下步骤操作:

  • 从 ClickHouse Kafka Connect Sink 仓库的 Releases 页面下载包含连接器 JAR 文件的 ZIP 压缩包。
  • 解压 ZIP 文件内容并复制到所需位置。
  • 在您的 Connect 属性文件中将插件目录的路径添加到 plugin.path 配置中,以允许 Confluent Platform 找到插件。
  • 在配置中提供主题名称、ClickHouse 实例主机名和密码。
  • 重启 Confluent Platform。
  • 如果您使用 Confluent Platform,请登录 Confluent Control Center UI 验证 ClickHouse Sink 是否在可用连接器列表中。

Configuration options

要将 ClickHouse Sink 连接到 ClickHouse 服务器,您需要提供:

  • 连接详细信息:主机名 (必填) 和端口(可选)
  • 用户凭据:密码 (必填) 和用户名(可选)
  • 连接器类:com.clickhouse.kafka.connect.ClickHouseSinkConnector (必填)
  • topics 或 topics.regex:要轮询的 Kafka 主题 - 主题名称必须与表名匹配 (必填)
  • 键和值转换器:根据主题上的数据类型进行设置。如果在工作配置中未定义,则需要。

完整的配置选项表:

Property NameDescriptionDefault Value
hostname (Required)服务器的主机名或 IP 地址N/A
portClickHouse 端口 - 默认是 8443(用于云中的 HTTPS),但对于自托管的 HTTP 默认应为 81238443
ssl启用与 ClickHouse 的 SSL 连接true
jdbcConnectionProperties连接到 Clickhouse 时的连接属性。必须以 ? 开头,在 param=value 之间用 & 连接""
usernameClickHouse 数据库用户名default
password (Required)ClickHouse 数据库密码N/A
databaseClickHouse 数据库名称default
connector.class (Required)连接器类(显式设置并保持为默认值)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.max连接器任务的数量"1"
errors.retry.timeoutClickHouse JDBC 重试超时"60"
exactlyOnce启用 exactly Once"false"
topics (Required)要轮询的 Kafka 主题 - 主题名称必须与表名匹配""
key.converter (Required* - See Description)根据键的类型进行设置。如果您传递键(并且在工作配置中未定义),这里是必需的。"org.apache.kafka.connect.storage.StringConverter"
value.converter (Required* - See Description)根据主题上数据的类型进行设置。支持:- JSON、String、Avro 或 Protobuf 格式。如果在工作配置中未定义,这里是必需的。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable连接器值转换器架构支持"false"
errors.tolerance连接器错误容忍度。支持:none、all"none"
errors.deadletterqueue.topic.name如果设置(与 errors.tolerance=all 一起),将为失败的批次使用 DLQ(请参见 Troubleshooting""
errors.deadletterqueue.context.headers.enable为 DLQ 添加额外的头部""
clickhouseSettingsClickHouse 设置的以逗号分隔的列表(例如 "insert_quorum=2, 等等...")""
topic2TableMap映射主题名称到表名称的以逗号分隔的列表(例如 "topic1=table1, topic2=table2, 等等...")""
tableRefreshInterval刷新表定义缓存的时间(以秒为单位)0
keeperOnCluster允许为自托管实例配置 ON CLUSTER 参数(例如 ON CLUSTER clusterNameInConfigFileDefinition)以获得 exactly-once 的 connect_state 表(请参见 Distributed DDL Queries""
bypassRowBinary允许禁用 RowBinary 和 RowBinaryWithDefaults 用于基于模式的数据(Avro、Protobuf 等) - 仅在数据将有缺失列时使用,如果 Nullable/Default 不可接受。"false"
dateTimeFormats用于解析 DateTime64 模式字段的日期时间格式,以 ; 分隔(例如 someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatch允许连接器丢弃在 AFTER_PROCESSING 中存储的当前偏移量之前的记录(例如,如果发送偏移量 5,而最后记录的偏移量是 250)"false"
ignorePartitionsWhenBatching在收集用于插入的消息时将忽略分区(但仅当 exactlyOncefalse)。性能注意:连接器任务越多,每个任务分配到的 Kafka 分区就越少 - 这可能意味着收益递减。"false"

Target Tables

ClickHouse Connect Sink 从 Kafka 主题读取消息并将它们写入相应的表中。ClickHouse Connect Sink 将数据写入现有表中。在开始插入数据之前,请确保在 ClickHouse 中创建了具有适当架构的目标表。

每个主题都需要一个专用的目标表。目标表名称必须与源主题名称匹配。

Pre-processing

如果您需要在将出站消息发送到 ClickHouse Kafka Connect Sink 之前转换它们,请使用 Kafka Connect Transformations

Supported Data types

声明架构:

Kafka Connect TypeClickHouse TypeSupportedPrimitive
STRINGStringYes
INT8Int8Yes
INT16Int16Yes
INT32Int32Yes
INT64Int64Yes
FLOAT32Float32Yes
FLOAT64Float64Yes
BOOLEANBooleanYes
ARRAYArray(T)No
MAPMap(Primitive, T)No
STRUCTVariant(T1, T2, ...)No
STRUCTTuple(a T1, b T2, ...)No
STRUCTNested(a T1, b T2, ...)No
BYTESStringNo
org.apache.kafka.connect.data.TimeInt64 / DateTime64No
org.apache.kafka.connect.data.TimestampInt32 / Date32No
org.apache.kafka.connect.data.DecimalDecimalNo

未声明架构:

记录被转换为 JSON 并以值的形式发送到 ClickHouse,格式为 JSONEachRow

Configuration Recipes

以下是一些常见的配置食谱,以帮助您快速入门。

Basic Configuration

最基本的配置,以帮助您开始 - 假设您正在以分布式模式运行 Kafka Connect,并且在 localhost:8443 上运行了启用 SSL 的 ClickHouse 服务器,数据是无架构 JSON。

Basic Configuration with Multiple Topics

连接器可以从多个主题消费数据

Basic Configuration with DLQ

Using with different data formats

Avro Schema Support
Protobuf Schema Support

请注意:如果您遇到缺少类的问题,并不是每个环境都包含 protobuf 转换器,您可能需要捆绑依赖项的 jar 的替代版本。

JSON Schema Support
String Support

连接器支持以不同 ClickHouse 格式的 String Converter:JSONCSVTSV

Logging

日志记录由 Kafka Connect Platform 自动提供。 日志目标和格式可以通过 Kafka connect 配置文件进行配置。

如果使用 Confluent Platform,可以通过运行 CLI 命令查看日志:

有关更多详细信息,请查看官方 tutorial

Monitoring

ClickHouse Kafka Connect 通过 Java Management Extensions (JMX) 报告运行时指标。默认情况下,在 Kafka Connector 中启用 JMX。

ClickHouse Connect MBeanName

ClickHouse Kafka Connect 报告以下指标:

NameTypeDescription
receivedRecordslong接收到的记录总数。
recordProcessingTimelong花费在将记录分组并转换为统一结构的总时间(以纳秒为单位)。
taskProcessingTimelong花费在处理和插入数据到 ClickHouse 的总时间(以纳秒为单位)。

Limitations

  • 不支持删除。
  • 批量大小继承自 Kafka 消费者属性。
  • 当使用 KeeperMap 进行 exactly-once 时,如果偏移量发生更改或回卷,您需要删除该特定主题在 KeeperMap 中的内容。(有关更多详细信息,请参见故障排除指南)

Tuning Performance

如果您曾经想过“我想调整 sink 连接器的批量大小”,那么您可以参考这一部分。

Connect Fetch vs Connector Poll

Kafka Connect(我们的 sink 连接器构建的框架)将在后台从 Kafka 主题中提取消息(与连接器无关)。

您可以使用 fetch.min.bytesfetch.max.bytes 控制此过程 - fetch.min.bytes 设置传递给连接器之前所需的最小值(最多设置的时间限制由 fetch.max.wait.ms),fetch.max.bytes 设置上限。如果您想将更大的批量传递给连接器,可以选择增加最小提取或最大等待时间以构建更大的数据包。

提取的数据随后由连接器客户端轮询以获取消息,每次轮询的数量由 max.poll.records 控制 - 请注意,提取与轮询是独立的!

在调整这些设置时,用户应致力于使他们的提取大小生成多个 max.poll.records 批量(并记住,设置 fetch.min.bytesfetch.max.bytes 代表压缩数据) - 这样,每个连接器任务就可以插入尽可能大的批量。

ClickHouse 针对更大的批量进行了优化,甚至在稍微延迟的情况下,也比频繁的小批量更好 - 批量越大,效果越好。

更多详细信息可以在 Confluent documentationKafka documentation 中找到。

Multiple high throughput topics

如果您的连接器配置为订阅多个主题,您正在使用 topic2TableMap 将主题映射到表,并且在插入时遇到瓶颈导致消费者延迟,则考虑为每个主题创建一个连接器。这主要是由于批量当前会被串行插入到每个表中造成的 serially

为每个主题创建一个连接器是一种解决方法,可以确保您获得最快的插入率。

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

当 KeeperMap 中存储的偏移量与 Kafka 中存储的偏移量不同(通常是主题被删除或手动调整偏移量时),会发生这种情况。要修复此问题,您需要删除给定主题 + 分区中存储的旧值。

注意:此调整可能会有 exactly-once 的影响。

"What errors will the connector retry?"

目前的焦点是识别瞬态并且可以重试的错误,包括:

  • ClickHouseException - 这是 ClickHouse 抛出的通用异常。 通常在服务器过载时抛出,以下错误代码被认为特别瞬态:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - 当 socket 超时时抛出。
  • UnknownHostException - 当主机无法解析时抛出。
  • IOException - 当网络出现问题时抛出。

"All my data is blank/zeroes"

很可能您的数据中的字段与表中的字段不匹配 - 这在 CDC(和 Debezium 格式)中尤其常见。 一个常见的解决方案是向您的连接器配置中添加扁平化转换:

这将把您的数据从嵌套 JSON 转换为扁平 JSON(使用 _ 作为分隔符)。表中的字段将遵循 "field1_field2_field3" 格式(即 "before_id"、"after_id" 等)。

"I want to use my Kafka keys in ClickHouse"

Kafka 键默认不存储在值字段中,但您可以使用 KeyToValue 转换将键移动到值字段(下划线 _key 字段名称下):