メインコンテンツまでスキップ
メインコンテンツまでスキップ

dbt と ClickHouse の統合

dbt (data build tool) は、分析エンジニアがシンプルに SELECT ステートメントを記述することで、データウェアハウス内のデータを変換することを可能にします。 dbt は、これらの SELECT ステートメントをテーブルやビューの形でデータベース内のオブジェクトに物理化(マテリアライズ)することを処理し、抽出・移動・変換(ELT) の T を実行します。ユーザーは SELECT ステートメントで定義されたモデルを作成できます。

dbt 内では、これらのモデルを相互参照し、レイヤー化することで、より高レベルの概念を構築できるようになります。モデルを接続するために必要なボイラープレート SQL は自動的に生成されます。さらに、dbt はモデル間の依存関係を特定し、それらを適切な順序で作成されるように、直接非循環グラフ(DAG)を使用します。

dbt は、ClickHouse に対応したプラグイン を介して ClickHouse と互換性があります。私たちは、公開された IMDB データセットに基づく簡単な例を用いて、ClickHouse との接続プロセスを説明します。また、現在のコネクタのいくつかの制限事項も強調します。

概念

dbt はモデルの概念を導入します。これは、複数のテーブルを結合する可能性のある SQL ステートメントとして定義されます。モデルは、いくつかの方法で「マテリアライズ」することができます。マテリアライズは、モデルの SELECT クエリのためのビルド戦略を表します。マテリアライズの背後にあるコードは、SELECT クエリをラップするボイラープレート SQL です。これにより、新しいリレーションを作成したり、既存のリレーションを更新したりします。

dbt は 4 種類のマテリアライズを提供しています:

  • view (デフォルト): モデルがデータベース内のビューとして構築されます。
  • table: モデルがデータベース内のテーブルとして構築されます。
  • ephemeral: モデルはデータベース内で直接構築されませんが、共通テーブル式として依存するモデルに引き込まれます。
  • incremental: モデルは最初にテーブルとしてマテリアライズされ、その後の実行で新しい行が挿入され、変更された行がテーブル内で更新されます。

追加の構文および句は、基になるデータが変更された場合に、これらのモデルをどのように更新すべきかを定義します。dbt は一般的に、パフォーマンスが懸念事項になるまでビューのマテリアライズから始めることを推奨します。テーブル マテリアライズは、ストレージが増加する代わりに、モデルのクエリ結果をテーブルとしてキャプチャすることでクエリ時間のパフォーマンス向上を提供します。インクリメンタルアプローチは、これをさらに改善し、基になるデータのその後の更新をターゲットテーブルにキャプチャできるようにします。

現在のプラグイン は、ClickHouse で view, table, ephemeral, incremental マテリアライズをサポートしています。このガイドで探る dbt の スナップショットシード もサポートしています。

以下のガイドを通して、ClickHouse インスタンスが利用可能であることを前提とします。

dbt と ClickHouse プラグインのセットアップ

dbt

以下の例では dbt CLI の使用を前提としています。ユーザーは、プロジェクトを編集および実行するための Web ベースの統合開発環境(IDE)を提供する dbt Cloud も検討したいかもしれません。

dbt には CLI インストールのためのいくつかの選択肢があります。こちら に記載されている手順に従ってください。この段階では、dbt-core のみをインストールします。pip の使用をお勧めします。

重要: 以下は python 3.9 でテストされています。

ClickHouse プラグイン

dbt ClickHouse プラグインをインストールします:

ClickHouse の準備

dbt は高度に関係性のあるデータのモデリングに優れています。例の目的のために、次の関係スキーマを持つ小さな IMDB データセットを提供します。このデータセットは 関係データセットリポジトリ から取得したものです。これは dbt で一般的に使用されるスキーマと比べると簡素ですが、管理可能なサンプルを表しています:

IMDB テーブルスキーマ

以下に示すテーブルのサブセットを使用します。

次のテーブルを作成します:

注記

テーブル roles のカラム created_at はデフォルトで now() の値を持っています。これは後でモデルのインクリメンタル更新を確認するために使用します - インクリメンタルモデル を参照してください。

s3 関数を使用して、公開エンドポイントからソースデータを読み込み、データを挿入します。次のコマンドを実行してテーブルにデータを入力します:

これらの実行には帯域幅に応じて若干の違いがあるかもしれませんが、各コマンドは数秒で完了するはずです。以下のクエリを実行して各俳優のサマリーを計算し、最も多くの映画に出演している順に並べ、データが正常に読み込まれたことを確認します:

