Integrating Apache Beam and ClickHouse

Apache Beam is an open-source, unified programming model that enables developers to define and execute both batch and stream (continuous) data processing pipelines. Its flexibility lies in its ability to support a wide range of data processing scenarios, from ETL (Extract, Transform, Load) operations to complex event processing and real-time analytics. This integration uses ClickHouse's official JDBC connector for the underlying insertion layer.

The integration package required to integrate Apache Beam and ClickHouse is maintained and developed under Apache Beam I/O Connectors, a bundle of integrations for many popular data storage systems and databases. The org.apache.beam.sdk.io.clickhouse.ClickHouseIO implementation is located within the Apache Beam repository.

Add the following dependency to your package management framework:

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-clickhouse</artifactId>
        <version>${beam.version}</version>
    </dependency>

The artifacts can be found in the official Maven repository.

The following example reads a CSV file named input.csv as a PCollection, converts each line to a Row object (using the defined schema), and inserts the rows into a local ClickHouse instance using ClickHouseIO:



    package org.example;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.joda.time.DateTime;

    public class Main {

        public static void main(String[] args) {

            Pipeline p = Pipeline.create();

            // Beam schema describing the rows written to ClickHouse.
            Schema SCHEMA =
                Schema.builder()
                    .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                    .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                    .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                    .build();

            // Read the CSV file, one element per line.
            PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));

            // Convert each "name,age" line to a Row and stamp it with the current time.
            PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
                @ProcessElement
                public void processElement(@Element String line, OutputReceiver<Row> out) {

                    String[] values = line.split(",");
                    Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                    out.output(row);
                }
            })).setRowSchema(SCHEMA);

            // Insert the rows into test_table on the local ClickHouse instance.
            rows.apply("Write to ClickHouse",
                ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

            p.run().waitUntilFinish();
        }
    }
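The conversion step in the example assumes that every line contains exactly two well-formed fields; a blank age or a non-numeric value would make Short.parseShort throw. As a minimal sketch, assuming you would rather skip such lines than fail, the parsing logic could be factored into a plain helper that can be exercised without Beam. The names CsvLineParser, ParsedLine and parse are hypothetical and are not part of Beam or ClickHouse. An empty age is mapped to null because the schema above declares age as nullable. Quoted fields that contain commas are not handled.

```java
import java.util.Optional;

/** Hypothetical helper (not part of Beam): parses one "name,age" CSV line. */
final class CsvLineParser {

    /** Parsed values; age is null when the field is empty (the schema allows nulls). */
    static final class ParsedLine {
        final String name;
        final Short age;

        ParsedLine(String name, Short age) {
            this.name = name;
            this.age = age;
        }
    }

    private CsvLineParser() {}

    /** Returns the parsed line, or Optional.empty() if the line is malformed. */
    static Optional<ParsedLine> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, so "bob," yields ["bob", ""].
        String[] values = line.split(",", -1);
        if (values.length != 2) {
            return Optional.empty();
        }
        String name = values[0].trim();
        String rawAge = values[1].trim();
        if (rawAge.isEmpty()) {
            return Optional.of(new ParsedLine(name, null));
        }
        try {
            return Optional.of(new ParsedLine(name, Short.parseShort(rawAge)));
        } catch (NumberFormatException e) {
            // Non-numeric age, or a value outside the 16-bit signed range.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String line : new String[] {"alice,30", "bob,", "carol,not-a-number"}) {
            Optional<ParsedLine> parsed = parse(line);
            System.out.println(line + " -> "
                + parsed.map(p -> p.name + " / " + p.age).orElse("skipped"));
        }
    }
}
```

Inside the DoFn, processElement could call CsvLineParser.parse(line) and emit a Row only when a value is present.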





ClickHouse                        Apache Beam
--------------------------------  ------------------------
TableSchema.TypeName.FLOAT32      Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64      Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8         Schema.TypeName#BYTE
TableSchema.TypeName.INT16        Schema.TypeName#INT16
TableSchema.TypeName.INT32        Schema.TypeName#INT32
TableSchema.TypeName.INT64        Schema.TypeName#INT64
TableSchema.TypeName.STRING       Schema.TypeName#STRING
TableSchema.TypeName.UINT8        Schema.TypeName#INT16
TableSchema.TypeName.UINT16       Schema.TypeName#INT32
TableSchema.TypeName.UINT32       Schema.TypeName#INT64
TableSchema.TypeName.UINT64       Schema.TypeName#INT64
TableSchema.TypeName.DATE         Schema.TypeName#DATETIME
TableSchema.TypeName.DATETIME     Schema.TypeName#DATETIME
TableSchema.TypeName.ARRAY        Schema.TypeName#ARRAY
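In the mapping above, the unsigned ClickHouse types UInt8, UInt16 and UInt32 are paired with the next wider signed Beam type. Beam schemas have no unsigned integer types, so a wider signed type is needed to hold the full unsigned range. The sketch below only checks that arithmetic; the class and method names are hypothetical. It also shows that UInt64 is the exception: its full range does not fit into a signed 64-bit value, and how the connector treats values above Long.MAX_VALUE is not covered here.

```java
import java.math.BigInteger;

/** Illustrative range check; class and method names are hypothetical. */
final class UnsignedRanges {

    private UnsignedRanges() {}

    /** Largest value of an unsigned integer with the given bit width: 2^bits - 1. */
    static BigInteger maxUnsigned(int bits) {
        return BigInteger.ONE.shiftLeft(bits).subtract(BigInteger.ONE);
    }

    /** Largest value of a signed two's-complement integer: 2^(bits - 1) - 1. */
    static BigInteger maxSigned(int bits) {
        return BigInteger.ONE.shiftLeft(bits - 1).subtract(BigInteger.ONE);
    }

    /** True if every value of the unsigned type fits in a signed type of the given width. */
    static boolean fits(int unsignedBits, int signedBits) {
        return maxUnsigned(unsignedBits).compareTo(maxSigned(signedBits)) <= 0;
    }

    public static void main(String[] args) {
        System.out.println("UInt8  fits in BYTE  (8-bit):  " + fits(8, 8));
        System.out.println("UInt8  fits in INT16 (16-bit): " + fits(8, 16));
        System.out.println("UInt16 fits in INT32 (32-bit): " + fits(16, 32));
        System.out.println("UInt32 fits in INT64 (64-bit): " + fits(32, 64));
        System.out.println("UInt64 fits in INT64 (64-bit): " + fits(64, 64));
    }
}
```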

Please consider the following limitations when using the connector:

The currently supported ClickHouse JDBC version is 0.3.2-patch10.

Currently, only the Sink operation is supported; the connector does not support the Source operation.

ClickHouse performs deduplication when inserting into a ReplicatedMergeTree or a Distributed table built on top of a ReplicatedMergeTree. Without replication, inserting into a regular MergeTree can result in duplicates if an insert fails and is then retried successfully. However, each block is inserted atomically, and the block size can be configured using ClickHouseIO.Write.withMaxInsertBlockSize(long). Deduplication is achieved by using checksums of the inserted blocks. For more information about deduplication, see Deduplication and Deduplicate insertion config.
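To make the interaction between block size, retries and checksums more concrete, here is a toy model in plain Java. It is not ClickHouse's implementation and does not call the connector: the class BlockDedupSketch and its methods are hypothetical, and List.hashCode() stands in for a real block checksum. It only illustrates the idea that resending an identical block is harmless when checksums of earlier blocks are remembered, and produces duplicate rows when they are not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of checksum-based block deduplication; not ClickHouse's implementation. */
final class BlockDedupSketch {

    private final Set<Integer> seenChecksums = new HashSet<>();
    private final List<String> stored = new ArrayList<>();
    private final boolean deduplicate;

    BlockDedupSketch(boolean deduplicate) {
        this.deduplicate = deduplicate;
    }

    /** Splits rows into consecutive blocks of at most maxBlockSize rows. */
    static List<List<String>> toBlocks(List<String> rows, int maxBlockSize) {
        if (maxBlockSize <= 0) {
            throw new IllegalArgumentException("maxBlockSize must be positive");
        }
        List<List<String>> blocks = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += maxBlockSize) {
            int end = Math.min(i + maxBlockSize, rows.size());
            blocks.add(new ArrayList<>(rows.subList(i, end)));
        }
        return blocks;
    }

    /** Stores the whole block or nothing; returns false if it was skipped as a duplicate. */
    boolean insertBlock(List<String> block) {
        // Stand-in checksum (assumption): a real system would hash the block's data.
        int checksum = block.hashCode();
        if (deduplicate && !seenChecksums.add(checksum)) {
            return false;
        }
        stored.addAll(block);
        return true;
    }

    int storedRowCount() {
        return stored.size();
    }

    public static void main(String[] args) {
        List<List<String>> blocks = toBlocks(Arrays.asList("a", "b", "c", "d", "e"), 2);
        for (boolean dedup : new boolean[] {true, false}) {
            BlockDedupSketch table = new BlockDedupSketch(dedup);
            // First attempt, then a full retry that resends identical blocks.
            for (int attempt = 0; attempt < 2; attempt++) {
                for (List<String> block : blocks) {
                    table.insertBlock(block);
                }
            }
            System.out.println("deduplicate=" + dedup + " rows stored: " + table.storedRowCount());
        }
    }
}
```

In this model, a retry is only recognized when the resent block has exactly the same contents as the original one.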

