Kafka
建议 ClickHouse Cloud 用户使用 ClickPipes 将 Kafka 数据流入 ClickHouse。它原生支持高性能插入,同时确保关注点分离,可以独立扩展数据摄取和集群资源。
该引擎与 Apache Kafka 一起工作。
Kafka 让您能够:
- 发布或订阅数据流。
- 组织容错存储。
- 持续处理可用的流。
创建表
必需的参数:
kafka_broker_list
— 以逗号分隔的代理列表(例如,localhost:9092
)。kafka_topic_list
— Kafka 主题列表。kafka_group_name
— Kafka 消费者组。每个组单独跟踪读取边界。如果您不希望消息在集群中重复,必须在所有地方使用相同的组名。kafka_format
— 消息格式。采用与 SQLFORMAT
函数相同的标记法,例如JSONEachRow
。有关更多信息,请参见 Formats 部分。
可选参数:
kafka_security_protocol
- 与代理通信所使用的协议。可能的值:plaintext
、ssl
、sasl_plaintext
、sasl_ssl
。kafka_sasl_mechanism
- 用于身份验证的 SASL 机制。可能的值:GSSAPI
、PLAIN
、SCRAM-SHA-256
、SCRAM-SHA-512
、OAUTHBEARER
。kafka_sasl_username
- 用于PLAIN
和SASL-SCRAM-..
机制的 SASL 用户名。kafka_sasl_password
- 用于PLAIN
和SASL-SCRAM-..
机制的 SASL 密码。kafka_schema
— 如果格式要求模式定义,必须使用的参数。例如,Cap'n Proto 要求提供模式文件的路径和根schema.capnp:Message
对象的名称。kafka_num_consumers
— 每个表的消费者数量。如果单个消费者的吞吐量不足,请指定更多消费者。消费者的总数量不得超过主题中的分区数量,因为每个分区只能分配一个消费者,并且不得大于部署 ClickHouse 服务器上的物理核心数量。默认值:1
。kafka_max_block_size
— 获取的最大批量大小(以消息为单位)。默认值:max_insert_block_size。kafka_skip_broken_messages
— 对每个块的与模式不兼容的消息的 Kafka 消息解析器容忍度。如果kafka_skip_broken_messages = N
,则引擎跳过 N 条无法解析的 Kafka 消息(一条消息等于一行数据)。默认值:0
。kafka_commit_every_batch
— 每处理完一批数据后提交,而不是在写入整个块后进行单次提交。默认值:0
。kafka_client_id
— 客户端标识符。默认为空。kafka_poll_timeout_ms
— 从 Kafka 进行单次轮询的超时。默认值:stream_poll_timeout_ms。kafka_poll_max_batch_size
— 在单次 Kafka 轮询中要轮询的最大消息数。默认值:max_block_size。kafka_flush_interval_ms
— 从 Kafka 刷新数据的超时。默认值:stream_flush_interval_ms。kafka_thread_per_consumer
— 为每个消费者提供独立线程。当启用时,每个消费者独立并行地刷新数据(否则 - 来自多个消费者的行被压缩成一个块)。默认值:0
。kafka_handle_error_mode
— 如何处理 Kafka 引擎的错误。可能的值:默认(如果解析消息失败将抛出异常),流(异常消息和原始消息将在虚拟列_error
和_raw_message
中保存)。kafka_commit_on_select
— 当进行选择查询时提交消息。默认值:false
。kafka_max_rows_per_message
— 每个基于行的格式中写入的一条 kafka 消息的最大行数。默认值:1
。
示例:
创建表的弃用方法
在新项目中不要使用此方法。如果可能,请将旧项目切换到上面描述的方法。
Kafka 表引擎不支持具有 默认值 的列。如果您需要具有默认值的列,可以在物化视图级别添加它们(见下文)。
描述
交付的消息会自动跟踪,因此每个组中的每条消息只计数一次。如果您想获取数据两次,则需要使用另一个组名创建表的副本。
组是灵活的并在集群中同步。例如,如果您有 10 个主题和 5 个表副本在集群中,则每个副本获得 2 个主题。如果副本数量发生变化,主题会自动在副本之间重新分配。请在 http://kafka.apache.org/intro 上阅读更多内容。
SELECT
对于读取消息并不是特别有用(除非用于调试),因为每条消息只能被读取一次。创建实时线程使用物化视图更为实用。要做到这一点:
- 使用引擎创建 Kafka 消费者,并将其视为数据流。
- 创建具有所需结构的表。
- 创建一个物化视图,将来自引擎的数据转换并放入之前创建的表中。
当 MATERIALIZED VIEW
连接引擎时,它开始在后台收集数据。这使您能够不断接收来自 Kafka 的消息并使用 SELECT
将其转换为所需的格式。
一个 kafka 表可以有任意数量的物化视图,它们不会直接从 kafka 表读取数据,而是以块的形式接收新记录,这样您可以写入多个具有不同详细级别的表(使用分组 - 聚合和不使用)。
示例:
为了提高性能,接收的消息被分组为 max_insert_block_size 大小的块。如果在 stream_flush_interval_ms 毫秒内没有形成块,则数据将被刷新到表中,而不考虑块的完整性。
要停止接收主题数据或更改转换逻辑,请分离物化视图:
如果您希望通过 ALTER
更改目标表,建议禁用物化视图,以避免目标表和视图数据之间的不一致。
配置
类似于 GraphiteMergeTree,Kafka 引擎支持使用 ClickHouse 配置文件的扩展配置。您可以使用两个配置键:全局(在 <kafka>
下面)和主题级(在 <kafka><kafka_topic>
下面)。全局配置首先应用,然后应用主题级配置(如果存在)。
有关可能的配置选项列表,请参见 librdkafka 配置参考。在 ClickHouse 配置中,使用下划线 (_
) 替代点。例如,check.crcs=true
将变为 <check_crcs>true</check_crcs>
。
Kerberos支持
要处理支持 Kerberos 的 Kafka,请添加 security_protocol
子元素,值为 sasl_plaintext
。只要 Kerberos 票证授权票据由操作系统设施获取并缓存即可。
ClickHouse 可以使用密钥表文件维护 Kerberos 凭证。考虑 sasl_kerberos_service_name
、sasl_kerberos_keytab
和 sasl_kerberos_principal
子元素。
示例:
虚拟列
_topic
— Kafka 主题。数据类型:LowCardinality(String)
。_key
— 消息的键。数据类型:String
。_offset
— 消息的偏移量。数据类型:UInt64
。_timestamp
— 消息的时间戳。数据类型:Nullable(DateTime)
。_timestamp_ms
— 消息的时间戳(以毫秒为单位)。数据类型:Nullable(DateTime64(3))
。_partition
— Kafka 主题的分区。数据类型:UInt64
。_headers.name
— 消息头的键数组。数据类型:Array(String)
。_headers.value
— 消息头的值数组。数据类型:Array(String)
。
当 kafka_handle_error_mode='stream'
时的附加虚拟列:
_raw_message
- 无法成功解析的原始消息。数据类型:String
。_error
- 解析失败期间发生的异常消息。数据类型:String
。
注意:当解析成功时,虚拟列 _raw_message
和 _error
始终为空,仅在解析期间发生异常时填充。
数据格式支持
Kafka 引擎支持 ClickHouse 支持的所有 格式。 一条 Kafka 消息中的行数取决于格式是行式还是块式:
- 对于行式格式,可以通过设置
kafka_max_rows_per_message
控制一条 Kafka 消息中的行数。 - 对于块式格式,我们无法将块拆分为更小的部分,但可以通过通用设置 max_block_size 控制一个块中的行数。
存储已提交偏移量的引擎在 ClickHouse Keeper 中
如果启用 allow_experimental_kafka_offsets_storage_in_keeper
,则可以为 Kafka 表引擎指定另外两个设置:
kafka_keeper_path
指定 ClickHouse Keeper 中表的路径kafka_replica_name
指定 ClickHouse Keeper 中的副本名
必须指定这两个设置中的全部或全无。当两个设置同时指定时,将使用一种新的实验性 Kafka 引擎。该新引擎不依赖于 Kafka 中存储已提交的偏移量,而是将其存储在 ClickHouse Keeper 中。它仍会尝试将偏移量提交到 Kafka,但仅在创建表时依赖这些偏移量。在其他情况下(表被重启或在某种错误后恢复时),将使用存储在 ClickHouse Keeper 中的偏移量作为继续消费消息的偏移量。除了已提交的偏移量外,它还存储了最后一批消费的消息数,因此如果插入失败,将消费相同数量的消息,从而在必要时启用去重。
示例:
或者利用 uuid
和 replica
宏,类似于 ReplicatedMergeTree:
已知限制
由于新引擎是实验性的,因此尚未准备好投入生产。该实现存在一些已知限制:
- 最大的限制是引擎不支持直接读取。使用物化视图从引擎读取和写入到引擎工作正常,但直接读取不行。因此,所有直接的
SELECT
查询将失败。 - 快速删除和重新创建表或将相同的 ClickHouse Keeper 路径指定给不同引擎可能导致问题。最佳实践是在
kafka_keeper_path
中使用{uuid}
以避免路径冲突。 - 为了确保可重复的读取,在单个线程中不能从多个分区消费消息。另一方面,需要定期轮询 Kafka 消费者以保持其活动。因此基于这两个目标,我们决定仅在启用
kafka_thread_per_consumer
的情况下允许创建多个消费者,否则对于定期轮询消费者难以避免问题。 - 由新存储引擎创建的消费者不会出现在
system.kafka_consumers
表中。
另请参阅