通过命名集合集成 ClickHouse 与 Kafka
在本指南中,我们将介绍如何使用命名集合将 ClickHouse 连接到 Kafka。使用命名集合的配置文件具有以下优势:
- 集中化、简化的配置管理。
- 可以在不更改 SQL 表定义的情况下修改配置。
- 只需检查单个配置文件即可更方便地审查和排查配置问题。
本指南已在 Apache Kafka 3.4.1 和 ClickHouse 24.5.1 上完成测试。
前提假设
本文档假定已具备:
- 一个可正常运行的 Kafka 集群。
- 一个已部署并正在运行的 ClickHouse 集群。
- 具备 SQL 基础,并熟悉 ClickHouse 和 Kafka 的基本配置。
先决条件
确保负责创建该具名集合的用户具备必要的访问权限:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>
有关如何启用访问控制的更多详情,请参阅用户管理指南。
在 ClickHouse 的 config.xml 文件中添加以下配置段:
<!-- Kafka 集成的命名集合 -->
<named_collections>
<cluster_1>
<!-- ClickHouse Kafka 引擎参数 -->
<kafka_broker_list>c1-kafka-1:9094,c1-kafka-2:9094,c1-kafka-3:9094</kafka_broker_list>
<kafka_topic_list>cluster_1_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_1_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Kafka 扩展配置 -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword1</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_1>
<cluster_2>
<!-- ClickHouse Kafka 引擎参数 -->
<kafka_broker_list>c2-kafka-1:29094,c2-kafka-2:29094,c2-kafka-3:29094</kafka_broker_list>
<kafka_topic_list>cluster_2_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_2_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Kafka 扩展配置 -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword2</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_2>
</named_collections>
配置说明
- 调整 Kafka 地址和相关配置,使其与 Kafka 集群设置保持一致。
<kafka> 之前的部分包含 ClickHouse Kafka 引擎参数。完整参数列表请参阅 Kafka 引擎参数。
<kafka> 内的部分包含额外的 Kafka 配置选项。更多可用选项请参阅 librdkafka 配置说明。
- 本示例使用
SASL_SSL 安全协议和 PLAIN 机制。请根据实际的 Kafka 集群配置调整这些设置。
创建表和数据库
在你的 ClickHouse 集群上创建所需的数据库和表。如果你以单节点方式运行 ClickHouse,请省略 SQL 命令中的集群(cluster)部分,并使用除 ReplicatedMergeTree 之外的其他任意引擎。
创建数据库
CREATE DATABASE kafka_testing ON CLUSTER LAB_CLICKHOUSE_CLUSTER;
创建 Kafka 表
在第一个 Kafka 集群中创建第一张 Kafka 表:
CREATE TABLE kafka_testing.first_kafka_table ON CLUSTER LAB_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_1);
为第二个 Kafka 集群创建第二张 Kafka 表:
CREATE TABLE kafka_testing.second_kafka_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_2);
创建复制表
先为第一个 Kafka 表创建一张表:
CREATE TABLE kafka_testing.first_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
为第二个 Kafka 表创建表:
CREATE TABLE kafka_testing.second_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
创建物化视图
创建一个物化视图,将数据从第一个 Kafka 表插入到第一个复制表中:
CREATE MATERIALIZED VIEW kafka_testing.cluster_1_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO first_replicated_table AS
SELECT
id,
first_name,
last_name
FROM first_kafka_table;
创建物化视图,将第二张 Kafka 表中的数据插入到第二张副本表中:
CREATE MATERIALIZED VIEW kafka_testing.cluster_2_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO second_replicated_table AS
SELECT
id,
first_name,
last_name
FROM second_kafka_table;
验证设置
现在你应该可以在 Kafka 集群上看到相应的 consumer group(消费者组):
cluster_1_clickhouse_consumer 位于 cluster_1
cluster_2_clickhouse_consumer 位于 cluster_2
在任意一个 ClickHouse 节点上运行以下查询,以查看两个表中的数据:
SELECT * FROM first_replicated_table LIMIT 10;
SELECT * FROM second_replicated_table LIMIT 10;
在本指南中,摄取到两个 Kafka 主题中的数据是相同的。在您的实际环境中,它们会有所不同。您可以根据需要添加任意数量的 Kafka 集群。
示例输出:
┌─id─┬─first_name─┬─last_name─┐
│ 0 │ FirstName0 │ LastName0 │
│ 1 │ FirstName1 │ LastName1 │
│ 2 │ FirstName2 │ LastName2 │
└────┴────────────┴───────────┘
至此,使用命名集合完成 ClickHouse 与 Kafka 集成的配置。通过在 ClickHouse 的 config.xml 文件中集中管理 Kafka 配置,您可以更轻松地管理和调整相关设置,从而确保集成更加简洁高效。