DynamoDBからClickHouseへのCDC
このページでは、ClickPipesを使用してDynamoDBからClickHouseへのCDCの設定方法を説明します。この統合には2つのコンポーネントがあります:
- S3 ClickPipesを介した初期スナップショット
- Kinesis ClickPipesを介したリアルタイム更新
データはReplacingMergeTree
に取り込まれます。このテーブルエンジンは、更新操作を適用できるようにCDCシナリオで一般的に使用されます。このパターンに関する詳細は、以下のブログ記事に記載されています:
1. Kinesis Streamの設定
まず、DynamoDBテーブルでKinesisストリームを有効にして、リアルタイムでの変更をキャプチャする必要があります。これは、スナップショットを作成する前に行い、データの欠落を避けるためです。 AWSガイドはこちらにあります。

2. スナップショットの作成
次に、DynamoDBテーブルのスナップショットを作成します。これは、AWSエクスポートをS3に行うことで実現できます。AWSガイドはこちらにあります。 DynamoDB JSON形式で「フルエクスポート」を行う必要があります。

3. スナップショットをClickHouseにロードする
必要なテーブルの作成
DynamoDBからのスナップショットデータは次のようになります:
データがネストされた形式になっていることに注意してください。このデータをClickHouseにロードする前にフラット化する必要があります。これは、Materialized View内でClickHouseのJSONExtract
関数を使用することで行えます。
次の三つのテーブルを作成します:
- DynamoDBからの生データを格納するテーブル
- 最終的なフラット化されたデータを格納するテーブル(宛先テーブル)
- データをフラット化するためのMaterialized View
上記のDynamoDBデータの例に対して、ClickHouseのテーブルは次のようになります:
宛先テーブルにはいくつかの要件があります:
- このテーブルは
ReplacingMergeTree
テーブルでなければなりません - テーブルには
version
カラムが必要です- 後のステップでは、Kinesisストリームからの
ApproximateCreationDateTime
フィールドをversion
カラムにマッピングします。
- 後のステップでは、Kinesisストリームからの
- テーブルはソートキーとしてパーティションキーを使用する必要があります(
ORDER BY
で指定されます)- 同じソートキーを持つ行は、
version
カラムに基づいて重複が排除されます。
- 同じソートキーを持つ行は、
スナップショットClickPipeの作成
次に、S3からClickHouseにスナップショットデータをロードするためのClickPipeを作成できます。S3 ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用してください:
- 取り込みパス: エクスポートされたJSONファイルのS3内のパスを特定する必要があります。パスは次のようになります:
- フォーマット: JSONEachRow
- テーブル: あなたのスナップショットテーブル(上記の例では
default.snapshot
)
作成されると、データはスナップショットと宛先テーブルにポピュレートされ始めます。次のステップに進む前にスナップショットのロードが完了するのを待つ必要はありません。
4. Kinesis ClickPipeの作成
次に、Kinesisストリームからのリアルタイムの変更をキャプチャするためのKinesis ClickPipeを設定できます。Kinesis ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用してください:
- ストリーム: ステップ1で使用されるKinesisストリーム
- テーブル: あなたの宛先テーブル(上記の例では
default.destination
) - オブジェクトをフラット化: true
- カラムマッピング:
ApproximateCreationDateTime
:version
- その他のフィールドを次のように宛先カラムにマッピングします

5. クリーンアップ(オプション)
スナップショットClickPipeが完了したら、スナップショットテーブルとMaterialized Viewを削除できます。