応答は次のようになります:

後のガイドでは、このクエリをモデルに変換し、dbt のビューおよびテーブルとして ClickHouse にマテリアライズします。

ClickHouse への接続

  1. dbt プロジェクトを作成します。このケースでは、imdb ソースに基づいた名前を付けます。プロンプトが表示されたら、データベースソースとして clickhouse を選択します。

  2. cd コマンドを使ってプロジェクトフォルダへ移動します:

  3. この時点で、お好みのテキストエディタが必要になります。以下の例では、人気のある VS Code を使用します。IMDB ディレクトリを開くと、yml ファイルや sql ファイルのコレクションが表示されます:

    新しい dbt プロジェクト
  4. dbt_project.yml ファイルを更新し、最初のモデル actor_summary を指定し、プロファイルを clickhouse_imdb に設定します。

    dbt プロファイル dbt プロファイル
  5. 次に、dbt に ClickHouse インスタンスへの接続情報を提供する必要があります。~/.dbt/profiles.yml に以下を追加します。

    ユーザーとパスワードを変更する必要があることに注意してください。利用できる他の設定については、こちらで文書化されています。

  6. IMDB ディレクトリから dbt debug コマンドを実行して、dbt が ClickHouse に接続できるかどうかを確認します。

    応答に Connection test: [OK connection ok] が含まれていることを確認し、接続が成功したことを示します。

シンプルなビューのマテリアライズを作成する

ビューのマテリアライズを使用する場合、モデルは毎回 CREATE VIEW AS ステートメントを介してビューとして再構築されます。これにより、データの追加ストレージは不要ですが、テーブルマテリアライズよりクエリが遅くなります。

  1. imdb フォルダから、models/example ディレクトリを削除します:

  2. models フォルダ内の actors に新しいファイルを作成します。ここでは、各俳優モデルを表すファイルを作成します:

  3. models/actors フォルダ内に schema.ymlactor_summary.sql のファイルを作成します。

    schema.yml ファイルは、私たちのテーブルを定義します。これらはマクロで使用できるようになります。 models/actors/schema.yml を編集して、次の内容を持つようにします:

    actors_summary.sql ファイルは実際のモデルを定義します。なお、config 関数ではモデルを ClickHouse にビューとしてマテリアライズするように求めています。テーブルは schema.yml ファイルから source 関数を介して参照されます。例: source('imdb', 'movies')imdb データベース内の movies テーブルを参照します。 models/actors/actors_summary.sql を編集して次の内容を持つようにします:

    最終的な actor_summary では updated_at カラムを含めることに注意してください。これは後でインクリメンタルマテリアライズ用に使用します。

  4. imdb ディレクトリから、dbt run コマンドを実行します。

  5. dbt は要求通りにモデルを ClickHouse のビューとして表現します。これで、このビューを直接クエリできます。このビューは ~/.dbt/profiles.ymlclickhouse_imdb プロファイルの下で、スキーマパラメータによって決定される imdb_dbt データベースに作成されています。

    このビューをクエリして、以前のクエリの結果を簡単な構文で再現できます:

テーブルマテリアライズを作成する

前の例では、モデルはビューとしてマテリアライズされました。これが一部のクエリに十分なパフォーマンスを提供するかもしれませんが、より複雑な SELECT や頻繁に実行されるクエリについては、テーブルとしてマテリアライズする方が良い場合があります。このマテリアライズは、BI ツールによってクエリされるモデルに対して有用で、ユーザーがより迅速な体験を得られることを保証します。これは、新しいテーブルとしてクエリ結果が保存されることを意味し、関連するストレージオーバーヘッドが発生します - 事実上、INSERT TO SELECT が実行されます。このテーブルは毎回再構築されるため、インクリメンタルではありません。大きな結果セットは長い実行時間を引き起こす可能性があるため、dbtの制限 を参照してください。

  1. actors_summary.sql ファイルを修正し、materialized パラメータを table に設定します。ORDER BY の定義に注意し、MergeTree テーブルエンジンを使用することを確認してください:

  2. imdb ディレクトリから dbt run コマンドを実行します。この実行には、長めの時間がかかる場合があります - 多くのマシンで約 10 秒です。

  3. テーブル imdb_dbt.actor_summary の作成を確認します:

    適切なデータ型のテーブルが表示されるはずです:

  4. このテーブルからの結果が以前の応答と一致していることを確認します。モデルがテーブルになったため、応答時間が改善されていることに気付くでしょう:

    このモデルに対して他のクエリを発行してみてください。例えば、5回以上出演している俳優のうち、映画の評点が最も高いのはどれでしょうか?

