跳到主要内容
跳到主要内容

ClickHouse Kafka Connect Sink

备注

如果您需要任何帮助,请 在这个仓库中提交问题 或在 ClickHouse 公共 Slack 提出问题。

ClickHouse Kafka Connect Sink 是将数据从 Kafka 主题传递到 ClickHouse 表的 Kafka 连接器。

License

Kafka 连接器 Sink 在 Apache 2.0 License 下分发。

Requirements for the environment

环境中需安装 Kafka Connect 框架 v2.7 或更高版本。

Version compatibility matrix

ClickHouse Kafka Connect versionClickHouse versionKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Main features

  • 具有开箱即用的恰好一次语义。它由一个名为 KeeperMap 的新 ClickHouse 核心特性提供支持(作为连接器的状态存储),并允许最小化架构。
  • 支持第三方状态存储:目前默认为内存存储,但可以使用 KeeperMap(Redis 将很快添加)。
  • 核心集成:由 ClickHouse 构建、维护和支持。
  • 持续对 ClickHouse Cloud 进行测试。
  • 支持声明模式和无模式的数据插入。
  • 支持 ClickHouse 的所有数据类型。

Installation instructions

Gather your connection details

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • 主机端口:通常,当使用 TLS 时端口为 8443,当不使用 TLS 时端口为 8123。

  • 数据库名称:开箱即用时,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中获得。 选择您要连接的服务并点击 连接

ClickHouse Cloud 服务连接按钮

选择 HTTPS,详细信息可在示例的 curl 命令中获得。

ClickHouse Cloud HTTPS 连接详细信息

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

General installation instructions

该连接器作为一个包含所有运行插件所需类文件的单个 JAR 文件分发。

要安装该插件,请执行以下步骤:

  • 从 ClickHouse Kafka Connect Sink 仓库的 Releases 页面下载包含连接器 JAR 文件的 zip 文件。
  • 解压 ZIP 文件内容并复制到所需位置。
  • 在您的 Connect 属性文件中将插件目录的路径添加到 plugin.path 配置,以允许 Confluent 平台找到插件。
  • 在配置中提供主题名称、ClickHouse 实例主机名和密码。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • 重启 Confluent 平台。
  • 如果您使用 Confluent 平台,请登录到 Confluent 控制中心 UI,以验证 ClickHouse Sink 是否在可用连接器列表中。

Configuration options

要将 ClickHouse Sink 连接到 ClickHouse 服务器,您需要提供:

  • 连接详细信息:主机名 (必填) 和端口(可选)
  • 用户凭据:密码 (必填) 和用户名(可选)
  • 连接器类: com.clickhouse.kafka.connect.ClickHouseSinkConnector (必填)
  • topics 或 topics.regex:要轮询的 Kafka 主题 - 主题名称必须与表名称匹配 (必填)
  • key 和 value 转换器:根据主题上的数据类型设置。如果在工作配置中尚未定义,则为必填项。

完整的配置选项表:

