メインコンテンツまでスキップ
メインコンテンツまでスキップ

dbtとClickHouseの統合

ClickHouse Supported

dbt (data build tool) は、分析エンジニアがデータウェアハウス内のデータを単純にSELECT文を記述することで変換できるようにします。dbtはこれらのSELECT文をデータベース内のテーブルやビューの形でオブジェクトにマテリアライズする作業を行い、Extract Load and Transform (ELT)のTを実行します。ユーザーはSELECT文で定義されたモデルを作成できます。

dbt内では、これらのモデルは相互参照され、層状に構成されることで、より高レベルな概念を構築できるようになります。モデルを接続するために必要なボイラープレートSQLは自動的に生成されます。さらに、dbtはモデル間の依存関係を特定し、有向非循環グラフ(DAG)を使用してそれらが適切な順序で作成されることを保証します。

dbtはClickHouseサポートプラグインを通じてClickHouseと互換性があります。公開されているIMDBデータセットに基づく簡単な例を使用して、ClickHouseとの接続プロセスを説明します。また、現在のコネクタのいくつかの制限についても説明します。

概念

dbtはモデルの概念を導入します。これは、複数のテーブルを結合する可能性のあるSQL文として定義されます。モデルはさまざまな方法で「マテリアライズ」できます。マテリアライゼーションは、モデルのSELECTクエリのビルド戦略を表します。マテリアライゼーションの背後にあるコードは、SELECTクエリをラップするボイラープレートSQLです。これにより、新しいリレーションを作成または既存のリレーションを更新します。

dbtは4種類のマテリアライゼーションを提供します:

  • view (デフォルト): モデルはデータベース内のビューとして構築されます。
  • table: モデルはデータベース内のテーブルとして構築されます。
  • ephemeral: モデルはデータベース内に直接構築されず、依存するモデルに共通テーブル式として取り込まれます。
  • incremental: モデルは最初にテーブルとしてマテリアライズされ、次回の実行ではdbtが新しい行を挿入し、テーブル内の変更された行を更新します。

これらのモデルが基になるデータが変更された場合の更新方法を定義する追加の構文と句があります。dbtは、パフォーマンスが問題になるまでviewマテリアライゼーションから始めることを一般的に推奨します。tableマテリアライゼーションは、モデルのクエリの結果をテーブルとしてキャプチャすることにより、クエリのパフォーマンスを改善しますが、ストレージが増加するコストが伴います。incrementalアプローチは、この概念をさらに発展させて、基になるデータに対する後続の更新をターゲットテーブルにキャプチャできるようにします。

現在のプラグインは、viewtableephemeralおよびincrementalマテリアライゼーションをサポートしています。また、dbtのsnapshotsseedsもサポートしており、これらについてもこのガイドで探求します。

以下のガイドでは、ClickHouseのインスタンスが利用可能であることを前提とします。

dbtとClickHouseプラグインのセットアップ

dbt

以下の例ではdbt CLIの使用を前提としています。ユーザーはdbt Cloudも考慮するかもしれません。これにより、ユーザーはプロジェクトを編集したり実行するためのWebベースの統合開発環境(IDE)が提供されます。

dbtはCLIインストールのためのいくつかのオプションを提供します。以下のここに記載されている手順に従ってください。この段階では、dbt-coreのみをインストールします。pipの使用を推奨します。

重要: 以下の手順は、Python 3.9でテストされています。

ClickHouseプラグイン

dbt ClickHouseプラグインをインストールします:

ClickHouseの準備

dbtは高度にリレーショナルなデータのモデリングに優れています。例の目的のために、次のリレーショナルスキーマを持つ小さなIMDBデータセットを提供します。このデータセットはrelational dataset repositoryから派生しています。これはdbtで使用される一般的なスキーマと比較して簡単なものですが、管理可能なサンプルを表しています:

これらのテーブルのサブセットを使用します。次のテーブルを作成してください:

注記

テーブルrolesのカラムcreated_atは、デフォルトでnow()の値になります。後で、増分更新を特定するために使用します - Incremental Modelsを参照してください。

s3関数を使用して、公開エンドポイントからソースデータを読み取り、データを挿入します。以下のコマンドを実行してテーブルにデータを入力します:

これらの実行には帯域幅によって異なる場合がありますが、各実行は数秒で完了するはずです。次のクエリを実行して、各俳優の概要を計算し、最も映画に出演した順に並び替えて、データが正常にロードされていることを確認します:

