メインコンテンツまでスキップ
メインコンテンツまでスキップ

データのバックフィル

ClickHouseに新しく触れているユーザーや、既存のデプロイメントを担当しているユーザーは、必然的に歴史的データでテーブルをバックフィルする必要があります。場合によっては、比較的シンプルですが、物理的なビューをポピュレートする必要がある場合は、より複雑になることがあります。このガイドでは、ユーザーが自分のユースケースに適用できるこのタスクのためのいくつかのプロセスをドキュメントしています。

注記

このガイドは、ユーザーが増分物理ビューや、s3やgcsなどのテーブル関数を使用したデータのロードの概念に既に慣れていることを前提としています。また、ユーザーがオブジェクトストレージからの挿入パフォーマンスの最適化に関するガイドを読むことをお勧めしており、そのアドバイスはこのガイド全体の挿入に適用できます。

例データセット

このガイドでは、PyPIデータセットを使用します。このデータセットの各行は、pipなどのツールを使用したPythonパッケージのダウンロードを表します。

例えば、サブセットは単一の日 - 2024-12-17をカバーしており、このデータはhttps://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/で公開されています。ユーザーは以下のようにクエリを実行できます:

このバケットのフルデータセットには、320 GBを超えるパーケットファイルが含まれています。以下の例では、意図的にグロブパターンを使用してサブセットをターゲットにします。

ユーザーは、例えばKafkaやオブジェクトストレージからこのデータのストリームを消費していると仮定します。この日以降のデータに対して。データのスキーマは以下に示されています:

注記

フルPyPIデータセットには、1兆行を超えるデータが含まれており、我々のパブリックデモ環境clickpy.clickhouse.comで入手可能です。このデータセットの詳細や、デモで物理ビューを利用してパフォーマンスを向上させる方法、データが毎日ポピュレートされる方法については、こちらをご覧ください。

バックフィリングシナリオ

バックフィリングは、特定の時点からデータストリームが消費されるときに一般的に必要です。このデータは、増分物理ビューでClickHouseテーブルに挿入され、挿入されたブロックにトリガされます。これらのビューは、挿入の前にデータを変換したり、集計を計算してターゲットテーブルに結果を送信したりします。

我々は以下のシナリオをカバーすることを試みます:

  1. 既存のデータ取り込みによるバックフィリング - 新しいデータがロードされており、歴史的データがバックフィルされる必要があります。この歴史的データは特定されています。
  2. 既存のテーブルに物理ビジュアルを追加 - 歴史的データがポピュレートされ、データが既にストリーミングされている設定に新しい物理ビューを追加する必要があります。

データはオブジェクトストレージからバックフィルされると仮定します。すべての場合で、データの挿入を中断しないようにすることを目指しています。

オブジェクトストレージから歴史的データをバックフィルすることをお勧めします。データは可能な限りパーケットにエクスポートされ、最適な読み取り性能と圧縮(ネットワーク転送の削減)のために。通常、約150MBのファイルサイズが好まれますが、ClickHouseは70以上のファイルフォーマットをサポートしており、すべてのサイズのファイルを処理できます。

重複テーブルとビューの使用

すべてのシナリオにおいて、我々は「重複テーブルとビュー」の概念に依存しています。これらのテーブルとビューは、ライブストリーミングデータに使用されるもののコピーを表し、バックフィルを孤立して実行できるようにし、失敗が発生した場合に復旧のための簡単な手段を提供します。例えば、以下のようなメインのpypi テーブルと、Pythonプロジェクトごとのダウンロード数を計算する物理ビューがあります:

メインテーブルと関連するビューをデータのサブセットを使用してポピュレートします:

他のサブセット {101..200} をロードしたいと仮定します。pypi に直接挿入できるかもしれませんが、重複テーブルを作成することでこのバックフィルを孤立して実行できます。

バックフィルが失敗した場合、メインテーブルには影響を与えず、単純にtruncateして重複テーブルを再実行できます。

これらのビューの新しいコピーを作成するには、CREATE TABLE AS句を使ってサフィックス_v2を用います:

おおよそ同じサイズの2番目のサブセットでこれをポピュレートし、成功裏にロードされたことを確認します。

2度目のロード中に失敗が発生した場合、単純にtruncateしてpypi_v2pypi_downloads_v2を再度ロードすることができます。

データのロードが完了したら、ALTER TABLE MOVE PARTITION句を使用して、重複テーブルからメインテーブルにデータを移動できます。

パーティション名