属性名描述默认值
hostname (必填)服务器的主机名或 IP 地址N/A
portClickHouse 端口 - 默认是 8443(用于云中的 HTTPS),但对于 HTTP(自托管的默认值)应为 81238443
ssl启用ssl连接到 ClickHousetrue
jdbcConnectionProperties连接到 Clickhouse 的连接属性。必须以 ? 开头并通过 & 连结 param=value""
usernameClickHouse 数据库用户名default
password (必填)ClickHouse 数据库密码N/A
databaseClickHouse 数据库名称default
connector.class (必填)连接器类(显式设置并保持为默认值)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.max连接器任务的数量"1"
errors.retry.timeoutClickHouse JDBC 重试超时"60"
exactlyOnce启用恰好一次"false"
topics (必填)要轮询的 Kafka 主题 - 主题名称必须与表名称匹配""
key.converter (必填* - 见描述)根据键的类型设置。如果您正在传递键(且未在工作配置中定义),此处为必填项。"org.apache.kafka.connect.storage.StringConverter"
value.converter (必填* - 见描述)根据主题上数据的类型设置。支持:- JSON、字符串、Avro 或 Protobuf 格式。如果在工作配置中未定义,则在此处为必填项。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable连接器值转换器模式支持"false"
errors.tolerance连接器错误容忍度。支持:none, all"none"
errors.deadletterqueue.topic.name如果设置(与 errors.tolerance=all 一起使用),将为失败的批次使用 DLQ(请参阅 Troubleshooting""
errors.deadletterqueue.context.headers.enable为 DLQ 添加其他标头""
clickhouseSettingsClickHouse 设置的逗号分隔列表(例如 "insert_quorum=2, 等等...")""
topic2TableMap将主题名称映射到表名称的逗号分隔列表(例如 "topic1=table1, topic2=table2, 等等...")""
tableRefreshInterval刷新表定义缓存的时间(以秒为单位)0
keeperOnCluster允许为自托管实例配置 ON CLUSTER 参数(例如 ON CLUSTER clusterNameInConfigFileDefinition)用于恰好一次 connect_state 表(见 分布式 DDL 查询""
bypassRowBinary允许禁用 Schema 基于数据的 RowBinary 和 RowBinaryWithDefaults 的使用(Avro、Protobuf 等)- 仅在数据将具有缺失列且 Nullable/Default 不可接受时使用。"false"
dateTimeFormats用于解析 DateTime64 模式字段的日期时间格式,使用 ; 分隔(例如 someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss)。""
tolerateStateMismatch允许连接器删除"早于"当前偏移存储的记录,AFTER_PROCESSING(例如如果发送偏移 5,而最后记录的偏移是 250)"false"
ignorePartitionsWhenBatching在收集要插入的消息时将忽略分区(尽管仅当 exactlyOncefalse时)。性能注意:连接器任务越多,每个任务分配的 Kafka 分区越少 - 这可能意味着收益递减。"false"

Target tables

ClickHouse Connect Sink 从 Kafka 主题读取消息并将其写入相应的表。ClickHouse Connect Sink 将数据写入现有表。请确保在开始向其插入数据之前,在 ClickHouse 中创建了具有适当模式的目标表。

每个主题在 ClickHouse 中需要一个专用的目标表。目标表名称必须与源主题名称匹配。

Pre-processing

如果您需要在将出站消息发送到 ClickHouse Kafka Connect Sink 之前转换消息,请使用 Kafka Connect Transformations

Supported data types

声明模式时:

Kafka Connect 类型ClickHouse 类型支持原始
STRINGString
STRINGJSON. 见下文 (1)
INT8Int8
INT16Int16
INT32Int32
INT64Int64
FLOAT32Float32
FLOAT64Float64
BOOLEANBoolean
ARRAYArray(T)
MAPMap(Primitive, T)
STRUCTVariant(T1, T2, ...)
STRUCTTuple(a T1, b T2, ...)
STRUCTNested(a T1, b T2, ...)
STRUCTJSON. 见下文 (1), (2)
BYTESString
org.apache.kafka.connect.data.TimeInt64 / DateTime64
org.apache.kafka.connect.data.TimestampInt32 / Date32
org.apache.kafka.connect.data.DecimalDecimal
  • (1) - 仅当 ClickHouse 设置为 input_format_binary_read_json_as_string=1 时,JSON 被支持。此设置仅适用于 RowBinary 格式系列,并且此设置影响插入请求中的所有列,因此它们都应该是字符串。在这种情况下,连接器会将 STRUCT 转换为 JSON 字符串。

  • (2) - 当结构具有像 oneof 的联合时,转换器应该配置为不向字段名添加前缀/后缀。为 ProtobufConverter 提供 generate.index.for.unions=false 设置

未声明模式时:

记录被转换为 JSON,并作为值以 JSONEachRow 格式发送到 ClickHouse。

Configuration recipes

这些是一些常见的配置配方,可以帮助您快速入门。

Basic configuration

最基本的配置以帮助您入门 - 假设您在分布式模式下运行 Kafka Connect,并且有一个在 localhost:8443 上运行的 ClickHouse 服务器,SSL 启用,数据为无模式 JSON。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}

Basic configuration with multiple topics

连接器可以从多个主题获取数据。

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

Basic configuration with DLQ

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

Using with different data formats

Avro schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Protobuf schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}

请注意:如果您在缺少类方面遇到问题,并不是每个环境都附带 Protobuf 转换器,您可能需要一个捆绑了依赖项的替代 JAR 版本。

JSON schema support
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
String support

连接器支持不同 ClickHouse 格式中的字符串转换器:JSONCSVTSV

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

