メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouseがクエリを並列で実行する方法

ClickHouseはスピードのために設計されています。それは、すべての利用可能なCPUコアを使用し、処理レーンにデータを分配し、しばしばハードウェアを限界までプッシュする非常に並列な方法でクエリを実行します。

このガイドでは、ClickHouseにおけるクエリの並列処理がどのように機能するか、また、大規模なワークロードのパフォーマンスを改善するためにそれをチューニングまたは監視する方法を説明します。

私たちは、uk_price_paid_simpleデータセットに対する集約クエリを使用して、主要な概念を説明します。

ステップバイステップ: ClickHouseが集約クエリを並列化する方法

ClickHouseが①テーブルの主キーにフィルターをかけた集約クエリを実行すると、②主インデックスをメモリに読み込み、③どのグラニュールを処理する必要があるか、またどのグラニュールを安全にスキップできるかを特定します:

インデックス分析

処理レーンに対する作業の分配

選択されたデータは、n 並列 処理レーン動的に分配され、データをブロック単位でストリーミングしながら最終結果に処理します:

4つの並列処理レーン


このn 並列処理レーンの数は、max_threads設定によって制御され、デフォルトではClickHouseがサーバー上で使用可能なCPUコアの数と同じになります。上記の例では、4 コアを仮定しています。

8 コアを持つマシンであれば、クエリ処理のスループットは大体2倍になります(ただし、メモリの使用量もそれに応じて増加します)。より多くのレーンが並列にデータを処理するためです:

8つの並列処理レーン


効率的なレーン分配は、CPUの利用率を最大化し、総クエリ時間を短縮するための鍵です。

シャーディングされたテーブルでのクエリ処理

テーブルデータが複数のサーバーにシャードとして分散されている場合、各サーバーはそのシャードを並列に処理します。各サーバー内では、上記と同様に地元のデータが並列処理レーンを使用して処理されます:

分散レーン


クエリを最初に受け取るサーバーは、シャードからすべてのサブ結果を収集し、最終的なグローバル結果に結合します。

シャード間でのクエリ負荷の分散は、特に高スループット環境において、並列処理の水平スケーリングを可能にします。

ClickHouse Cloudはシャードの代わりに並列レプリカを使用します

ClickHouse Cloudでは、シャードと同様に機能する並列レプリカを通じてこの同じ並列性が実現されます。各ClickHouse Cloudレプリカは、ステートレスの計算ノードであり、データの一部を並列に処理し、独立したシャードのように最終結果に貢献します。

クエリ並列性の監視

これらのツールを使用して、クエリが利用可能なCPUリソースを完全に活用していることを確認し、活用していない場合は診断します。

私たちは、59のCPUコアを持つテストサーバーでこれを実行しています。これにより、ClickHouseはそのクエリ並列性を完全に示すことができます。

例のクエリがどのように実行されるかを観察するために、ClickHouseサーバーに集約クエリ中のすべてのトレースレベルのログエントリを返すよう指示できます。このデモでは、クエリの述語を削除しました—そうしないと、3つのグラニュールしか処理されず、それ以上の並列処理レーンをClickHouseが充分に利用することができません:

SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
SETTINGS send_logs_level='trace';
① <Debug> ...: 3609 marks to read from 3 ranges
② <Trace> ...: Spreading mark ranges among streams
② <Debug> ...: Reading approx. 29564928 rows with 59 streams

以下のことが確認できます。

  • ① ClickHouseは、トレースログにマークとして示された3,609のグラニュールを3つのデータ範囲にわたって読み取る必要があります。
  • ② 59のCPUコアを使用することで、この作業は59の並列処理ストリームに分配されます—レーンごとに1つです。

あるいは、EXPLAIN句を使用して、集約クエリの物理オペレータープラン(クエリパイプラインとも呼ばれます)を検査できます:

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple;
    ┌─explain───────────────────────────────────────────────────────────────────────────┐
 1. │ (Expression)                                                                      │
 2. │ ExpressionTransform × 59                                                          │
 3. │   (Aggregating)                                                                   │
 4. │   Resize 59 → 59                                                                  │
 5. │     AggregatingTransform × 59                                                     │
 6. │       StrictResize 59 → 59                                                        │
 7. │         (Expression)                                                              │
 8. │         ExpressionTransform × 59                                                  │
 9. │           (ReadFromMergeTree)                                                     │
10. │           MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59 0 → 1 │
    └───────────────────────────────────────────────────────────────────────────────────┘

注意: 上記のオペレータープランを下から上に読んでください。各行は、底からストレージからデータを読み取ることから始まり、上部の最終処理ステップで終了します。× 59とマークされたオペレーターは、59の並列処理レーンにわたって重ならないデータ領域で同時に実行されます。これは、max_threadsの値を反映しており、クエリの各ステージがCPUコアにわたってどのように並列化されているかを示しています。

ClickHouseの埋め込みWeb UI/playエンドポイントで利用可能)は、上記の物理プランをグラフィカルなビジュアライゼーションとしてレンダリングできます。この例では、視覚化をコンパクトに保つためにmax_threads4に設定し、4つの並列処理レーンのみを表示します:

クエリパイプライン