上記のMOVE PARTITION呼び出しは、パーティション名()を使用しています。これは、このテーブルの単一パーティションを表します(パーティションはありません)。パーティションされたテーブルの場合、ユーザーは各パーティションごとに複数のMOVE PARTITION呼び出しを行う必要があります。現在のパーティション名は、system.partsテーブルから調べることができます。例:SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2').

これでpypipypi_downloadsが完全なデータを含んでいることを確認できます。pypi_downloads_v2pypi_v2は安全に削除できます。

重要なのは、MOVE PARTITION操作は軽量で(ハードリンクを利用)、原子的であり、すなわち中間状態なしに失敗するか成功するかのいずれかです。

このプロセスは、以下のバックフィリングシナリオで広く利用されます。

このプロセスでは、ユーザーが各挿入操作のサイズを選択する必要があることに注意してください。

より大きな挿入、すなわちより多くの行は、必要なMOVE PARTITION操作を減らすことを意味します。しかし、これはネットワークの中断による挿入失敗時のコストとバランスを取る必要があります。ユーザーは、リスクを低減するためにファイルをバッチ処理することを補完できます。これは、リストされる範囲のクエリ(例:WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00)やグロブパターンを使用して行うことができます。例えば、

注記

ClickPipesは、オブジェクトストレージからデータをロードする際にこのアプローチを使用し、ターゲットテーブルとその物理ビューの重複を自動的に作成し、ユーザーに上記のステップを実行する必要を避けます。異なるサブセットを処理する複数のワーカースレッドを使用することで、データを迅速にロードし、正確に一度だけのセマンティクスを実現します。興味のある方は、このブログで詳細をご覧ください。

シナリオ 1: 既存のデータ取り込みによるバックフィリング

このシナリオでは、バックフィルするデータが孤立したバケットに存在せず、フィルタリングが必要であると仮定します。データは既に挿入されており、タイムスタンプや単調増加列を特定でき、そこから歴史的データをバックフィルする必要があります。

このプロセスは以下のステップに従います:

  1. チェックポイントを特定する - タイムスタンプまたは歴史的データを復元する必要がある列の値。
  2. メインテーブルと物理ビューのターゲットテーブルの重複を作成する。
  3. ステップ(2)で作成したターゲットテーブルを指す物理ビューのコピーを作成する。
  4. ステップ(2)で作成した重複メインテーブルに挿入する。
  5. 重複テーブルから元のバージョンにすべてのパーティションを移動し、重複テーブルを削除する。

例えば、PyPIデータで必要なデータがロードされていると仮定します。最小タイムスタンプを特定できるため、我々の「チェックポイント」がわかります。

上記から、2024-12-17 09:00:00より前のデータをロードする必要があることがわかります。先ほどのプロセスを用いて、重複テーブルとビューを作成し、タイムスタンプにフィルタをかけたサブセットをロードします。

注記

パーケットのタイムスタンプ列をフィルタリングすることは非常に効率的です。ClickHouseは、ロードするフルデータ範囲を特定するためにタイムスタンプ列だけを読み取ります。パーケットインデックス(例えばmin-max)もClickHouseクエリエンジンによって利用できます。

この挿入が完了したら、関連するパーティションを移動できます。

もし歴史的データが孤立したバケットであれば、上記の時間フィルタは必要ありません。時間または単調増加列が利用できない場合は、歴史的データを分離します。

ClickHouse CloudでClickPipesを使うだけ

ClickHouse Cloudのユーザーは、データが自分のバケットに孤立させられる場合、歴史的バックアップを復元するためにClickPipesを使用するべきです(この場合フィルタは必要ありません)。複数のワーカーを用いたロードを並列化し、これによってロード時間を短縮し、ClickPipesは上記のプロセスを自動化します - メインテーブルと物理ビューの両方の重複テーブルを作成します。

シナリオ 2: 既存のテーブルに物理ビューを追加

新しい物理ビューを追加する必要がある設定には、かなりのデータがポピュレートされ、データが挿入されることは珍しくありません。時刻または単調増加列が利用できると、ストリーム内のポイントを特定するのに役立ち、データ取り込みの中断を避けることができます。以下の例では、両方のケースが想定されており、データ取り込みを中断を避けるアプローチを優先します。

POPULATEを避ける

小さなデータセットで取り込みが一時停止されている場合を除いて、物理ビューのバックフィルにPOPULATEコマンドの使用は推奨されません。このオペレーターは、ポピュレートハッシュが完了した後にソーステーブルに挿入された行を見逃す可能性があります。さらに、このポピュレートはすべてのデータに対して実行され、大規模なデータセットでの中断やメモリの制限に対して脆弱です。