Logging

Kafka Connect 平台自动提供日志记录。 日志的目的地和格式可以通过 Kafka connect 配置文件进行配置。

如果使用 Confluent 平台,可以通过运行 CLI 命令查看日志:

confluent local services connect log

有关更多详细信息,请查看官方 教程

Monitoring

ClickHouse Kafka Connect 通过 Java Management Extensions (JMX) 报告运行时指标。JMX 在 Kafka 连接器中默认启用。

ClickHouse Connect MBeanName

com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}

ClickHouse Kafka Connect 报告以下指标:

名称类型描述
receivedRecordslong接收到的记录总数。
recordProcessingTimelong将记录分组并转换为统一结构所花费的总时间(以纳秒为单位)。
taskProcessingTimelong处理并将数据插入 ClickHouse 所花费的总时间(以纳秒为单位)。

Limitations

  • 不支持删除操作。
  • 批量大小来自 Kafka 消费者属性。
  • 使用 KeeperMap 进行恰好一次时,如果偏移量发生更改或回滚,您需要删除该特定主题在 KeeperMap 中的内容。(有关更多详细信息,请参阅下方的故障排除指南)

Tuning performance

如果您曾经想过"我想调整 sink 连接器的批量大小",那么这一部分正是为您准备的。

Connect fetch vs connector poll

Kafka Connect(我们连接器构建的框架)将在后台从 Kafka 主题获取消息(与连接器无关)。

您可以使用 fetch.min.bytesfetch.max.bytes 来控制此过程 - fetch.min.bytes 设置在框架将值传递给连接器之前所需的最小量(由 fetch.max.wait.ms 设置的时间限制),fetch.max.bytes 设置上限。如果您想将更大的批量传递给连接器,可以选择增加最小提取或最大等待时间以构建较大的数据包。

提取的数据然后由连接器客户端轮询,以获取消息,每次轮询的数量由 max.poll.records 控制 - 请注意,提取与轮询是独立的!

在调整这些设置时,用户应旨在使提取大小产生多个 max.poll.records 的批量(并记住,设置 fetch.min.bytesfetch.max.bytes 代表压缩数据) - 这样,每个连接器任务都可以插入尽可能大的批量。

ClickHouse 针对更大的批量进行了优化,即使在稍微延迟的情况下,也优于频繁但较小的批量 - 批量越大,效果越好。

consumer.max.poll.records=5000
consumer.max.partition.fetch.bytes=5242880

有关更多详细信息,请查看 Confluent 文档Kafka 文档

Multiple high throughput topics

如果您的连接器配置为订阅多个主题,您正在使用 topic2TableMap 将主题映射到表,并且在插入时遇到瓶颈导致消费者滞后,请考虑为每个主题创建一个连接器。发生这样情况的主要原因是,当前批量是以 串行方式 插入到每个表中。

每个主题创建一个连接器是确保获得最快插入速度的解决方法。

Troubleshooting

"State mismatch for topic [someTopic] partition [0]"

当 KeeperMap 中存储的偏移量与 Kafka 中存储的偏移量不同,通常发生在主题被删除或偏移量被手动调整时。 要解决此问题,您需要删除为该特定主题 + 分区存储的旧值。

注意:此调整可能会对恰好一次有影响。

"What errors will the connector retry?"

目前,重点是识别可以重试的瞬态错误,包括:

  • ClickHouseException - 这是 ClickHouse 抛出的通用异常。它通常在服务器超载时抛出,以下错误代码被视为特别瞬态:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - 当套接字超时时抛出。
  • UnknownHostException - 当主机无法解析时抛出。
  • IOException - 当网络出现问题时抛出。

"All my data is blank/zeroes"

您的数据中的字段与表中的字段不匹配可能是原因 - 这在 CDC(以及 Debezium 格式)中特别常见。 一个常见的解决方案是将扁平化转换添加到您的连接器配置中:

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

这将把您的数据从嵌套 JSON 转换为扁平 JSON(使用 _ 作为分隔符)。表中的字段将遵循 "field1_field2_field3" 格式(即 "before_id","after_id",等)。

"I want to use my Kafka keys in ClickHouse"

默认情况下,Kafka 键不存储在值字段中,但您可以使用 KeyToValue 转换将键移动到值字段(在新字段 _key 下):

transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key