跳到主要内容
跳到主要内容

集成 Apache Beam 和 ClickHouse

Apache Beam 是一个开源的统一编程模型,使开发人员能够定义和执行批处理及流式(持续)数据处理管道。Apache Beam 的灵活性在于它能够支持广泛的数据处理场景,从 ETL(提取、转换、加载)操作到复杂事件处理和实时分析。 该集成利用了 ClickHouse 的官方 JDBC 连接器 作为底层插入层。

集成包

集成 Apache Beam 和 ClickHouse 所需的集成包在 Apache Beam I/O Connectors 下维护和开发 - 这是一个包含许多流行数据存储系统和数据库的集成包。 org.apache.beam.sdk.io.clickhouse.ClickHouseIO 实现位于 Apache Beam repo 中。

Apache Beam ClickHouse 包的设置

包安装

将以下依赖项添加到您的包管理框架中:

推荐的 Beam 版本

ClickHouseIO 连接器建议从 Apache Beam 版本 2.59.0 开始使用。 早期版本可能无法完全支持连接器的功能。

可以在 官方 maven 仓库 中找到构件。

代码示例

以下示例将名为 input.csv 的 CSV 文件读取为 PCollection,使用定义的模式将其转换为 Row 对象,并使用 ClickHouseIO 将其插入到本地 ClickHouse 实例中:

支持的数据类型

ClickHouseApache Beam是否支持备注
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes 是一个 LogicalType,表示固定长度
字节数组,位于
org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

ClickHouseIO.Write 参数

您可以使用以下设置函数调整 ClickHouseIO.Write 配置:

参数设置函数参数类型默认值描述
withMaxInsertBlockSize(long maxInsertBlockSize)1000000要插入的行块的最大大小。
withMaxRetries(int maxRetries)5失败插入的最大重试次数。
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)最大重试的累积退避持续时间。
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)第一次重试前的初始退避持续时间。
withInsertDistributedSync(Boolean sync)true如果为 true,则同步分布式表的插入操作。
withInsertQuorum(Long quorum)null确认插入操作所需的副本数量。
withInsertDeduplicate(Boolean deduplicate)true如果为 true,则启用插入操作的去重。
withTableSchema(TableSchema schema)null目标 ClickHouse 表的模式。

限制

使用该连接器时,请考虑以下限制:

  • 截至今天,仅支持 Sink 操作。该连接器不支持 Source 操作。
  • 当插入到 ReplicatedMergeTree 或建立在 ReplicatedMergeTree 之上的 Distributed 表时,ClickHouse 执行去重。如果没有复制,则插入到常规的 MergeTree 在插入失败后成功重试时可能会导致重复。然而,每个块都是原子插入的,块的大小可以使用 ClickHouseIO.Write.withMaxInsertBlockSize(long) 配置。去重是通过使用插入块的校验和实现的。有关去重的更多信息,请访问 DeduplicationDeduplicate insertion config
  • 该连接器不执行任何 DDL 语句;因此,目标表必须在插入之前存在。