タイムスタンプまたは単調増加列が利用できる場合

この場合、我々は新しい物理ビューに、未来の任意のデータよりも大きい行のみに制限するフィルタを含めることをお勧めします。この物理ビューは、その後、メインテーブルの歴史的データを使用してこの日からバックフィルされることになります。バックフィルのアプローチは、データサイズと関連クエリの複雑さに依存します。

最も単純なアプローチは、次のステップを含みます:

  1. 任意の時間の近い未来よりも大きい行のみを考慮するフィルタを用いて物理ビューを作成します。
  2. INSERT INTO SELECTクエリを実行し、物理ビューのターゲットテーブルに挿入し、集約クエリでソーステーブルから読み取ります。

これは追加のサブデータにターゲットを定めるためにステップ(2)で強化することができ、または失敗後の復旧を容易にするために物理ビューのための重複したターゲットテーブルを使用することができます(挿入が完了した後に元のテーブルにパーティションをアタッチ)。

以下は、毎時最も人気のあるプロジェクトを計算する物理ビューです。

ターゲットテーブルを追加できますが、物理ビューを追加する前に、そのSELECT節を変更して、任意の近い未来の時間よりも大きい行のみを考慮するようにします。この場合、2024-12-17 09:00:00を近くの時間と仮定します。

このビューが追加されたら、上述の日付より前のこのビューのすべてのデータをバックフィルすることができます。

最も簡単な方法は、フィルタを追加したメインテーブルから物理ビューのクエリを実行し、INSERT INTO SELECTを介してビューのターゲットテーブルに結果を挿入することです。例えば、上記のビューにおいて:

注記

上記の例では、ターゲットテーブルはSummingMergeTreeです。この場合、元の集約クエリを単純に使用できます。より複雑なユースケースでは、AggregatingMergeTreeを利用し、集計には-State関数を使用します。これについての例はこちらで見ることができます。

我々の場合、これは比較的軽量な集約で、3秒以内で完了し、600MiB未満のメモリを使用します。より複雑または長時間実行される集約の場合、ユーザーは、このプロセスをより堅牢にするために従来の重複テーブルアプローチを使用し、シャドウターゲットテーブル(例:pypi_downloads_per_day_v2)を作成し、このテーブルに挿入し、その結果のパーティションをpypi_downloads_per_dayにアタッチすることができます。

物理ビューのクエリは、より複雑であることが多く(さもなければユーザーはビューを使用しないでしょう!)、リソースを消費することがあります。まれなケースでは、クエリのリソースがサーバーの限界を超えることもあります。これがClickHouseの物理ビューの利点の一つを示しています。それは、インクリメンタルであり、全データセットを一度に処理しないということです!

この場合、ユーザーは以下の選択肢があります:

  1. クエリを変更してレンジをバックフィルします。例:WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00など。
  2. Nullテーブルエンジンを使用して物理ビューを埋めます。これは、物理ビューの通常のインクリメンタルな生成を再現し、データブロック(設定可能なサイズ)を繰り返しクエリ実行します。

(1)は、最も簡単なアプローチであり、しばしば十分です。簡潔さのために例を含めません。

以下で(2)をさらに探ります。

Nullテーブルエンジンを使用して物理ビューを埋める

Nullテーブルエンジンは、データを永続化しないストレージエンジンを提供します(テーブルエンジンの世界での/dev/nullだと思ってください)。これは矛盾しているように思えますが、物理ビューはこのテーブルエンジンに挿入されたデータに対しても実行されます。これにより、元のデータを永続化せずに物理ビューを構築でき、I/Oや関連するストレージを回避できます。

重要なのは、テーブルエンジンに接続された物理ビューは、挿入時にデータのブロックに対しても実行され、それらの結果をターゲットテーブルに送信します。これらのブロックは設定可能なサイズです。より大きなブロックは、より効率的(そして迅速に処理される)ですが、リソース(主にメモリ)をより消費します。このテーブルエンジンの使用により、物理ビューをインクリメンタルに構築、すなわち1ブロックずつ処理できます。全集計をメモリ内に保持する必要がありません。


以下の例を考えてみましょう:

ここでは、物理ビューを構築するために、行を受け取るためのNullテーブルpypi_v2を作成します。必要なカラムだけをスキーマに制限していることに注意してください。我々の物理ビューは、このテーブルに挿入された行に対して集約を実行し(1ブロックずつ)、結果をターゲットテーブルpypi_downloads_per_dayに送信します。

