並列レプリカ
Introduction
ClickHouseはクエリを非常に迅速に処理しますが、これらのクエリはどのように複数のサーバーに分散および並列処理されるのでしょうか?
このガイドでは、まずClickHouseがどのように分散テーブルを介してクエリを複数のシャードに分配するかを説明し、その後、クエリがどのように複数のレプリカを活用して実行されるかについて説明します。
Sharded architecture
共有無用アーキテクチャでは、クラスターは通常、複数のシャードに分割され、各シャードは全体のデータのサブセットを含んでいます。分散テーブルはこれらのシャードの上に位置し、完全なデータの統一ビューを提供します。
読み込みはローカルテーブルに送信できます。クエリの実行は指定されたシャードのみで発生するか、分散テーブルに送信され、その場合、各シャードが指定されたクエリを実行します。分散テーブルがクエリされたサーバーはデータを集約し、クライアントに応答します:

上の図は、クライアントが分散テーブルをクエリしたときに何が起こるかを可視化しています:
SELECTクエリは、ノード上の分散テーブルにランダムに送信されます (ラウンドロビン戦略を介して、または負荷分散装置によって特定のサーバーにルーティングされた後)。このノードは、コーディネーターとして機能します。
ノードは、分散テーブルによって指定された情報を介してクエリを実行する必要がある各シャードを特定し、クエリを各シャードに送信します。
各シャードはローカルでデータを読み取り、フィルタリングし、集約し、マージ可能な状態をコーディネーターに送信します。
コーディネーターはデータをマージし、クライアントに応答を返します。
レプリカを組み込むと、プロセスは非常に似ています。唯一の違いは、各シャードからの単一のレプリカのみがクエリを実行することです。これにより、より多くのクエリを並行して処理できるようになります。
Non-sharded architecture
ClickHouse Cloudは、上記のアーキテクチャとは非常に異なるアーキテクチャを持っています。 (詳しくは"ClickHouse Cloud Architecture"を参照してください)。計算とストレージが分離され、事実上無限のストレージがあるため、シャードの必要性はそれほど重要ではなくなります。
以下の図はClickHouse Cloudのアーキテクチャを示しています:

このアーキテクチャでは、レプリカをほぼ瞬時に追加および削除できるため、非常に高いクラスターのスケーラビリティを確保できます。ClickHouse Keeperクラスター(右側に示す)は、メタデータの単一の真実のソースを持つことを保証します。レプリカはClickHouse Keeperクラスターからメタデータを取得し、すべては同じデータを維持します。データ自体はオブジェクトストレージに格納されており、SSDキャッシュによりクエリを高速化することができます。
では、クエリの実行をどのように複数のサーバーに分散できるのでしょうか?シャードアーキテクチャでは、各シャードが実際にデータのサブセットに対してクエリを実行できるため、それはかなり明らかでした。シャーディングがない場合、どうすればよいのでしょうか?
Introducing parallel replicas
複数のサーバーでのクエリ実行を並列化するには、まずサーバーの1つをコーディネーターとして割り当てる必要があります。コーディネーターは実行する必要のあるタスクのリストを作成し、それらがすべて実行され、集約され、結果がクライアントに返されることを保証します。ほとんどの分散システムと同様に、初期クエリを受信するノードがこの役割を担います。また、作業の単位を定義する必要があります。シャードアーキテクチャでは、作業の単位はシャードであり、データのサブセットです。並列レプリカを使用すると、グラニュールと呼ばれるテーブルの小さな部分を作業の単位として使用します。
では、下の図を使って、実際にどのように機能するか見てみましょう:

並列レプリカを使用すると:
クライアントからのクエリは負荷分散装置を通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。
ノードは各パーツのインデックスを分析し、処理するための正しいパーツとグラニュールを選択します。
コーディネーターは作業負荷を、異なるレプリカに割り当てることができるグラニュールのセットに分割します。
各グラニュールのセットは対応するレプリカによって処理され、終了するとマージ可能な状態がコーディネーターに送信されます。
最後に、コーディネーターはすべてのレプリカから結果をマージし、クライアントに応答を返します。
上記のステップは、理論上の並列レプリカの動作を概説しています。しかし、実際には、その論理が完璧に機能するのを妨げる多くの要因があります:
一部のレプリカは利用できない場合があります。
ClickHouseのレプリケーションは非同期であるため、一部のレプリカは時間的に異なるパーツを持つ可能性があります。
レプリカ間の遅延ラテシーを何らかの方法で処理する必要があります。
ファイルシステムキャッシュはレプリカごとにアクティビティに応じて異なるため、ランダムなタスク割り当てはキャッシュの局所性に基づいて最適性能を妨げる可能性があります。
これらの要因に対処する方法を次のセクションで探ります。
Announcements
上記のリストの(1)および(2)に対処するために、アナウンスメントの概念を導入しました。以下の図を使って、どのように機能するかを可視化しましょう:

クライアントからのクエリは負荷分散装置を通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。
コーディネーターのノードは、クラスター内のすべてのレプリカからアナウンスメントを取得するリクエストを送信します。レプリカは、テーブルの現在のパーツセットに対してわずかに異なるビューを持つ可能性があります。そのため、不正確なスケジューリング決定を避けるために、この情報を収集する必要があります。
コーディネーターのノードは、アナウンスメントを使用して、さまざまなレプリカに割り当てることができるグラニュールのセットを定義します。ここでは、パート3のグラニュールはレプリカ2に割り当てられていないことがわかります。なぜなら、このレプリカはそのアナウンスメントにこのパートを提供しなかったからです。また、レプリカ3にはアナウンスメントが提供されなかったため、タスクは割り当てられていません。
各レプリカは、自身のグラニュールサブセットでクエリを処理し、マージ可能な状態をコーディネーターに送信した後、コーディネーターは結果をマージし、応答がクライアントに送信されます。
Dynamic coordination
尾の遅延の問題に対処するために、動的コーディネーションを追加しました。これは、すべてのグラニュールを1回のリクエストでレプリカに送信するのではなく、各レプリカがコーディネーターに新しいタスク(処理するグラニュールのセット)をリクエストできることを意味します。コーディネーターは、受け取ったアナウンスメントに基づいてレプリカにグラニュールのセットを割り当てます。
すべてのレプリカがすべてのパーツを含むアナウンスメントを送信した段階であると仮定しましょう。
以下の図は、動的コーディネーションがどのように機能するかを可視化しています:

レプリカはコーディネーターノードにタスクを処理できることを知らせ、処理できる作業量を指定することもできます。
コーディネーターはレプリカにタスクを割り当てます。

レプリカ1と2はタスクを非常に速く終了できます。それらはコーディネーターノードに別のタスクをリクエストします。
コーディネーターはレプリカ1と2に新しいタスクを割り当てます。