応答は次のようになります:

後のガイドでは、このクエリをモデルに変換し、ClickHouseでdbtビューおよびテーブルとしてマテリアライズします。

ClickHouseへの接続

  1. dbtプロジェクトを作成します。この場合、ソースであるimdbにちなんで名前を付けます。プロンプトが表示されたら、データベースソースとしてclickhouseを選択します。

  2. プロジェクトフォルダにcdします:

  3. この時点で、好みのテキストエディタが必要です。以下の例では人気のあるVS Codeを使用します。IMDBディレクトリを開くと、一連のymlおよびsqlファイルが表示されるはずです:

  4. dbt_project.ymlファイルを更新して、最初のモデル- actor_summaryを指定し、プロファイルをclickhouse_imdbに設定します。

  5. 次に、dbtにClickHouseインスタンスの接続詳細を提供する必要があります。次の内容を~/.dbt/profiles.ymlに追加します。

    ユーザーとパスワードを修正する必要があることに注意してください。追加の設定はこちらに文書化されています。

  6. IMDBディレクトリからdbt debugコマンドを実行して、dbtがClickHouseに接続できるかどうかを確認します。

    応答にConnection test: [OK connection ok]が含まれていることを確認し、接続が成功したことを示します。

簡単なビューのマテリアライゼーションの作成

ビューのマテリアライゼーションを使用すると、モデルは毎回の実行でビューとして再構築されます。これはClickHouseでのCREATE VIEW AS文を通じて行われます。これにより、データの追加ストレージは必要ありませんが、テーブルマテリアライゼーションよりもクエリが遅くなります。

  1. imdbフォルダからmodels/exampleディレクトリを削除します:

  2. modelsフォルダ内のactorsに新しいファイルを作成します。ここでは、それぞれの俳優モデルを表すファイルを作成します:

  3. models/actorsフォルダにschema.ymlおよびactor_summary.sqlのファイルを作成します。

    ファイルschema.ymlは、私たちのテーブルを定義します。これらは後でマクロで使用可能になります。models/actors/schema.ymlを編集して次の内容を含めます:

    actors_summary.sqlは実際のモデルを定義します。設定関数の中で、モデルがClickHouseのビューとしてマテリアライズされるようリクエストも行います。我々のテーブルは、source関数を介してschema.ymlファイルから参照されます。例えば、source('imdb', 'movies')imdbデータベース内のmoviesテーブルを指します。models/actors/actors_summary.sqlを編集して次の内容を含めます:

    最終的なactor_summaryにupdated_atカラムを含めていることに注意してください。これは後で増分マテリアライゼーションに使用します。

  4. imdbディレクトリから、dbt runコマンドを実行します。

  5. dbtはリクエストされた通りにモデルをClickHouseのビューとして表示します。これで、このビューを直接クエリできます。このビューは~/.dbt/profiles.ymlclickhouse_imdbプロファイルのスキーマパラメータにより決定されるimdb_dbtデータベースに作成されます。

    このビューをクエリすると、以前のクエリの結果をより簡潔な構文で再現できます:

テーブルマテリアライゼーションの作成

前の例では、モデルはビューとしてマテリアライズされました。このため、特定のクエリに対して十分なパフォーマンスを提供することがありますが、より複雑なSELECTや頻繁に実行されるクエリはテーブルとしてマテリアライズする方が良い場合があります。このマテリアライゼーションは、BIツールによってクエリされるモデルに役立ち、ユーザーに迅速な体験を提供します。これにより、クエリの結果が新しいテーブルとして保存され、関連するストレージのオーバーヘッドが発生します - 実際にはINSERT TO SELECTが実行されます。このテーブルは毎回再構築されるため、増分ではないことに注意してください。大規模な結果セットは長い実行時間を引き起こす可能性があります - dbt Limitationsを参照してください。

  1. actors_summary.sqlファイルを修正して、materializedパラメータをtableに設定します。ORDER BYの定義方法と、MergeTreeテーブルエンジンを使用していることに注意してください:

  2. imdbディレクトリからdbt runコマンドを実行します。この実行は少し長くかかる可能性があります - ほとんどのマシンで約10秒です。

  3. テーブルimdb_dbt.actor_summaryの作成を確認します:

    適切なデータ型を持つテーブルが表示されるはずです:

  4. このテーブルの結果が以前の応答と一貫していることを確認してください。モデルがテーブルになったことで応答時間が顕著に改善されていることに注意してください:

    このモデルに対して他のクエリを実行してもかまいません。例えば、5回以上出演した俳優の中で最高評価の映画を持つのは誰でしょうか?

