並列レプリカ
はじめに
ClickHouseはクエリを非常に迅速に処理しますが、これらのクエリはどのように複数のサーバーに分散および並列化されるのでしょうか?
このガイドでは、まずClickHouseがどのように分散テーブルを介してクエリを複数のシャードに分配するか、次にクエリがその実行のために複数のレプリカをどのように活用できるかについて説明します。
シャーディングアーキテクチャ
共有何もないアーキテクチャでは、クラスタは一般的に複数のシャードに分割され、各シャードには全データのサブセットが含まれます。分散テーブルはこれらのシャードの上に存在し、完全なデータの統一ビューを提供します。
読み取りはローカルテーブルに送信できます。クエリの実行は指定されたシャードだけで行われるか、分散テーブルに送信され、その場合は各シャードが指定されたクエリを実行します。分散テーブルがクエリされたサーバーは、データを集計し、クライアントに応答します:

上の図は、クライアントが分散テーブルをクエリしたときに何が起こるかを示しています:
SELECTクエリは、ノード上の分散テーブルにランダムに送信されます (ラウンドロビン戦略を介して、またはロードバランサーによって特定のサーバーにルーティングされた後)。このノードは、今後コーディネーターとして機能します。
ノードは、分散テーブルによって指定された情報を介して、クエリを実行する必要がある各シャードを特定し、クエリを各シャードに送信します。
各シャードはデータをローカルで読み、フィルタリングし、集計し、その後、コーディネーターにマージ可能な状態を返します。
コーディネートノードはデータをマージし、クライアントに応答を送信します。
レプリカが混ざる場合、プロセスはほぼ同様で、唯一の違いは各シャードからの単一のレプリカのみがクエリを実行することです。これにより、より多くのクエリを並列に処理できるようになります。
非シャーディングアーキテクチャ
ClickHouse Cloudは、上記のアーキテクチャとは非常に異なるアーキテクチャを持っています。 (詳細については "ClickHouse Cloud Architecture" を参照してください)。計算とストレージの分離、および実質的に無限のストレージにより、シャードの必要性は重要性を減少させます。
以下の図はClickHouse Cloudのアーキテクチャを示しています:

このアーキテクチャでは、レプリカをほぼ瞬時に追加および削除でき、高いクラスターのスケーラビリティを確保します。ClickHouse Keeperクラスター(右に示されています)は、メタデータの単一の真実のソースを確保します。レプリカはClickHouse Keeperクラスターからメタデータを取得し、すべてが同じデータを維持します。データ自体はオブジェクトストレージに保存され、SSDキャッシュによりクエリが高速化されます。
ただし、クエリの実行を複数のサーバーに分散するには、どうすればよいのでしょうか? シャーディングアーキテクチャでは、各シャードがデータのサブセットに対してクエリを実行できるため、それは非常に明白でした。シャーディングがない場合、これはどのように機能するのでしょうか?
並列レプリカの導入
複数のサーバーを通じてクエリ実行を並列化するには、まずコーディネーターとして機能するサーバーを指定できる必要があります。コーディネーターは、実行される必要があるタスクのリストを作成し、それらがすべて実行され、集約され、結果がクライアントに返されることを保証します。ほとんどの分散システムと同様に、これは初期クエリを受け取ったノードの役割となります。また、作業の単位を定義する必要があります。シャーディングアーキテクチャでは、作業の単位はシャードであり、データのサブセットです。並列レプリカでは、グラニュールと呼ばれるテーブルの小さな部分を作業の単位として使用します。
次に、以下の図を使って、実践でどのように機能するかを見てみましょう:

並列レプリカを使用すると:
クライアントからのクエリは、ロードバランサーを通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。
ノードは各パートのインデックスを分析し、処理すべき適切なパーツとグラニュールを選択します。
コーディネーターは、異なるレプリカに割り当てることができるグラニュールのセットに作業負荷を分割します。
各グラニュールセットは対応するレプリカによって処理され、完了したときにマージ可能な状態がコーディネーターに送信されます。
最後に、コーディネーターはすべてのレプリカからの結果をマージし、クライアントに応答を返します。
上記のステップは、理論における並列レプリカの機能を概説しています。 しかし、実際には、そうしたロジックが完璧に機能することを妨げる多くの要因があります:
一部のレプリカが利用できない場合があります。
ClickHouseにおけるレプリケーションは非同期であり、一部のレプリカは、ある時点で同じパーツを持っていないかもしれません。
レプリカ間の遅延は何らかの方法で処理する必要があります。
ファイルシステムキャッシュは各レプリカのアクティビティに基づいて異なるため、ランダムなタスク割り当てがキャッシュの局所性の観点から最適なパフォーマンスを実現できない可能性があります。
これらの要因を克服する方法については、以下のセクションで探ります。
アナウンスメント
上記のリストの(1)および(2)の問題に対処するために、アナウンスメントの概念を導入しました。以下の図を使って、これがどのように機能するかを視覚化してみましょう:

