メインコンテンツまでスキップ
メインコンテンツまでスキップ

CDC from DynamoDB to ClickHouse

Experimental feature. Learn more.

このページでは、ClickPipesを使用してDynamoDBからClickHouseへのCDCを設定する方法について説明します。この統合には2つのコンポーネントがあります:

  1. S3 ClickPipesを介した初期スナップショット
  2. Kinesis ClickPipesを介したリアルタイムの更新

データはReplacingMergeTreeに取り込まれます。このテーブルエンジンはCDCシナリオによく使用され、更新操作を適用できるようにします。このパターンに関する詳細は、以下のブログ記事で見つかります:

1. Kinesis Streamの設定

最初に、DynamoDBテーブルでKinesisストリームを有効にして、リアルタイムで変更をキャプチャします。スナップショットを作成する前にこれを行い、データの喪失を避ける必要があります。 AWSガイドはこちらにあります。

DynamoDB Kinesis Stream

2. スナップショットの作成

次に、DynamoDBテーブルのスナップショットを作成します。これは、AWSからS3へのエクスポートを通じて実行できます。AWSガイドはこちらにあります。 DynamoDB JSONフォーマットで「フルエクスポート」を実行する必要があります。

DynamoDB S3 Export

3. スナップショットをClickHouseにロード

必要なテーブルの作成

DynamoDBからのスナップショットデータは次のようになります:

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

データがネストされた形式になっていることに注意してください。ClickHouseにロードする前に、このデータをフラット化する必要があります。これは、ClickHouseのMaterialized ViewでJSONExtract関数を使用して行えます。

次の3つのテーブルを作成する必要があります:

  1. DynamoDBからの生データを格納するテーブル
  2. 最終的にフラット化されたデータを格納するテーブル(宛先テーブル)
  3. データをフラット化するためのMaterialized View

上記の例のDynamoDBデータに対するClickHouseのテーブルは次のようになります:

/* スナップショットテーブル */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* 最終的にフラット化されたデータ用のテーブル */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* 最終的にフラット化されたデータ用のテーブル */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

宛先テーブルにはいくつかの要件があります:

  • このテーブルはReplacingMergeTreeテーブルである必要があります
  • テーブルにはversionカラムが必要です
    • 後のステップで、KinesisストリームからApproximateCreationDateTimeフィールドをversionカラムにマッピングします。
  • テーブルはソートキーとしてパーティションキーを使用する必要があります(ORDER BYで指定)
    • 同じソートキーを持つ行はversionカラムに基づいて重複排除されます。

スナップショットClickPipeの作成

次に、S3からClickHouseにスナップショットデータをロードするためのClickPipeを作成できます。S3 ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用します:

  • インジェストパス:エクスポートされたjsonファイルのS3内のパスを特定する必要があります。パスは次のようになります:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • フォーマット:JSONEachRow
  • テーブル:スナップショットテーブル(例:上記のdefault.snapshot

作成が完了すると、スナップショットテーブルと宛先テーブルにデータがポピュレートし始めます。次のステップに進む前にスナップショットのロードが完了するのを待つ必要はありません。

4. Kinesis ClickPipeの作成

次に、Kinesisストリームからリアルタイムで変更をキャプチャするためのKinesis ClickPipeを設定できます。Kinesis ClickPipeガイドに従ってこちらを参照してくださいが、次の設定を使用します:

  • ストリーム:ステップ1で使用したKinesisストリーム
  • テーブル:宛先テーブル(例:上記のdefault.destination
  • オブジェクトのフラット化:true
  • カラムマッピング
    • ApproximateCreationDateTime: version
    • 他のフィールドを、以下に示す宛先カラムに適切にマッピングします
DynamoDB Map Columns

5. クリーンアップ(オプション)

スナップショットClickPipeが完了したら、スナップショットテーブルとマテリアライズドビューを削除できます。

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";