インクリメンタルマテリアライゼーションの作成

前の例では、モデルをマテリアライズするためのテーブルを作成しました。このテーブルは、各 dbt 実行ごとに再構築されます。これは、大規模な結果セットや複雑な変換に対して実行不可能で、非常にコストがかかる可能性があります。この課題に対処し、ビルド時間を短縮するために、dbt はインクリメンタルマテリアライゼーションを提供しています。これにより、dbt は前回の実行以来、テーブルにレコードを挿入または更新することができ、イベントスタイルのデータに適しています。内部では、すべての更新されたレコードを持つ一時テーブルが作成され、その後、すべての変更されていないレコードと更新されたレコードが新しいターゲットテーブルに挿入されます。これにより、テーブルモデルと同様の制限が大規模な結果セットに対して生じます。

これらの制限を大規模セットに対して克服するために、プラグインは「inserts_only」モードをサポートしており、ここでは一時テーブルが作成されることなく、すべての更新がターゲットテーブルに挿入されます(詳細は以下に記載)。

この例を説明するために、390 本の映画に出演する素晴らしい俳優「Clicky McClickHouse」を追加します - 彼は Mel Blanc よりも多くの映画に出演したことを保証します。

  1. まず、モデルのタイプをインクリメンタルに変更します。この追加には次のものが必要です。

    1. unique_key - プラグインが行を一意に識別できるようにするためには、unique_key を提供する必要があります。この場合、クエリの id フィールドで十分です。これにより、マテリアライズされたテーブルに行の重複がないことが保証されます。一意性制約に関する詳細は、こちら を参照してください。
    2. インクリメンタルフィルター - dbt に、インクリメンタル実行時にどの行が変更されたかを識別する方法を指示する必要があります。これは、デルタ式を提供することで達成されます。通常は、イベントデータのタイムスタンプが含まれます。このため、私たちの updated_at タイムスタンプフィールドが必要です。このカラムは、行が挿入されたときのデフォルト値が now() であり、新しい役割を特定できるようにします。さらに、新しい俳優が追加される代替ケースを識別する必要があります。{{this}} 変数を使用して、既存のマテリアライズテーブルを示すと、これは表現 where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}}) を持ちます。これを {% if is_incremental() %} 条件内部に埋め込み、インクリメンタル実行時にのみ使用されるようにし、テーブルが最初に構築されたときには使用されないようにします。インクリメンタルモデルの行をフィルタリングする詳細については、dbt ドキュメントのこの議論を参照してください。

    ファイル actor_summary.sql を次のように更新します:

    我々のモデルは rolesactors テーブルへの更新と追加にのみ応答します。すべてのテーブルに応答するためには、ユーザーはこのモデルを複数のサブモデルに分割することを推奨します - 各サブモデルにはそれぞれのインクリメンタル基準があります。これらのモデルは、さらに相互参照され、接続されることができます。モデルの相互参照に関する詳細は、こちらを参照してください。

  2. dbt run を実行し、結果のテーブルの結果を確認します:

  3. 次に、インクリメンタルアップデートを示すために、モデルにデータを追加します。俳優「Clicky McClickHouse」を actors テーブルに追加します:

  4. 「Clicky」を 910 本のランダムな映画に出演させましょう:

  5. 彼が確かに最も多く出演している俳優であることを確認するために、基盤となるソーステーブルをクエリし、dbt モデルをバイパスします:

  6. dbt run を実行し、私たちのモデルが更新され、上記の結果と一致することを確認します:

内部

上記のインクリメンタルアップデートを実現するために実行されたステートメントは、ClickHouse のクエリログをクエリすることで識別できます。

上記のクエリを実行期間に調整してください。結果の検査はユーザーに任せますが、インクリメンタルアップデートを行うためにプラグインが使用した一般的な戦略を強調します:

  1. プラグインは一時テーブル actor_sumary__dbt_tmp を作成します。変更された行はこのテーブルにストリーミングされます。
  2. 新しいテーブル actor_summary_new が作成されます。古いテーブルからの行は、行 id が一時テーブルに存在しないかどうかのチェックを行いながらストリーミングされます。これにより、更新と重複が適切に処理されます。
  3. 一時テーブルからの結果が新しい actor_summary テーブルにストリーミングされます。
  4. 最後に、新しいテーブルが古いバージョンと原子的に交換されます。古いテーブルと一時テーブルは次に削除されます。