インクリメンタルマテリアライゼーションの作成

前の例ではモデルをマテリアライズするためのテーブルを作成しました。このテーブルは、各dbt実行のたびに再構築されます。これは、大規模な結果セットや複雑な変換には実現不可能で、非常に高コストになる可能性があります。この課題に対処し、ビルド時間を短縮するために、dbtはインクリメンタルマテリアライゼーションを提供しています。これにより、dbtは前回の実行以降にテーブルにレコードを挿入または更新できるため、イベントスタイルのデータに適しています。裏では、一時テーブルが作成され、すべての更新されたレコードが格納され、その後、触れられていないレコードと更新されたレコードが新しいターゲットテーブルに挿入されます。これにより、テーブルモデルと同様の制限が大規模な結果セットに対して発生します。

大規模なセットに対するこれらの制限を克服するために、プラグインは 'inserts_only' モードをサポートしており、すべての更新が一時テーブルを作成せずにターゲットテーブルに挿入されます(詳細は下記)。

この例を示すために、910本の映画に出演した「Clicky McClickHouse」という俳優を追加します。彼はメル・ブランクよりも多くの映画に出演することを保証します。

  1. まず、モデルのタイプをインクリメンタルに変更します。この追加には以下が必要です:

    1. unique_key - プラグインが行を一意に特定できるように、unique_keyを提供する必要があります。この場合、クエリからの id フィールドで十分です。これにより、マテリアライズされたテーブルに行の重複が発生しないことが保証されます。ユニーク制約の詳細については、こちらを参照してください。
    2. インクリメンタルフィルター - また、dbtがインクリメンタル実行でどの行が変更されたかを特定する方法を伝える必要があります。これは、デルタ式を提供することで達成されます。通常、これはイベントデータのタイムスタンプを含むため、更新日時として updated_at タイムスタンプフィールドを使用します。行が挿入される際のデフォルト値は now() であり、新しい行を特定できます。加えて、新しい俳優が追加される場合の代替ケースを特定する必要があります。{{this}} 変数を使用して、既存のマテリアライズテーブルを示すと、式 where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}}) が得られます。この式は {% if is_incremental() %} 条件内に埋め込み、インクリメンタル実行時のみ使用され、テーブルが最初に構築されるときには使用されないようにします。インクリメンタルモデルの行をフィルタリングする詳細については、dbtDocsのこの議論を参照してください。

    actor_summary.sql ファイルを以下のように更新します:

    私たちのモデルは、roles および actors テーブルへの更新と追加にのみ反応します。すべてのテーブルに反応するためには、ユーザーはこのモデルを複数のサブモデルに分割することを推奨します - 各サブモデルには独自のインクリメンタル基準があります。これらのモデルは、参照および接続されることができます。モデルの相互参照の詳細については、こちらを参照してください。

  2. dbt run を実行し、結果テーブルの結果を確認します:

  3. インクリメンタル更新を示すために、モデルにデータを追加します。「Clicky McClickHouse」を actors テーブルに追加します:

  4. 「Clicky」を910本のランダムな映画に出演させましょう:

  5. 実際に最も多くの出演を果たした俳優であることを確認するために、基礎となるソーステーブルをクエリして、dbtモデルをバイパスします:

  6. dbt run を実行し、モデルが更新されて上記の結果と一致することを確認します:

内部実装

上記のインクリメンタル更新を実現するために実行されたステートメントは、ClickHouseのクエリログをクエリすることで特定できます。

上記のクエリは、実行期間に応じて調整してください。結果の検査はユーザーに任せますが、インクリメンタル更新を実行するためにプラグインで使用される一般的な戦略を強調します:

  1. プラグインは一時テーブル actor_sumary__dbt_tmp を作成します。変更された行がこのテーブルにストリーミングされます。
  2. 新しいテーブル actor_summary_new が作成されます。古いテーブルの行は、新しいテーブルに新旧の行がストリーミングされ、行IDが一時テーブルに存在しないことを確認します。これにより、更新と重複を効率的に処理できます。
  3. 一時テーブルの結果が新しい actor_summary テーブルにストリーミングされます。
  4. 最後に、新しいテーブルは EXCHANGE TABLES ステートメントを介して古いバージョンと原子的に交換されます。古いテーブルと一時テーブルはそれぞれ削除されます。

