CDC from DynamoDB to ClickHouse
このページでは、ClickPipesを使用してDynamoDBからClickHouseへのCDCを設定する方法について説明します。この統合には2つのコンポーネントがあります:
- S3 ClickPipesを介した初期スナップショット
- Kinesis ClickPipesを介したリアルタイムの更新
データはReplacingMergeTree
に取り込まれます。このテーブルエンジンはCDCシナリオによく使用され、更新操作を適用できるようにします。このパターンに関する詳細は、以下のブログ記事で見つかります:
1. Kinesis Streamの設定
最初に、DynamoDBテーブルでKinesisストリームを有効にして、リアルタイムで変更をキャプチャします。スナップショットを作成する前にこれを行い、データの喪失を避ける必要があります。 AWSガイドはこちらにあります。

2. スナップショットの作成
次に、DynamoDBテーブルのスナップショットを作成します。これは、AWSからS3へのエクスポートを通じて実行できます。AWSガイドはこちらにあります。 DynamoDB JSONフォーマットで「フルエクスポート」を実行する必要があります。

3. スナップショットをClickHouseにロード
必要なテーブルの作成
DynamoDBからのスナップショットデータは次のようになります:
データがネストされた形式になっていることに注意してください。ClickHouseにロードする前に、このデータをフラット化する必要があります。これは、ClickHouseのMaterialized ViewでJSONExtract
関数を使用して行えます。
次の3つのテーブルを作成する必要があります:
- DynamoDBからの生データを格納するテーブル
- 最終的にフラット化されたデータを格納するテーブル(宛先テーブル)
- データをフラット化するためのMaterialized View
上記の例のDynamoDBデータに対するClickHouseのテーブルは次のようになります:
宛先テーブルにはいくつかの要件があります:
- このテーブルは
ReplacingMergeTree
テーブルである必要があります - テーブルには
version
カラムが必要です- 後のステップで、Kinesisストリームから
ApproximateCreationDateTime
フィールドをversion
カラムにマッピングします。
- 後のステップで、Kinesisストリームから
- テーブルはソートキーとしてパーティションキーを使用する必要があります(
ORDER BY
で指定)- 同じソートキーを持つ行は
version
カラムに基づいて重複排除されます。
- 同じソートキーを持つ行は
スナップショットClickPipeの作成
次に、S3からClickHouseにスナップショットデータをロードするためのClickPipeを作成できます。S3 ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用します:
- インジェストパス:エクスポートされたjsonファイルのS3内のパスを特定する必要があります。パスは次のようになります:
- フォーマット:JSONEachRow
- テーブル:スナップショットテーブル(例:上記の
default.snapshot
)
作成が完了すると、スナップショットテーブルと宛先テーブルにデータがポピュレートし始めます。次のステップに進む前にスナップショットのロードが完了するのを待つ必要はありません。
4. Kinesis ClickPipeの作成
次に、Kinesisストリームからリアルタイムで変更をキャプチャするためのKinesis ClickPipeを設定できます。Kinesis ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用します:
- ストリーム:ステップ1で使用したKinesisストリーム
- テーブル:宛先テーブル(例:上記の
default.destination
) - オブジェクトのフラット化:true
- カラムマッピング:
ApproximateCreationDateTime
:version
- 他のフィールドを、以下に示す宛先カラムに適切にマッピングします

5. クリーンアップ(オプション)
スナップショットClickPipeが完了したら、スナップショットテーブルとマテリアライズドビューを削除できます。