クライアントからのクエリは、ロードバランサーを通過した後、1つのノードに送信されます。このノードがこのクエリのコーディネーターになります。
コーディネートノードは、クラスター内のすべてのレプリカからアナウンスメントを取得するリクエストを送信します。レプリカは、テーブルの現在のパーツのセットに対してやや異なるビューを持つ可能性があります。そのため、正しくスケジュールされた決定を避けるためにこの情報を収集する必要があります。
コーディネートノードはアナウンスメントを使用して、異なるレプリカに割り当てることができるグラニュールのセットを定義します。例えば、ここでは、パート3のグラニュールがレプリカ2に割り当てられなかったことが確認できます。なぜなら、このレプリカがそのアナウンスメントにこのパートを提供しなかったからです。また、レプリカ3にタスクが割り当てられなかったことにも注意してください。なぜなら、このレプリカがアナウンスメントを提供しなかったからです。
各レプリカが自分のグラニュールのサブセットに対してクエリを処理し、マージ可能な状態をコーディネーターに送信した後、コーディネーターは結果をマージし、応答をクライアントに送信します。
動的コーディネーション
遅延の問題に対処するために、動的コーディネーションを追加しました。これは、すべてのグラニュールが一度のリクエストでレプリカに送信されるのではなく、各レプリカがコーディネーターに新しいタスク(処理すべきグラニュールのセット)を要求できることを意味します。コーディネーターは、受信したアナウンスメントに基づいてレプリカにグラニュールセットを提供します。
すべてのレプリカがすべてのパーツでアナウンスメントを送信した段階にいると仮定しましょう。
以下の図は、動的コーディネーションがどのように機能するかを視覚化しています:

レプリカは、コーディネーターノードにタスクを処理できることを知らせ、処理できる作業量を指定することもできます。
コーディネーターはレプリカにタスクを割り当てます。

レプリカ1と2は非常に迅速にタスクを完了します。レプリカは、コーディネーターからさらに別のタスクを要求します。
コーディネーターは、レプリカ1と2に新しいタスクを割り当てます。

すべてのレプリカはタスクの処理を完了しました。タスクをさらに要求します。
コーディネーターはアナウンスメントを使用して、処理する残りのタスクを確認しますが、残りのタスクはありません。
コーディネーターはレプリカにすべてが処理されたことを伝えます。これからマージ可能な状態をすべてマージし、クエリに応答します。
キャッシュの局所性の管理
最後の潜在的な問題は、キャッシュの局所性をどのように扱うかです。もしクエリが複数回実行される場合、どのようにして同じタスクを同じレプリカにルーティングするかを確保できるのでしょうか?前の例では、以下のタスクが割り当てられました:
レプリカ 1 | レプリカ 2 | レプリカ 3 | |
---|---|---|---|
パート 1 | g1, g6, g7 | g2, g4, g5 | g3 |
パート 2 | g1 | g2, g4, g5 | g3 |
パート 3 | g1, g6 | g2, g4, g5 | g3 |
同じタスクが同じレプリカに割り当てられるようにするために、2つのことが行われます。パート + グラニュールのセット(タスク)のハッシュが計算されます。そして、タスク割り当てに対してレプリカ数の剰余が適用されます。
これは理論上は良いことに思えますが、実際には、一つのレプリカに突発的な負荷がかかるか、ネットワークの劣化が発生した場合、特定のタスクを実行するために一貫して使用される同じレプリカによって遅延が発生する可能性があります。max_parallel_replicas
がレプリカ数より少ない場合、クエリの実行にはランダムなレプリカが選択されます。
タスクの奪取
もし一部のレプリカが他のレプリカよりタスクを処理するのが遅い場合、他のレプリカはそのレプリカに属するはずのタスクをハッシュで「奪う」ことを試みて、遅延を減少させます。
制限事項
この機能には既知の制限がありますが、その主要なものはこのセクションに記載されています。
もし以下に示した制限のいずれでもない問題が発生し、並列レプリカが原因と思われる場合は、comp-parallel-replicas
ラベルを使用してGitHubで問題をオープンしてください。
制限事項 | 説明 |
---|---|
複雑なクエリ | 現在、並列レプリカは単純なクエリにはかなりうまく機能します。CTE、サブクエリ、JOIN、非平坦クエリなどの複雑さがクエリ性能に悪影響を及ぼす可能性があります。 |
小規模なクエリ | 多くの行を処理しないクエリを実行する場合、複数のレプリカで実行すると、レプリカ間のコーディネーションのネットワーク時間がクエリ実行に追加のサイクルをもたらす可能性があるため、パフォーマンスが向上しない場合があります。これらの問題を制限するために、設定を使用することができます:parallel_replicas_min_number_of_rows_per_replica 。 |
FINALで並列レプリカは無効 | |
高いカーディナリティデータと複雑な集計 | 多くのデータを送信する必要がある高いカーディナリティの集計が、クエリを著しく遅くする可能性があります。 |
新しいアナライザーとの互換性 | 新しいアナライザーは、特定のシナリオでクエリ実行を大幅に遅くしたり、早くしたりする可能性があります。 |
並列レプリカに関連する設定
設定 | 説明 |
---|---|
enable_parallel_replicas | 0 : 無効1 : 有効 2 : 並列レプリカの使用を強制します。使用されない場合は例外を投げます。 |
cluster_for_parallel_replicas | 並列レプリケーションに使用するクラスタ名。ClickHouse Cloudを使用している場合は、default を使用します。 |
max_parallel_replicas | 複数のレプリカでクエリ実行に使用する最大レプリカ数。クラスター内のレプリカ数より少ない数が指定されている場合、ノードはランダムに選択されます。この値は、水平スケーリングを考慮してオーバーコミットされることもあります。 |
parallel_replicas_min_number_of_rows_per_replica | 処理する必要がある行数に基づいて使用されるレプリカ数を制限します。使用されるレプリカの数は、次のように定義されます:推定読み取り行数 / 最小行数(レプリカあたり) 。 |
allow_experimental_analyzer | 0 : 古いアナライザーを使用1 : 新しいアナライザーを使用します。並列レプリカの動作は使用するアナライザーによって変わる可能性があります。 |
並列レプリカの問題調査
各クエリに使用されている設定を確認するには、system.query_log
テーブルを使用できます。また、system.events
テーブルを見ることで、サーバー上で発生したすべてのイベントを確認できます。さらに、clusterAllReplicas
テーブル関数を使用して、すべてのレプリカ上のテーブルを確認できます(クラウドユーザーの場合は、default
を使用します)。