以下のように図示できます:

この戦略は非常に大規模なモデルでは課題に直面する可能性があります。詳細については制限を参照してください。

アペンド戦略(inserts-only モード)

インクリメンタルモデルにおける大規模データセットの制限を克服するために、プラグインはdbt設定パラメータ incremental_strategy を使用します。これは値 append に設定できます。設定されると、更新された行がターゲットテーブル(すなわち imdb_dbt.actor_summary)に直接挿入され、一時テーブルは作成されません。 注意:アペンド専用モードではデータが不変であるか、重複が許可されている必要があります。更新された行をサポートするインクリメンタルテーブルモデルが必要な場合は、このモードを使用しないでください!

このモードを示すために、別の新しい俳優を追加し、 incremental_strategy='append' でdbt runを再実行します。

  1. actor_summary.sqlでアペンド専用モードを構成します:

  2. もう一人の有名な俳優 - ダニー・デヴィートを追加しましょう。

  3. ダニーを920本のランダムな映画に出演させましょう。

  4. dbt run を実行し、ダニーが俳優サマリーテーブルに追加されたことを確認します。

前の「Clicky」の挿入と比べて、インクリメンタルがどれほど速かったかに注意してください。

再度クエリログテーブルを確認すると、2回のインクリメンタル実行の違いが明らかになります:

この実行では、新しい行のみが直接 imdb_dbt.actor_summary テーブルに追加され、一時テーブルの作成は含まれません。

削除+挿入モード(実験的)

歴史的に、ClickHouseは非同期ミューテーションという形でのみ、更新および削除のサポートが限られています。これらは非常にI/O集約的であり、一般的には避けるべきです。

ClickHouse 22.8は軽量削除を導入しました。これらは現在実験的ですが、データを削除するためのよりパフォーマンスの良い手段を提供します。

このモードは、incremental_strategy パラメータを介してモデルに設定できます:

この戦略はターゲットモデルのテーブルで直接操作を行うため、操作中に問題が発生した場合、インクリメンタルモデルのデータは無効な状態になる可能性があります - 原子的な更新はありません。

要約すると、このアプローチは:

  1. プラグインは一時テーブル actor_sumary__dbt_tmp を作成します。変更された行がこのテーブルにストリーミングされます。
  2. 現在の actor_summary テーブルに対して DELETE が発行されます。 actor_sumary__dbt_tmp からIDで行が削除されます。
  3. actor_sumary__dbt_tmp から actor_summary に行が挿入されます。

このプロセスは以下のように示されます:

insert_overwrite モード(実験的)

次のステップを実行します:

  1. インクリメンタルモデル関係と同じ構造のステージング(仮想)テーブルを作成:CREATE TABLE {staging} AS {target}
  2. 新しいレコードのみ(SELECTによって生成された)をステージングテーブルに挿入。
  3. 以前の新しいパーティション(ステージングテーブルに存在する)をターゲットテーブルに置き換えます。

このアプローチには以下の利点があります:

  • 全テーブルをコピーする必要がないため、デフォルトの戦略よりも高速です。
  • INSERT操作が正常に完了するまで元のテーブルを変更しないため、他の戦略よりも安全です:中間的な失敗があった場合、元のテーブルは変更されません。
  • データ工学のベストプラクティスである「パーティションの不変性」を実装します。これにより、インクリメンタルかつ並行なデータ処理、ロールバックなどが簡易化されます。

スナップショットの作成

dbtスナップショットを使用すると、時間の経過に伴う可変モデルへの変更の記録を作成できます。これにより、分析者がモデルの以前の状態を「振り返る」ことができるポイントインタイムクエリが可能になります。これは、行が有効であった期間を記録するために、開始日および終了日の列を使用するタイプ2ゆっくり変化する次元を使用することにより実現されます。この機能はClickHouseプラグインによってサポートされており、以下に示します。

この例は、インクリメンタルテーブルモデルの作成を完了したと仮定しています。あなたの actor_summary.sqlinserts_only=True が設定されていないことを確認してください。あなたの models/actor_summary.sql は以下のように見える必要があります:

  1. スナップショットディレクトリに actor_summary ファイルを作成します。

  2. actor_summary.sql ファイルの内容を以下の内容で更新します:

