メインコンテンツへスキップ
メインコンテンツへスキップ

DynamoDB から ClickHouse への CDC

このページでは、ClickPipes を使用して DynamoDB から ClickHouse への CDC を設定する方法を説明します。この連携は、次の 2 つのコンポーネントから構成されます。

  1. S3 ClickPipes による初期スナップショット
  2. Kinesis ClickPipes によるリアルタイム更新

データは ReplacingMergeTree に取り込まれます。このテーブルエンジンは、更新操作を反映できるようにするために、CDC シナリオで一般的に使用されます。このパターンの詳細は、次のブログ記事で確認できます。

1. Kinesis ストリームをセットアップする

まず、DynamoDB テーブルで Kinesis ストリームを有効にして、変更をリアルタイムで取り込めるようにします。スナップショットを作成する前にこの設定を行うことで、データの取りこぼしを防ぎます。 AWS のガイドはこちらを参照してください。

DynamoDB Kinesis Stream

2. スナップショットを作成する

次に、DynamoDB テーブルのスナップショットを作成します。これは、AWS から S3 へのエクスポートで実行できます。AWS のガイドはこちらを参照してください。 DynamoDB JSON 形式で「Full export(フルエクスポート)」を実行してください。

DynamoDB S3 Export

3. スナップショットを ClickHouse に読み込む

必要なテーブルを作成する

DynamoDB からのスナップショットデータは次のような形式になります:

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

データがネストされた形式になっていることがわかります。このデータを ClickHouse にロードする前にフラット化する必要があります。これは、ClickHouse の JSONExtract 関数を materialized view 内で使用することで実現できます。

ここでは次の 3 つのテーブルを作成します。

  1. DynamoDB からの生データを保存するテーブル
  2. フラット化後の最終データを保存するテーブル(宛先テーブル)
  3. データをフラット化するための materialized view

上記の DynamoDB データの例では、ClickHouse のテーブルは次のようになります。

/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

宛先テーブルには、いくつかの要件を満たす必要があります。

  • このテーブルは ReplacingMergeTree テーブルである必要があります
  • テーブルには version カラムが必要です
    • 後続の手順で、Kinesis ストリームの ApproximateCreationDateTime フィールドを version カラムにマッピングします。
  • テーブルは、パーティションキーをソートキー(ORDER BY で指定)として使用する必要があります
    • 同じソートキーを持つ行は、version カラムに基づいて重複排除されます。

スナップショット用 ClickPipe を作成する

これで、S3 から ClickHouse へスナップショットデータをロードするための ClickPipe を作成できます。S3 ClickPipe ガイドはこちらを参照し、以下の設定を使用してください。

  • Ingest path: S3 にエクスポートされた JSON ファイルのパスを特定する必要があります。パスは次のような形式になります。
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Format: JSONEachRow
  • Table: スナップショット用のテーブル(例: 上記の例では default.snapshot

作成が完了すると、スナップショットテーブルと宛先テーブルへのデータ投入が始まります。次のステップに進む前に、スナップショットのロード完了を待つ必要はありません。

4. Kinesis ClickPipe を作成する

ここでは、Kinesis ストリームからのリアルタイムでの変更をキャプチャするための Kinesis ClickPipe をセットアップします。Kinesis ClickPipe ガイドはこちらに従いますが、次の設定を使用してください。

  • Stream: ステップ 1 で使用した Kinesis ストリーム
  • Table: 宛先テーブル(例: 上記の例では default.destination
  • Flatten object: true
  • Column mappings:
    • ApproximateCreationDateTime: version
    • 他のフィールドは、以下に示すように適切な宛先カラムにマッピングします
DynamoDB カラムのマッピング

5. クリーンアップ(任意)

スナップショット ClickPipe の処理が完了したら、スナップショットテーブルと materialized view を削除して構いません。

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";