跳到主要内容
跳到主要内容

从 DynamoDB 到 ClickHouse 的 CDC

Experimental feature. Learn more.

本页面介绍如何使用 ClickPipes 设置从 DynamoDB 到 ClickHouse 的变更数据捕获 (CDC)。此集成有两个组成部分:

  1. 通过 S3 ClickPipes 的初始快照
  2. 通过 Kinesis ClickPipes 的实时更新

数据将被导入到 ReplacingMergeTree 中。该表引擎通常用于 CDC 场景,以允许应用更新操作。有关此模式的更多信息,请参见以下博客文章:

1. 设置 Kinesis Stream

首先,您需要在 DynamoDB 表上启用 Kinesis 流以实时捕获更改。在创建快照之前,我们希望做到这一点,以避免丢失任何数据。请查阅 AWS 指南

DynamoDB Kinesis Stream

2. 创建快照

接下来,我们将创建 DynamoDB 表的快照。这可以通过 AWS 导出到 S3 来实现。请查阅 AWS 指南
您希望以 DynamoDB JSON 格式进行“完整导出”。

DynamoDB S3 Export

3. 将快照加载到 ClickHouse

创建必要的表

来自 DynamoDB 的快照数据看起来大致如下:

请注意,数据采用嵌套格式。在将其加载到 ClickHouse 之前,我们需要扁平化这些数据。可以使用 ClickHouse 中的 JSONExtract 函数在物化视图中完成此操作。

我们需要创建三个表:

  1. 用于存储来自 DynamoDB 的原始数据的表
  2. 用于存储最终扁平化数据(目标表)的表
  3. 一个物化视图用于扁平化数据

对于上面的示例 DynamoDB 数据,ClickHouse 的表将如下所示:

目标表有一些要求:

  • 此表必须是 ReplacingMergeTree
  • 表必须包含一个 version
    • 在后续步骤中,我们将把 Kinesis 流中的 ApproximateCreationDateTime 字段映射到 version 列。
  • 表应将分区键用作排序键(由 ORDER BY 指定)
    • 拥有相同排序键的行将根据 version 列进行去重。

创建快照 ClickPipe

现在您可以创建一个 ClickPipe,将快照数据从 S3 加载到 ClickHouse。请按照 S3 ClickPipe 指南 这里 的步骤,但使用以下设置:

  • 导入路径:您需要找到在 S3 中导出的 json 文件的路径。路径大致如下:
  • 格式:JSONEachRow
  • :您的快照表(例如,上述的 default.snapshot

创建后,数据将开始填充到快照和目标表中。您不需要在继续进行下一步之前等待快照加载完成。

4. 创建 Kinesis ClickPipe

现在我们可以设置 Kinesis ClickPipe 从 Kinesis 流捕获实时更改。请按照 Kinesis ClickPipe 指南 这里 的步骤,但使用以下设置:

  • :步骤 1 中使用的 Kinesis 流
  • :您的目标表(例如,上述的 default.destination
  • 扁平化对象:true
  • 列映射
    • ApproximateCreationDateTime: version
    • 将其他字段映射到相应的目标列,如下所示
DynamoDB Map Columns

5. 清理 (可选)

一旦快照 ClickPipe 完成,您可以删除快照表和物化视图。