この内容に関するいくつかの観察ポイント:

  • selectクエリは、時間の経過に伴ってスナップショットを取得したい結果を定義します。関数 ref は、以前に作成した actor_summary モデルを参照するために使用されます。
  • レコードの変更を示すために、タイムスタンプ列が必要です。ここでは、私たちの updated_at 列(インクリメンタルテーブルモデルの作成を参照)は、ここで使用されます。パラメータ strategy は、更新を示すためにタイムスタンプを使用することを示し、updated_at パラメータは使用する列を指定します。モデルにこれが存在しない場合、代わりにチェック戦略を使用できます。これは非常に非効率的で、ユーザーが比較する列のリストを指定する必要があります。dbtは、これらの列の現在および履歴の値を比較し、変更があれば記録し(同一である場合は何もしません)。
  1. コマンド dbt snapshot を実行します。

注意してください、snapshotsデータベースに actor_summary_snapshot テーブルが作成されたことが確認できます(これは target_schema パラメータによって決まります)。

  1. このデータをサンプリングすると、dbtが dbt_valid_from および dbt_valid_to 列を含めていることがわかります。後者は null に設定されています。次回の実行でこれが更新されます。

  2. 大好きな俳優 Clicky McClickHouse をさらに10本の映画に出演させます。

  3. imdb ディレクトリから dbt run コマンドを再実行します。これにより、インクリメンタルモデルが更新されます。これが完了したら、変更をキャプチャするために dbt snapshot を実行します。

  4. スナップショットをクエリすると、Clicky McClickHouseの2行目があることに注意してください。以前のエントリには dbt_valid_to 値があります。新しい値は dbt_valid_from 列に同じ値が記録され、dbt_valid_to 値はnullです。新しい行があれば、これらもスナップショットに追加されます。

dbtスナップショットの詳細については、こちらを参照してください。

Using Seeds

dbtはCSVファイルからデータをロードする機能を提供します。この機能は、データベースの大きなエクスポートをロードするのには適しておらず、コードテーブルや dictionaries に通常使用される小さなファイル向けに設計されています。例えば、国コードを国名にマッピングすることが挙げられます。ここでは、シード機能を使用してジャンルコードのリストを生成し、アップロードする簡単な例を示します。

  1. 既存のデータセットからジャンルコードのリストを生成します。dbtディレクトリから、clickhouse-clientを使用してファイルseeds/genre_codes.csvを作成します:

  2. dbt seedコマンドを実行します。これにより、CSVファイルの行を持つ新しいテーブル genre_codes がデータベース imdb_dbt に作成されます(スキーマ構成によって定義されます)。

  3. これらがロードされたことを確認します:

Limitations

現在のClickHouseプラグインは、ユーザーが認識しておくべきいくつかの制限があります:

  1. プラグインは現在、モデルをテーブルとして INSERT TO SELECTを使用してマテリアライズします。これは効果的にデータの重複を意味します。非常に大きなデータセット(PB)の場合、極端に長い実行時間が発生する可能性があり、一部のモデルが実行不可能になることがあります。可能な限りGROUP BYを利用して、任意のクエリから返される行数を最小限に抑えることを目指してください。ソースの行数を維持しつつ単にトランスフォームを行うモデルよりも、データを要約するモデルを優先します。
  2. モデルを表すために分散テーブルを使用するには、ユーザーは手動で各ノードに基底のレプリケーテッドテーブルを作成する必要があります。その上に分散テーブルを作成することができます。プラグインはクラスターの作成を管理しません。
  3. dbtがデータベースに関係(テーブル/ビュー)を作成すると、通常は次の形式で作成されます: {{ database }}.{{ schema }}.{{ table/view id }}。ClickHouseにはスキーマの概念がありません。したがって、プラグインは {{schema}}.{{ table/view id }} を使用します。ここで、schemaはClickHouseデータベースです。

Further Information

前のガイドは、dbt機能の表面を少し触れるだけです。ユーザーは優れた dbt documentation を読むことをお勧めします。

プラグインの追加設定については here に記載されています。

Fivetran

dbt-clickhouseコネクタは、 Fivetran transformations で使用することも可能で、dbtを使用してFivetranプラットフォーム内でシームレスな統合と変換機能を提供します。