从 DynamoDB 到 ClickHouse 的 CDC
Experimental feature. Learn more.
本文介绍如何使用 ClickPipes 设置从 DynamoDB 到 ClickHouse 的 CDC。这一集成有两个组件:
- 通过 S3 ClickPipes 进行初始快照
- 通过 Kinesis ClickPipes 进行实时更新
数据将被摄入到一个 ReplacingMergeTree
中。此表引擎通常用于 CDC 场景,以允许应用更新操作。关于此模式的更多信息可以在以下博客文章中找到:
1. 设置 Kinesis 流
首先,您需要在 DynamoDB 表上启用 Kinesis 流,以实时捕获更改。在创建快照之前,我们需要这样做,以避免丢失任何数据。 可以在 这里 找到 AWS 指南。

2. 创建快照
接下来,我们将创建 DynamoDB 表的快照。这可以通过 AWS 导出到 S3 来实现。可以在 这里 找到 AWS 指南。 您将需要执行 DynamoDB JSON 格式的“完整导出”。

3. 将快照加载到 ClickHouse
创建必要的表
DynamoDB 的快照数据大致如下所示:
请注意,数据是以嵌套格式呈现的。在将其加载到 ClickHouse 之前,我们需要扁平化这些数据。这可以通过使用 ClickHouse 中的 JSONExtract
函数在物化视图中实现。
我们需要创建三个表:
- 一个用于存储来自 DynamoDB 的原始数据的表
- 一个用于存储最终扁平数据(目标表)的表
- 一个用于扁平化数据的物化视图
对于上述示例的 DynamoDB 数据,ClickHouse 表将如下所示:
目标表有一些要求:
- 该表必须是一个
ReplacingMergeTree
表 - 表必须有一个
version
列- 在后面的步骤中,我们将把来自 Kinesis 流的
ApproximateCreationDateTime
字段映射到version
列。
- 在后面的步骤中,我们将把来自 Kinesis 流的
- 表应使用分区键作为排序键(由
ORDER BY
指定)- 具有相同排序键的行将根据
version
列进行去重。
- 具有相同排序键的行将根据
创建快照 ClickPipe
现在您可以创建一个 ClickPipe,将快照数据从 S3 加载到 ClickHouse。按照 S3 ClickPipe 指南 这里,但使用以下设置:
- 摄取路径:您需要找到导出 JSON 文件在 S3 中的路径。路径大致如下所示:
- 格式:JSONEachRow
- 表:您的快照表(例如上面例子中的
default.snapshot
)
创建后,数据将开始填充到快照和目标表中。您不需要等到快照加载完成后再继续进行下一步。
4. 创建 Kinesis ClickPipe
现在我们可以设置 Kinesis ClickPipe 来捕获来自 Kinesis 流的实时更改。按照 Kinesis ClickPipe 指南 这里,但使用以下设置:
- 流:步骤 1 中使用的 Kinesis 流
- 表:您的目标表(例如上面例子中的
default.destination
) - 扁平化对象:true
- 列映射:
ApproximateCreationDateTime
:version
- 将其他字段映射到适当的目标列,如下所示

5. 清理(可选)
一旦快照 ClickPipe 完成,您可以删除快照表和物化视图。