注意: ビジュアルを左から右に読んでください。各行は、データをブロック単位でストリーミングし、フィルタリング、集約、最終処理ステージなどの変換を適用する並列処理レーンを表しています。この例では、max_threads = 4設定に対応する4つの並列レーンを見ることができます。

処理レーン間の負荷分散

物理プランの上記のResizeオペレーターは、処理レーン間でデータブロックストリームを再配分し再分配して、均等に使用されるように保ちます。この再バランスは、データ範囲がクエリ述語に一致する行の数が異なる場合に特に重要です。そうでなければ、一部のレーンは過負荷になり、他はアイドルになる可能性があります。作業を再分配することによって、より速いレーンが遅いレーンを助け、全体のクエリ実行時間を最適化します。

なぜmax_threadsが常に尊重されないのか

上記のように、n 並列処理レーンの数は、デフォルトではClickHouseがサーバー上で使用可能なCPUコアの数と同じであるmax_threads設定によって制御されます:

SELECT getSetting('max_threads');
   ┌─getSetting('max_threads')─┐
1. │                        59 │
   └───────────────────────────┘

ただし、処理のために選択されたデータの量によって、max_threadsの値が無視されることがあります:

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
...   
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 30

上記のオペレータープランの抜粋に示されているように、max_threads59に設定されているにもかかわらず、ClickHouseはデータをスキャンするためにわずか30の同時ストリームを使用しています。

では、クエリを実行してみましょう:

SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
   ┌─max(price)─┐
1. │  594300000 │ -- 594.30 million
   └────────────┘

1 row in set. Elapsed: 0.013 sec. Processed 2.31 million rows, 13.66 MB (173.12 million rows/s., 1.02 GB/s.)
Peak memory usage: 27.24 MiB.   

上記の出力で示されたように、クエリは231万行を処理し、13.66MBのデータを読み込みました。これは、インデックス分析フェーズ中に、ClickHouseが処理のために282のグラニュールを選択し、それぞれが8,192行を含み、合計で約231万行になったためです:

EXPLAIN indexes = 1
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
    ┌─explain───────────────────────────────────────────────┐
 1. │ Expression ((Project names + Projection))             │
 2. │   Aggregating                                         │
 3. │     Expression (Before GROUP BY)                      │
 4. │       Expression                                      │
 5. │         ReadFromMergeTree (uk.uk_price_paid_simple)   │
 6. │         Indexes:                                      │
 7. │           PrimaryKey                                  │
 8. │             Keys:                                     │
 9. │               town                                    │
10. │             Condition: (town in ['LONDON', 'LONDON']) │
11. │             Parts: 3/3                                │
12. │             Granules: 282/3609                        │
    └───────────────────────────────────────────────────────┘  

構成されたmax_threads値にかかわらず、ClickHouseは十分なデータがある場合にのみ追加の並列処理レーンを割り当てます。max_threadsの「max」は上限を指し、使用されるスレッドの保証された数ではありません。

「十分なデータ」とは、主に、各処理レーンが処理すべき最小行数(デフォルトは163,840行)と、最小バイト数(デフォルトは2,097,152バイト)を定義する2つの設定によって決まります:

共有ナシクラスタ向け:

共有ストレージを持つクラスタ向け(例: ClickHouse Cloud):

さらに、読み取りタスクサイズの硬い下限もあり、次の設定によって制御されます:

これらの設定を変更しないでください

運用環境でこれらの設定を変更することは推奨されません。ここに表示されているのは、max_threadsが常に実際の並列性レベルを決定しない理由を示すためだけです。

デモ目的で、これらの設定をオーバーライドして最大の同時実行性を強制する物理プランを検査してみましょう:

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON'
SETTINGS
  max_threads = 59,
  merge_tree_min_read_task_size = 0,
  merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 0, 
  merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem = 0;
...   
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59

これでClickHouseはデータをスキャンするために59の同時ストリームを使用し、設定されたmax_threadsを完全に尊重しています。

これは、小さなデータセットに対するクエリの場合、ClickHouseが意図的に同時実行を制限することを示しています。設定のオーバーライドはテストのためだけに使用し、運用環境では使用しないでください。効率的な実行やリソースの競合を引き起こす可能性があるためです。

主なポイント

  • ClickHouseは、max_threadsに結びついた処理レーンを使用してクエリを並列化します。
  • 実際のレーンの数は、処理のために選択されたデータのサイズに依存します。
  • EXPLAIN PIPELINEとトレースログを使用してレーン使用状況を分析します。

詳細情報を見つける場所

ClickHouseがクエリを並列で実行する方法や、高スケーラビリティでの高パフォーマンスの実現方法についてさらに深く理解したい場合は、以下のリソースを調べてみてください:

  • クエリ処理レイヤー – VLDB 2024 論文 (Web版) - ClickHouseの内部実行モデルの詳細な分析、スケジューリング、パイプライン処理、およびオペレータ設計を含む。

  • 部分的集約状態の解説 - 部分的集約状態が処理レーン全体で効率的な並列実行を可能にする方法についての技術的な深掘り。

  • ClickHouseのクエリ処理のすべてのステップを詳細に説明するビデオチュートリアル: