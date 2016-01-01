Integrating ClickHouse with Kafka using Named Collections

In this guide, we will explore how to connect ClickHouse to Kafka using named collections. Using the configuration file for named collections offers several advantages:

Centralized and easier management of configuration settings.

Changes to settings can be made without altering SQL table definitions.

Easier review and troubleshooting of configurations by inspecting a single configuration file.

This guide has been tested on Apache Kafka 3.4.1 and ClickHouse 24.5.1.

This document assumes you have:

A working Kafka cluster. A ClickHouse cluster set up and running. Basic knowledge of SQL and familiarity with ClickHouse and Kafka configurations.

Ensure the user creating the named collection has the necessary access permissions:

< access_management > 1 </ access_management >

< named_collection_control > 1 </ named_collection_control >

< show_named_collections > 1 </ show_named_collections >

< show_named_collections_secrets > 1 </ show_named_collections_secrets >



Refer to the User Management Guide for more details on enabling access control.

Add the following section to your ClickHouse config.xml file:



< named_collections >

< cluster_1 >



< kafka_broker_list > c1-kafka-1:9094,c1-kafka-2:9094,c1-kafka-3:9094 </ kafka_broker_list >

< kafka_topic_list > cluster_1_clickhouse_topic </ kafka_topic_list >

< kafka_group_name > cluster_1_clickhouse_consumer </ kafka_group_name >

< kafka_format > JSONEachRow </ kafka_format >

< kafka_commit_every_batch > 0 </ kafka_commit_every_batch >

< kafka_num_consumers > 1 </ kafka_num_consumers >

< kafka_thread_per_consumer > 1 </ kafka_thread_per_consumer >





< kafka >

< security_protocol > SASL_SSL </ security_protocol >

< enable_ssl_certificate_verification > false </ enable_ssl_certificate_verification >

< sasl_mechanism > PLAIN </ sasl_mechanism >

< sasl_username > kafka-client </ sasl_username >

< sasl_password > kafkapassword1 </ sasl_password >

< debug > all </ debug >

< auto_offset_reset > latest </ auto_offset_reset >

</ kafka >

</ cluster_1 >



< cluster_2 >



< kafka_broker_list > c2-kafka-1:29094,c2-kafka-2:29094,c2-kafka-3:29094 </ kafka_broker_list >

< kafka_topic_list > cluster_2_clickhouse_topic </ kafka_topic_list >

< kafka_group_name > cluster_2_clickhouse_consumer </ kafka_group_name >

< kafka_format > JSONEachRow </ kafka_format >

< kafka_commit_every_batch > 0 </ kafka_commit_every_batch >

< kafka_num_consumers > 1 </ kafka_num_consumers >

< kafka_thread_per_consumer > 1 </ kafka_thread_per_consumer >





< kafka >

< security_protocol > SASL_SSL </ security_protocol >

< enable_ssl_certificate_verification > false </ enable_ssl_certificate_verification >

< sasl_mechanism > PLAIN </ sasl_mechanism >

< sasl_username > kafka-client </ sasl_username >

< sasl_password > kafkapassword2 </ sasl_password >

< debug > all </ debug >

< auto_offset_reset > latest </ auto_offset_reset >

</ kafka >

</ cluster_2 >

</ named_collections >



Adjust Kafka addresses and related configurations to match your Kafka cluster setup. The section before <kafka> contains ClickHouse Kafka engine parameters. For a full list of parameters, refer to the Kafka engine parameters . The section within <kafka> contains extended Kafka configuration options. For more options, refer to the librdkafka configuration. This example uses the SASL_SSL security protocol and PLAIN mechanism. Adjust these settings based on your Kafka cluster configuration.

Create the necessary databases and tables on your ClickHouse cluster. If you run ClickHouse as a single node, omit the cluster part of the SQL command and use any other engine instead of ReplicatedMergeTree .

CREATE DATABASE kafka_testing ON CLUSTER LAB_CLICKHOUSE_CLUSTER ;



Create the first Kafka table for the first Kafka cluster:

CREATE TABLE kafka_testing . first_kafka_table ON CLUSTER LAB_CLICKHOUSE_CLUSTER

(

` id ` UInt32 ,

` first_name ` String ,

` last_name ` String

)

ENGINE = Kafka ( cluster_1 ) ;



Create the second Kafka table for the second Kafka cluster:

CREATE TABLE kafka_testing . second_kafka_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER

(

` id ` UInt32 ,

` first_name ` String ,

` last_name ` String

)

ENGINE = Kafka ( cluster_2 ) ;



Create a table for the first Kafka table:

CREATE TABLE kafka_testing . first_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER

(

` id ` UInt32 ,

` first_name ` String ,

` last_name ` String

) ENGINE = ReplicatedMergeTree ( )

ORDER BY id ;



Create a table for the second Kafka table:

CREATE TABLE kafka_testing . second_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER

(

` id ` UInt32 ,

` first_name ` String ,

` last_name ` String

) ENGINE = ReplicatedMergeTree ( )

ORDER BY id ;



Create a materialized view to insert data from the first Kafka table into the first replicated table:

CREATE MATERIALIZED VIEW kafka_testing . cluster_1_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO first_replicated_table AS

SELECT

id ,

first_name ,

last_name

FROM first_kafka_table ;



Create a materialized view to insert data from the second Kafka table into the second replicated table:

CREATE MATERIALIZED VIEW kafka_testing . cluster_2_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO second_replicated_table AS

SELECT

id ,

first_name ,

last_name

FROM second_kafka_table ;



You should now see the relative consumer groups on your Kafka clusters:

cluster_1_clickhouse_consumer on cluster_1

on cluster_2_clickhouse_consumer on cluster_2

Run the following queries on any of your ClickHouse nodes to see the data in both tables:

SELECT * FROM first_replicated_table LIMIT 10 ;



SELECT * FROM second_replicated_table LIMIT 10 ;



In this guide, the data ingested in both Kafka topics is the same. In your case, they would differ. You can add as many Kafka clusters as you want.

Example output:

┌─id─┬─first_name─┬─last_name─┐

│ 0 │ FirstName0 │ LastName0 │

│ 1 │ FirstName1 │ LastName1 │

│ 2 │ FirstName2 │ LastName2 │

└────┴────────────┴───────────┘

