集成 Apache Beam 和 ClickHouse
Apache Beam 是一个开源的统一编程模型,使开发人员能够定义和执行批处理及流式(持续)数据处理管道。Apache Beam 的灵活性在于它能够支持广泛的数据处理场景,从 ETL(提取、转换、加载)操作到复杂事件处理和实时分析。 该集成利用了 ClickHouse 的官方 JDBC 连接器 作为底层插入层。
集成包
集成 Apache Beam 和 ClickHouse 所需的集成包在 Apache Beam I/O Connectors 下维护和开发 - 这是一个包含许多流行数据存储系统和数据库的集成包。
org.apache.beam.sdk.io.clickhouse.ClickHouseIO
实现位于 Apache Beam repo 中。
Apache Beam ClickHouse 包的设置
包安装
将以下依赖项添加到您的包管理框架中:
推荐的 Beam 版本
ClickHouseIO
连接器建议从 Apache Beam 版本 2.59.0
开始使用。
早期版本可能无法完全支持连接器的功能。
可以在 官方 maven 仓库 中找到构件。
代码示例
以下示例将名为 input.csv
的 CSV 文件读取为 PCollection
,使用定义的模式将其转换为 Row 对象,并使用 ClickHouseIO
将其插入到本地 ClickHouse 实例中:
支持的数据类型
ClickHouse | Apache Beam | 是否支持 | 备注 |
---|---|---|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes 是一个 LogicalType ,表示固定长度 字节数组,位于 org.apache.beam.sdk.schemas.logicaltypes |
Schema.TypeName#DECIMAL | ❌ | ||
Schema.TypeName#MAP | ❌ |
ClickHouseIO.Write 参数
您可以使用以下设置函数调整 ClickHouseIO.Write
配置:
参数设置函数 | 参数类型 | 默认值 | 描述 |
---|---|---|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | 要插入的行块的最大大小。 |
withMaxRetries | (int maxRetries) | 5 | 失败插入的最大重试次数。 |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | 最大重试的累积退避持续时间。 |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | 第一次重试前的初始退避持续时间。 |
withInsertDistributedSync | (Boolean sync) | true | 如果为 true,则同步分布式表的插入操作。 |
withInsertQuorum | (Long quorum) | null | 确认插入操作所需的副本数量。 |
withInsertDeduplicate | (Boolean deduplicate) | true | 如果为 true,则启用插入操作的去重。 |
withTableSchema | (TableSchema schema) | null | 目标 ClickHouse 表的模式。 |
限制
使用该连接器时,请考虑以下限制:
- 截至今天,仅支持 Sink 操作。该连接器不支持 Source 操作。
- 当插入到
ReplicatedMergeTree
或建立在ReplicatedMergeTree
之上的Distributed
表时,ClickHouse 执行去重。如果没有复制,则插入到常规的 MergeTree 在插入失败后成功重试时可能会导致重复。然而,每个块都是原子插入的,块的大小可以使用ClickHouseIO.Write.withMaxInsertBlockSize(long)
配置。去重是通过使用插入块的校验和实现的。有关去重的更多信息,请访问 Deduplication 和 Deduplicate insertion config。 - 该连接器不执行任何 DDL 语句;因此,目标表必须在插入之前存在。
相关内容
ClickHouseIO
类 文档。- 示例的
Github
仓库 clickhouse-beam-connector。