すべてのレプリカはタスクの処理を終了しました。彼らはさらにタスクを要求します。
コーディネーターはアナウンスメントを使用して、処理される残りのタスクをチェックしますが、残っているタスクはありません。
コーディネーターは、すべてが処理されたことをレプリカに通知します。現在、すべてのマージ可能な状態をマージし、クエリに応答します。
Managing cache locality
最後に残された潜在的な問題は、キャッシュの局所性をどのように処理するかです。クエリが複数回実行される場合、同じタスクが同じレプリカにルーティングされることをどのように保証できますか?前の例では、次のタスクが割り当てられました:
レプリカ1 | レプリカ2 | レプリカ3 | |
---|---|---|---|
パート1 | g1, g6, g7 | g2, g4, g5 | g3 |
パート2 | g1 | g2, g4, g5 | g3 |
パート3 | g1, g6 | g2, g4, g5 | g3 |
同じタスクが同じレプリカに割り当てられてキャッシュの恩恵を受けることを保証するために、2つのことが起こります。パート+グラニュールのセット(タスク)のハッシュが計算されます。タスク割り当てにはレプリカの数でモジュロが適用されます。
紙の上ではこれはうまく見えるかもしれませんが、実際には、一つのレプリカへの突然の負荷やネットワークの劣化が、特定のタスクを実行するために同じレプリカが一貫して使用される場合に遅延ラテシーを導入する可能性があります。max_parallel_replicas
がレプリカの数より少ない場合、クエリ実行のためにランダムなレプリカが選択されます。
Task stealing
あるレプリカが他よりもタスクを遅く処理している場合、他のレプリカはそのレプリカに本来属するべきタスクをハッシュで「盗もう」として、尾の遅延を減少させます。
Limitations
この機能には知られた制限がありますが、その主なものはこのセクションに文書化されています。
以下に示す制限のいずれでもない問題を見つけ、並列レプリカが原因であると疑う場合は、comp-parallel-replicas
ラベルを使用してGitHubに問題を開いてください。
制限 | 説明 |
---|---|
複雑なクエリ | 現在、並列レプリカは単純なクエリに対してかなり良好に機能します。CTE、サブクエリ、JOIN、非平坦クエリなどの複雑さの層は、クエリパフォーマンスに悪影響を与える可能性があります。 |
小さなクエリ | 多くの行を処理しないクエリを実行している場合、複数のレプリカで実行してもパフォーマンスが向上しない可能性があります。これは、レプリカ間の調整のためのネットワーク時間が追加のサイクルをクエリ実行にもたらす可能性があるからです。これらの問題を制限するには、設定を使用します: parallel_replicas_min_number_of_rows_per_replica 。 |
FINALでの並列レプリカの無効化 | |
プロジェクションと並列レプリカの同時使用は不可 | |
高いカーディナリティデータと複雑な集約 | 多くのデータを送信する必要がある高カーディナリティ集約は、クエリを著しく遅くする可能性があります。 |
新しいアナライザーとの互換性 | 新しいアナライザーは、特定のシナリオでクエリ実行を著しく遅くしたり、逆に高速化したりする可能性があります。 |
Settings related to parallel replicas
設定 | 説明 |
---|---|
enable_parallel_replicas | 0 : 無効1 : 有効 2 : 並列レプリカの使用を強制し、使用されなければ例外をスローします。 |
cluster_for_parallel_replicas | 並列レプリケーションに使用するクラスタ名; ClickHouse Cloudを使用している場合は、default を使用してください。 |
max_parallel_replicas | 複数のレプリカでのクエリ実行に使用する最大レプリカ数。一部のレプリカの数より少ない数が指定されると、ノードはランダムに選択されます。この値も水平方向のスケーリングに対応できるように過剰な値を書くことができます。 |
parallel_replicas_min_number_of_rows_per_replica | 処理する必要のある行数に基づいて使用されるレプリカの数を制限するのに役立ちます。使用されるレプリカの数は次のように定義されます:estimated rows to read / min_number_of_rows_per_replica 。 |
allow_experimental_analyzer | 0 : 古いアナライザーを使用1 : 新しいアナライザーを使用します。並列レプリカの動作は使用されるアナライザーに基づいて変わる可能性があります。 |
Investigating issues with parallel replicas
system.query_log
テーブルで、各クエリに使用されている設定を確認できます。また、system.events
テーブルを確認してサーバーで発生したすべてのイベントを確認でき、clusterAllReplicas
テーブル関数を使用してすべてのレプリカ上のテーブルを確認できます
(クラウドユーザーの場合はdefault
を使用します)。
Response
system.text_log
テーブルには、並列レプリカを使用したクエリの実行に関する情報も含まれています:
Response
最後に、EXPLAIN PIPELINE
も使用できます。これにより、ClickHouseがどのようにクエリを実行し、クエリの実行にどのリソースが必要かを強調表示します。以下のクエリを例としてみましょう:
並列レプリカなしのクエリパイプラインを見てみましょう:

そして今、並列レプリカありでは:
