DynamoDBからClickHouseへのCDC
このページでは、ClickPipesを使用してDynamoDBからClickHouseへのCDCを設定する方法について説明します。この統合には2つのコンポーネントがあります:
- S3 ClickPipesを介した初期スナップショット
- Kinesis ClickPipesを介したリアルタイム更新
データは ReplacingMergeTree
に取り込まれます。このテーブルエンジンは、更新操作が適用されるCDCシナリオで一般的に使用されます。このパターンに関する詳細は、以下のブログ記事を参照してください:
1. Kinesisストリームの設定
まず、DynamoDBテーブルでリアルタイムの変更をキャプチャするためにKinesisストリームを有効にします。スナップショットを作成する前にこれを行うことで、データを見逃すことを避けることができます。 AWSガイドはこちらにあります。

2. スナップショットを作成する
次に、DynamoDBテーブルのスナップショットを作成します。これはAWSエクスポートを使用してS3に達成できます。AWSガイドはこちらにあります。 DynamoDB JSON形式で「フルエクスポート」を行う必要があります。

3. スナップショットをClickHouseにロードする
必要なテーブルを作成する
DynamoDBからのスナップショットデータは次のようになります:
データがネストされた形式であることに注意してください。このデータをClickHouseにロードする前にフラット化する必要があります。これは、ClickHouseのマテリアライズドビューのJSONExtract
関数を使用して行うことができます。
3つのテーブルを作成したいと思います:
- DynamoDBからの生データを保存するテーブル
- 最終的なフラット化されたデータを保存するテーブル(宛先テーブル)
- データをフラット化するためのマテリアライズドビュー
上記のDynamoDBデータの例に対して、ClickHouseのテーブルは次のようになります:
宛先テーブルにはいくつかの要件があります:
- このテーブルは
ReplacingMergeTree
テーブルである必要があります - テーブルには
version
カラムが必要です- 後のステップでは、Kinesisストリームの
ApproximateCreationDateTime
フィールドをversion
カラムにマッピングします。
- 後のステップでは、Kinesisストリームの
- テーブルは、ソートキーとしてパーティションキーを使用する必要があります(
ORDER BY
で指定)- 同じソートキーを持つ行は
version
カラムに基づいて重複排除されます。
- 同じソートキーを持つ行は
スナップショットClickPipeを作成する
S3からClickHouseにスナップショットデータをロードするためのClickPipeを作成できます。S3 ClickPipeガイドはこちらに従ってくださいが、次の設定を使用してください:
- インジェストパス:エクスポートされたjsonファイルのS3内のパスを特定する必要があります。パスは次のようになります:
- フォーマット:JSONEachRow
- テーブル:スナップショットテーブル(例:上記の
default.snapshot
)
作成が完了すると、スナップショットテーブルと宛先テーブルにデータが含まれ始めます。次のステップに進む前にスナップショットのロードが完了するまで待つ必要はありません。
4. Kinesis ClickPipeを作成する
次に、Kinesisストリームからのリアルタイムの変更をキャプチャするためのKinesis ClickPipeを設定できます。Kinesis ClickPipeガイドはこちらに従ってくださいが、次の設定を使用してください:
- ストリーム:ステップ1で使用されたKinesisストリーム
- テーブル:宛先テーブル(例:上記の
default.destination
) - オブジェクトのフラット化:true
- カラムマッピング:
ApproximateCreationDateTime
:version
- その他のフィールドを適切な宛先カラムにマッピングする(下記のように)

5. クリーンアップ(オプション)
スナップショットClickPipeが完了したら、スナップショットテーブルとマテリアライズドビューを削除できます。