Integrating Apache Beam and ClickHouse
Apache Beam is an open-source, unified programming model that enables developers to define and execute both batch and stream (continuous) data processing pipelines. The flexibility of Apache Beam lies in its ability to support a wide range of data processing scenarios, from ETL (Extract, Transform, Load) operations to complex event processing and real-time analytics. This integration leverages ClickHouse's official JDBC connector for the underlying insertion layer.
Integration Package
The package required to integrate Apache Beam and ClickHouse is developed and maintained as part of Apache Beam I/O Connectors, a bundle of integrations for many popular data storage systems and databases.
The org.apache.beam.sdk.io.clickhouse.ClickHouseIO implementation is located within the Apache Beam repository.
Setup of the Apache Beam ClickHouse package
Package installation
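Add the ClickHouseIO artifact (group ID org.apache.beam, artifact ID beam-sdks-java-io-clickhouse) as a dependency in your build tool, with the version set to your Apache Beam version.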
The ClickHouseIO connector is recommended for use starting from Apache Beam version 2.59.0. Earlier versions may not fully support the connector's functionality.
The artifacts can be found in the official Maven repository.
Code Example
The following example reads a CSV file named input.csv as a PCollection, converts each line to a Row object (using the defined schema) and inserts the rows into a local ClickHouse instance using ClickHouseIO:
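A minimal sketch of such a pipeline is given here. The CSV layout (name,age with no header row), the column names, the table name test_table, and the JDBC URL are assumptions made for illustration; adjust them to your environment. It needs the Beam Java SDK, a runner such as the direct runner, and the ClickHouseIO artifact on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class CsvToClickHouse {

    // Assumed row layout; it must match the columns of the target table.
    static final Schema SCHEMA =
        Schema.builder()
            .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
            .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
            .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
            .build();

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Read the file line by line as a PCollection<String>.
        PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("input.csv"));

        // Convert each "name,age" line to a Row that follows SCHEMA.
        PCollection<Row> rows = lines
            .apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
                @ProcessElement
                public void processElement(@Element String line, OutputReceiver<Row> out) {
                    String[] values = line.split(",");
                    out.output(Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build());
                }
            }))
            .setRowSchema(SCHEMA);

        // Insert the rows; the URL and table name are placeholders.
        rows.apply("WriteToClickHouse",
            ClickHouseIO.<Row>write("jdbc:clickhouse://localhost:8123/default", "test_table"));

        p.run().waitUntilFinish();
    }
}
```

The sketch assumes that test_table already exists with columns compatible with the schema (for example a nullable String, a nullable Int16, and a DateTime), since the connector does not create tables.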
Supported Data Types
| ClickHouse | Apache Beam | Is Supported | Notes |
|---|---|---|---|
| TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
| TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
| TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
| TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
| TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
| TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
| TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
| TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
| TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
| TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
| TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
| TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
| TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
| TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
| TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
| TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
| TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
| TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
| TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes is a LogicalType representing a fixed-length byte array, located at org.apache.beam.sdk.schemas.logicaltypes |
| | Schema.TypeName#DECIMAL | ❌ | |
| | Schema.TypeName#MAP | ❌ | |
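In the table, UINT8, UINT16, and UINT32 each map to the next wider signed Beam type, presumably because Java has no unsigned primitives and the wider type is needed to hold the full unsigned range. The sketch below uses only the Java standard library to show that widening; the class and method names are hypothetical, and it does not reproduce the connector's own conversion code. The last helper shows how a 64-bit pattern can be read as unsigned in plain Java, which is relevant because UINT64 maps to INT64 and has no wider primitive to move into.

```java
final class UnsignedWidening {

    // UInt8 range 0..255 fits in a short (Beam INT16).
    static short uint8ToShort(byte raw) {
        return (short) Byte.toUnsignedInt(raw);
    }

    // UInt16 range 0..65535 fits in an int (Beam INT32).
    static int uint16ToInt(short raw) {
        return Short.toUnsignedInt(raw);
    }

    // UInt32 range 0..4294967295 fits in a long (Beam INT64).
    static long uint32ToLong(int raw) {
        return Integer.toUnsignedLong(raw);
    }

    // A long holds the same 64 bits as a UInt64, but values above
    // Long.MAX_VALUE read as negative unless interpreted as unsigned.
    static String uint64ToDecimal(long raw) {
        return Long.toUnsignedString(raw);
    }

    public static void main(String[] args) {
        System.out.println(uint8ToShort((byte) 0xFF));     // 255
        System.out.println(uint16ToInt((short) 0xFFFF));   // 65535
        System.out.println(uint32ToLong(0xFFFFFFFF));      // 4294967295
        System.out.println(uint64ToDecimal(-1L));          // 18446744073709551615
    }
}
```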
ClickHouseIO.Write Parameters
You can adjust the ClickHouseIO.Write configuration with the following setter functions:
| Parameter Setter Function | Argument Type | Default Value | Description |
|---|---|---|---|
| withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | Maximum size of a block of rows to insert. |
| withMaxRetries | (int maxRetries) | 5 | Maximum number of retries for failed inserts. |
| withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | Maximum cumulative backoff duration for retries. |
| withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | Initial backoff duration before the first retry. |
| withInsertDistributedSync | (Boolean sync) | true | If true, synchronizes insert operations for distributed tables. |
| withInsertQuorum | (Long quorum) | null | The number of replicas required to confirm an insert operation. |
| withInsertDeduplicate | (Boolean deduplicate) | true | If true, deduplication is enabled for insert operations. |
| withTableSchema | (TableSchema schema) | null | Schema of the target ClickHouse table. |
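As an illustration, the setters listed above can be chained on the Write transform. This is a sketch only: the JDBC URL, table name, and chosen values are placeholders rather than recommendations, and the wrapper class and method names are hypothetical. It assumes the Beam Java SDK and the ClickHouseIO artifact are on the classpath.

```java
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

public class WriteConfig {

    // Builds a Write transform with smaller blocks and a tighter retry budget
    // than the defaults listed in the table.
    static ClickHouseIO.Write<Row> tunedWrite() {
        return ClickHouseIO.<Row>write("jdbc:clickhouse://localhost:8123/default", "test_table")
            .withMaxInsertBlockSize(100_000L)                       // rows per inserted block
            .withMaxRetries(3)                                      // failed inserts are retried up to 3 times
            .withInitialBackoff(Duration.standardSeconds(1))        // wait before the first retry
            .withMaxCumulativeBackoff(Duration.standardMinutes(10)) // cap on total backoff time
            .withInsertDeduplicate(true);                           // keep block deduplication enabled
    }
}
```

The returned transform is applied to a PCollection of rows in the same way as an unconfigured ClickHouseIO.write(...).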
Limitations
Please consider the following limitations when using the connector:
- Currently, only the Sink operation is supported. The connector doesn't support the Source operation.
- ClickHouse performs deduplication when inserting into a ReplicatedMergeTree or a Distributed table built on top of a ReplicatedMergeTree. Without replication, inserting into a regular MergeTree can result in duplicates if an insert fails and then successfully retries. However, each block is inserted atomically, and the block size can be configured using ClickHouseIO.Write.withMaxInsertBlockSize(long). Deduplication is achieved by using checksums of the inserted blocks. For more information about deduplication, please visit Deduplication and Deduplicate insertion config.
- The connector doesn't perform any DDL statements; therefore, the target table must exist prior to insertion.
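The checksum-based deduplication mentioned in the limitations can be pictured with a small in-memory model. This is a conceptual sketch, not ClickHouse's implementation: the class name, the use of SHA-256, and the row representation are all assumptions chosen for illustration. It shows why retrying an identical block is harmless when block checksums are tracked, and why the same retry duplicates rows when they are not.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BlockDedupSketch {
    private final Set<String> seenChecksums = new HashSet<>();
    private final List<String> storedRows = new ArrayList<>();

    // Checksum over the whole block: identical blocks give identical checksums.
    static String checksum(List<String> block) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String row : block) {
                digest.update(row.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) '\n');
            }
            return Base64.getEncoder().encodeToString(digest.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Stores the block as a unit unless a block with the same checksum was
    // already stored. Returns true if the rows were written.
    boolean insert(List<String> block) {
        if (!seenChecksums.add(checksum(block))) {
            return false; // retried block, skipped
        }
        storedRows.addAll(block);
        return true;
    }

    int rowCount() {
        return storedRows.size();
    }

    public static void main(String[] args) {
        BlockDedupSketch table = new BlockDedupSketch();
        List<String> block = Arrays.asList("alice,30", "bob,25");
        System.out.println(table.insert(block)); // true: first attempt is stored
        System.out.println(table.insert(block)); // false: retry of the same block is skipped
        System.out.println(table.rowCount());    // 2
    }
}
```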
Related Content
ClickHouseIO class documentation.
GitHub repository of examples: clickhouse-beam-connector.