注記

ここでターゲットテーブルとしてpypi_downloads_per_dayを使用しました。追加の堅牢性のために、ユーザーは重複テーブルpypi_downloads_per_day_v2を作成し、物理ビューのターゲットテーブルとしてこのテーブルを使用することができます。挿入が完了した後に、pypi_downloads_per_day_v2のパーティションをpypi_downloads_per_dayに移動できます。これにより、挿入がメモリの問題やサーバーの中断によって失敗した場合の復旧が可能になります。つまり、pypi_downloads_per_day_v2をトランケートし、設定を調整して再試行すればいいのです。

この物理ビューを埋めるために、次のようにpypiからpypi_v2にバックフィルする関連データを挿入します。

ここでのメモリ使用量は639.47 MiBです。

パフォーマンスとリソースの調整

上記のシナリオでのパフォーマンスとリソースの使用は、いくつかの要因によって決まります。調整を試みる前に、読者には読むためのスレッドの使用セクションで詳細にドキュメントされた挿入メカニクスを理解することをお勧めします。まとめると:

  • 読み取りの並列性 - 読み取るために使用されるスレッドの数。max_threadsを通じて制御されます。ClickHouse Cloudでは、これはインスタンスサイズによって決定され、デフォルトでvCPUの数になります。この値を増やすことで、メモリ使用量は増加しますが、読み取りパフォーマンスが向上する可能性があります。
  • 挿入の並列性 - 挿入するために使用される挿入スレッドの数。これはmax_insert_threadsを通じて制御されます。ClickHouse Cloudでは、これはインスタンスサイズ(2〜4の間)によって決定され、OSSでは1に設定されます。この値を増やすことで、メモリ使用量は増加しますが、パフォーマンスが向上する可能性があります。
  • 挿入ブロックサイズ - データはループで処理され、データが取得され、解析され、メモリ内の挿入ブロックに作成されます。これらのブロックは、パーティショニングキーに基づいています。これらのブロックはソートされ、最適化され、圧縮され、新しいdata partsとしてストレージに書き込まれます。挿入ブロックのサイズは、min_insert_block_size_rowsmin_insert_block_size_bytes(非圧縮)によって制御され、メモリ使用量とディスクI/Oに影響を与えます。大きなブロックはメモリをより多く使用しますが、部品を減らし、I/Oやバックグラウンドのマージを削減します。これらの設定は最小スレッショルドを表し(どちらかが最初に到達するとフラッシュがトリガされます)。
  • 物理ビューのブロックサイズ - メイン挿入の上記のメカニクスに加えて、物理ビューに挿入される前に、ブロックもより効率的に処理されるように圧縮されます。これらのブロックのサイズは、min_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_viewsによって決定されます。大きなブロックは、より大きなメモリ使用量の犠牲に、効率的な処理を可能にします。デフォルトでは、これらの設定はソーステーブル設定min_insert_block_size_rowsおよびmin_insert_block_size_bytesの値に戻ります。

パフォーマンスを向上させるために、ユーザーは挿入のためのスレッドとブロックサイズの調整セクションで示されたガイドラインに従うことができます。ほとんどの場合、パフォーマンスを改善するためにmin_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_viewsを変更する必要はありません。これらを変更する場合は、min_insert_block_size_rowsmin_insert_block_size_bytesと同様のベストプラクティスを使用してください。

メモリを最小限に抑えるために、ユーザーはこれらの設定で実験することを望むかもしれません。これにより、間違いなくパフォーマンスが低下します。先ほどのクエリを使用して、以下の例を示します。

max_insert_threadsを1に下げることで、メモリオーバーヘッドを削減します。

さらに、max_threads設定を1に下げることでメモリをさらに減らすことができます。

最後に、min_insert_block_size_rowsを0に設定してブロックサイズの判断要因として無効にし、min_insert_block_size_bytesを10485760(10MiB)に設定することで、メモリをさらに減らすことができます。

最後に、ブロックサイズを低くすると部品が増え、マージ圧力が増加することに注意してください。これらの設定は慎重に変更する必要がありますこちらで議論されています。

次の最後から2番目のステップでは、前年に説明されたシンプルな INSERT INTO SELECT アプローチを使用して pypi_downloads_per_day にバックフィルを行います。これは、前述のNullテーブルアプローチを使用して強化することもでき、より強靭性のために複製テーブルをオプションで使用できます。

この操作には挿入を一時停止する必要がありますが、中間操作は通常迅速に完了でき、データの中断を最小限に抑えます。