以下にこのプロセスを視覚化しています:

incremental updates dbt

この戦略は非常に大規模なモデルでは課題に直面する可能性があります。詳細については制限事項を参照してください。

追加戦略(inserts-only モード)

インクリメンタルモデルの大規模データセットの制限を克服するために、プラグインは dbt 設定パラメータ incremental_strategy を使用します。これを append 値に設定できます。設定すると、更新された行がターゲットテーブル(すなわち imdb_dbt.actor_summary)に直接挿入され、一時テーブルは作成されません。

注意: 追加のみモードでは、データが不変であるか、重複しても問題ない必要があります。変更された行をサポートするインクリメンタルテーブルモデルが必要な場合は、このモードを使用しないでください!

このモードを示すために、もう一人の新しい俳優を追加し、incremental_strategy='append' で dbt run を再実行します。

  1. actor_summary.sql で追加のみモードを構成します:

  2. もう一人の有名な俳優 - ダニー・デヴィートを追加しましょう。

  3. ダニーを 920 本のランダムな映画に出演させましょう。

  4. dbt run を実行し、ダニーが actor-summary テーブルに追加されたことを確認します。

「Clicky」の挿入と比較して、インクリメンタルの速度がいかに速いかに注目してください。

再度クエリログテーブルを確認すると、2 回のインクリメンタル実行の違いが明らかになります:

この実行では、新しい行のみが imdb_dbt.actor_summary テーブルに直接追加され、一時テーブルの作成はありません。

削除+挿入モード(実験的)

歴史的に、ClickHouseは、非同期の変更の形での更新と削除に対する限られたサポートを持っていました。これらは非常に I/O 集約型で、一般的には避けるべきです。

ClickHouse 22.8 は 軽量削除 を導入しました。これらは現在実験的ですが、データを削除するためのより高性能な手段を提供します。

このモードは、incremental_strategy パラメータを介してモデルに構成できます。すなわち、

この戦略はターゲットモデルのテーブルに直接作用するため、操作中に問題が発生すると、インクリメンタルモデルのデータが無効な状態になる可能性があります - 原子的な更新はありません。

要約すると、このアプローチは次の通りです:

  1. プラグインは一時テーブル actor_sumary__dbt_tmp を作成します。変更された行はこのテーブルにストリーミングされます。
  2. 現在の actor_summary テーブルに対して DELETE が発行されます。行は actor_sumary__dbt_tmp から id によって削除されます。
  3. actor_sumary__dbt_tmp から actor_summary に行が挿入されます。

このプロセスは、以下のように示されます:

lightweight delete incremental

挿入上書きモード(実験的)

次の手順を実行します:

  1. インクリメンタルモデル関係と同じ構造を持つステージング(仮)テーブルを作成します: CREATE TABLE {staging} AS {target}
  2. ステージングテーブルに新しいレコードのみを挿入します(SELECT によって生成)。
  3. ターゲットテーブルにステージングテーブルに存在する新しいパーティションのみを置き換えます。

このアプローチには次の利点があります:

  • テーブル全体をコピーしないため、デフォルトの戦略よりも高速です。
  • INSERT 操作が成功裏に完了するまで元のテーブルを変更しないため、他の戦略よりも安全です: 中間的な失敗があった場合、元のテーブルは変更されません。
  • 「パーティションの不変性」を実装し、データエンジニアリングのベストプラクティスを従います。これにより、インクリメンタルで並行処理、ロールバックなどが簡素化されます。
insert overwrite incremental

スナップショットの作成

dbt スナップショットは、時間の経過とともに可変モデルの変更を記録することを可能にします。これにより、アナリストがモデルの以前の状態を「振り返る」ことができるポイントインタイムクエリが可能になります。これは、行が有効であった時期を記録するfrom および to 日付のカラムを使用するタイプ 2 ゆっくり進化する次元を使用することで実現されます。この機能は ClickHouse プラグインによってサポートされ、以下に示されます。

この例では、インクリメンタルテーブルモデルの作成を完了していることを前提とします。actor_summary.sql で insert_only=True に設定しないようにしてください。あなたの models/actor_summary.sql は次のようになります:

  1. スナップショットディレクトリに actor_summary というファイルを作成します。

  2. actor_summary.sql ファイルの内容を次のように更新します:

この内容に関するいくつかの観察:

  • SELECT クエリは、時間の経過とともにスナップショットを取得したい結果を定義します。ref 関数は、以前に作成した actor_summary モデルを参照するために使用されます。
  • レコードの変更を示すためのタイムスタンプ列が必要です。私たちの updated_at 列(インクリメンタルテーブルモデルの作成を参照)をここで使用できます。パラメータ strategy は、更新を示すためにタイムスタンプを使用することを示し、パラメータ updated_at は使用する列を指定します。これがモデルに存在しない場合は、代わりに チェック戦略 を使用できます。これは非常に非効率で、ユーザーが比較する列のリストを指定する必要があります。dbt はこれらの列の現在の値と履歴値を比較し、変更を記録します(または同じであれば何もしません)。
  1. dbt snapshot コマンドを実行します。

注意してください、スナップショットデータベースに actor_summary_snapshot テーブルが作成されました(target_schema パラメータによって決定されます)。

  1. このデータをサンプリングすると、dbt が列 dbt_valid_from および dbt_valid_to を含めていることがわかります。後者は null に設定されています。次回の実行でこれが更新されます。

  2. 我々の好きな俳優 Clicky McClickHouse をさらに 10 本の映画に出演させます。

  3. imdb ディレクトリから dbt run コマンドを再実行します。これによりインクリメンタルモデルが更新されます。この処理が完了したら、dbt snapshot を実行して変更をキャプチャします。

  4. もしスナップショットをクエリすると、Clicky McClickHouse に対して 2 行があることに注意してください。以前のエントリには dbt_valid_to 値があり、最新の値は同じ値を dbt_valid_from 列に持ち、dbt_valid_to 値は null です。もし新しい行があれば、これらもスナップショットに追加されます。

dbt スナップショットに関する詳しい情報はこちらをご覧ください。

Seeds の使用

dbt は CSV ファイルからデータをロードする機能を提供します。この機能は、大規模なデータベースのエクスポートをロードするためには適しておらず、通常はコードテーブルや 辞書 に使用される小さなファイル向けに設計されています。例えば、国コードを国名にマッピングする場合です。簡単な例として、シード機能を使用してジャンルコードのリストを生成し、アップロードします。

  1. 既存のデータセットからジャンルコードのリストを生成します。dbt ディレクトリから clickhouse-client を使用して、seeds/genre_codes.csv というファイルを作成します。

  2. dbt seed コマンドを実行します。これにより、CSV ファイルの行を持つ新しいテーブル genre_codes が、当社のデータベース imdb_dbt に作成されます(スキーマ構成で定義された通りです)。

  3. これらがロードされたことを確認します:

制限事項

現在の ClickHouse プラグインには、ユーザーが認識すべきいくつかの制限があります:

  1. プラグインは現在、モデルを INSERT TO SELECT を使用してテーブルとして物質化します。これは実質的にデータの重複を意味します。非常に大きなデータセット(PB)は、非常に長い実行時間を引き起こす可能性があり、一部のモデルが実行不可能になる場合があります。可能な限り GROUP BY を活用して、クエリによって返される行の数を最小限に抑えることを目指してください。行数を維持しながら変換を行うモデルよりも、データを要約するモデルを優先してください。
  2. ディストリビュートテーブルを使用してモデルを表現するには、ユーザーは各ノードの基礎となるレプリケートテーブルを手動で作成する必要があります。その上にディストリビュートテーブルを作成できます。プラグインはクラスターの作成を管理しません。
  3. dbt がデータベース内にリレーション(テーブル/ビュー)を作成する際、通常は {{ database }}.{{ schema }}.{{ table/view id }} という形式で作成します。ClickHouse にはスキーマの概念がありません。したがって、プラグインは {{schema}}.{{ table/view id }} を使用し、ここで schema は ClickHouse のデータベースです。

さらなる情報

以前のガイドは dbt 機能の表面に触れるだけでした。ユーザーには優れた dbt ドキュメント を読むことをお勧めします。

プラグインの追加設定については、こちら を参照してください。

Fivetran

dbt-clickhouse コネクタは、Fivetran transformations でも使用可能で、dbt を使用して Fivetran プラットフォーム内でシームレスな統合と変換機能を提供します。