パラレルレプリカ
はじめに
ClickHouse はクエリを非常に迅速に処理しますが、これらのクエリはどのように複数のサーバーに分配され、並列化されるのでしょうか?
このガイドでは、まず ClickHouse が分散テーブルを介してクエリを複数のシャードに分配する方法について説明し、その後クエリが複数のレプリカを活用して実行される方法について説明します。
シャードアーキテクチャ
過去のないアーキテクチャでは、クラスターは通常、複数のシャードに分割され、それぞれのシャードには全体データのサブセットが含まれています。分散テーブルはこれらのシャードの上にあり、完全なデータの統一ビューを提供します。
読み取りはローカルテーブルに送信できます。クエリの実行は指定されたシャードのみで発生するか、分散テーブルに送信され、その場合は各シャードが指定されたクエリを実行します。分散テーブルがクエリされたサーバーはデータを集約し、クライアントに応答します:

上図は、クライアントが分散テーブルをクエリしたときに何が起こるかを視覚化しています:
SELECT クエリは、ノードへの分散テーブルに対して任意に送信されます (ラウンドロビン戦略を介して、またはロードバランサーによって特定のサーバーにルーティングされた後)。このノードは今やコーディネーターとして機能します。
ノードは、分散テーブルによって指定された情報を介して、クエリを実行する必要がある各シャードを特定し、クエリは各シャードに送信されます。
各シャードはローカルでデータを読み込み、フィルタリングして集約し、その後、マージ可能な状態をコーディネーターに返します。
コーディネーターのノードはデータをマージし、結果をクライアントに返します。
レプリカを追加すると、プロセスは非常に似ていますが、異なるのは、各シャードからわずか1つのレプリカのみがクエリを実行するということです。これにより、さらに多くのクエリを並列処理できるようになります。
非シャードアーキテクチャ
ClickHouse Cloud は、上記に示されたアーキテクチャとは非常に異なるアーキテクチャを持っています。 (詳細については、"ClickHouse Cloud アーキテクチャ"を参照してください)。計算とストレージの分離、および実質的に無限のストレージを持つため、シャードの必要性はそれほど重要ではなくなります。
下図は ClickHouse Cloud アーキテクチャを示しています:

このアーキテクチャにより、レプリカをほぼ瞬時に追加および削除でき、非常に高いクラスターのスケーラビリティを確保できます。ClickHouse Keeper クラスター(右側に示されている)は、メタデータの単一の真実のソースを確保します。レプリカは ClickHouse Keeper クラスターからメタデータを取得し、すべてが同じデータを維持します。データ自体はオブジェクトストレージに保存されており、SSD キャッシュによりクエリの速度を向上させることができます。
しかし、クエリの実行をどのように複数のサーバーに分配できますか? シャードアーキテクチャでは、各シャードが実際にデータのサブセットに対してクエリを実行できるため、その点は明確でした。では、シャーディングがない場合、それはどのように機能しますか?
パラレルレプリカの導入
複数のサーバーでクエリ実行を並列化するには、まずサーバーの1つをコーディネーターとして割り当てる必要があります。コーディネーターは、実行すべきタスクのリストを作成し、それらがすべて実行され、集約され、結果がクライアントに返されることを確認します。ほとんどの分散システムと同様に、これは最初のクエリを受信したノードの役割となります。また、作業単位を定義する必要もあります。シャードアーキテクチャでは、作業単位はシャード、つまりデータのサブセットです。パラレルレプリカでは、作業単位としてテーブルの小さな部分である グラニュール を使用します。
では、以下の図を参考にして実際にどのように機能するか見てみましょう:

パラレルレプリカでは:
クライアントからのクエリは、ロードバランサーを通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。
ノードは各パーツのインデックスを分析し、処理するための正しいパーツとグラニュールを選択します。
コーディネーターは、異なるレプリカに割り当てることができるグラニュールのセットに作業負荷を分割します。
各セットのグラニュールは対応するレプリカによって処理され、処理が完了したらマージ可能な状態がコーディネーターに送信されます。
最後に、コーディネーターはすべてのレプリカからの結果をマージし、その後クライアントに応答を返します。
上記のステップは、パラレルレプリカが理論的にどのように機能するかを示しています。 しかし、実際には、完璧に機能することを妨げる多くの要因があります:
一部のレプリカが利用できない場合があります。
ClickHouseではレプリケーションは非同期であり、一部のレプリカは同時点において同じパーツを持たないことがあります。
レプリカ間の遅れが発生する可能性があります。
ファイルシステムキャッシュはレプリカごとに異なり、各レプリカのアクティビティに基づいて変動するため、ランダムなタスク割り当てがキャッシュのローカリティを考慮すると最適ではない性能を引き起こすことがあります。
これらの要因を克服する方法については、次のセクションで探っていきます。
アナウンスメント
上のリストの(1)と(2)に対処するために、アナウンスメントの概念を導入しました。以下の図を用いてその仕組みを可視化してみましょう:

クライアントからのクエリは、ロードバランサーを通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。
コーディネーターのノードは、クラスター内のすべてのレプリカからアナウンスメントを取得するリクエストを送信します。レプリカはテーブルの現在のパーツのセットについてわずかに異なるビューを持つことがあります。その結果、正しくスケジューリングされるためには、この情報を収集する必要があります。
コーディネーターのノードは、その後アナウンスメントを使用して、異なるレプリカに割り当てることができるグラニュールのセットを定義します。ここでは、レプリカ 2 がアナウンスメントでこのパーツを提供しなかったため、パーツ 3 のグラニュールは割り当てられていないことがわかります。また、レプリカ 3 にはアナウンスメントがなかったため、タスクも割り当てられませんでした。
各レプリカがそれぞれのグラニュールのサブセットでクエリを処理し、マージ可能な状態がコーディネーターに送信されたら、コーディネーターは結果をマージし、応答をクライアントに送信します。
動的コーディネーション
遅延の問題に対処するために、動的コーディネーションを追加しました。これは、すべてのグラニュールが1つのリクエストでレプリカに送信されるのではなく、各レプリカがコーディネーターに新しいタスク(処理するグラニュールのセット)をリクエストできることを意味します。コーディネーターは、受信したアナウンスメントに基づいてレプリカにグラニュールのセットを提供します。
プロセスの段階において、すべてのレプリカがすべてのパーツを持ったアナウンスメントを送信したと仮定しましょう。
以下の図は、動的コーディネーションがどのように機能するかを視覚化しています:

レプリカはコーディネーターのノードに対してタスクを処理できることを知らせ、自分たちが処理できる作業量を指定することもできます。
コーディネーターはレプリカにタスクを割り当てます。

レプリカ 1 と 2 は非常に迅速にタスクを完了できます。彼らはコーディネーターのノードから別のタスクをリクエストします。
コーディネーターはレプリカ 1 と 2 に新しいタスクを割り当てます。

すべてのレプリカが自分のタスクの処理を完了しました。彼らはさらにタスクをリクエストします。
コーディネーターは、アナウンスメントを使用して処理する必要があるタスクを確認しますが、残っているタスクはありません。
コーディネーターはレプリカにすべての処理が完了したことを伝えます。コーディネーターはすべてのマージ可能な状態をマージし、クエリに応答します。
キャッシュローカリティの管理
最後に残る可能性のある問題は、キャッシュローカリティをどのように処理するかです。クエリが何度も実行されると、同じタスクが同じレプリカにルーティングされることをどのように確保できるでしょうか? 前の例では、次のタスクが割り当てられていました:
レプリカ 1 | レプリカ 2 | レプリカ 3 | |
---|---|---|---|
パーツ 1 | g1, g6, g7 | g2, g4, g5 | g3 |
パーツ 2 | g1 | g2, g4, g5 | g3 |
パーツ 3 | g1, g6 | g2, g4, g5 | g3 |
同じタスクが同じレプリカに割り当てられ、キャッシュの恩恵を受けられるようにするため、2つのことが行われます。パーツ + グラニュールの (タスク) ハッシュが計算され、タスク割り当てに対してレプリカの数のモジュロが適用されます。
理論的にはこれは良いように思えますが、実際には、一部のレプリカに対する突然の負荷やネットワークの劣化により、特定のタスクの実行に同じレプリカが一貫して使用される場合、遅延が発生する可能性があります。 max_parallel_replicas
がレプリカの数よりも少ない場合、クエリの実行にはランダムなレプリカが選ばれます。
タスクの泥棒
一部のレプリカが他のレプリカよりもタスクを遅く処理する場合、他のレプリカはそのレプリカに属するタスクをハッシュで「盗む」ことを試み、遅延を減らします。
制限事項
この機能には既知の制限があり、主な制限事項はこのセクションに記載されています。
下記の制限事項のいずれでもない問題を見つけ、パラレルレプリカが原因と疑う場合は、comp-parallel-replicas
ラベルを使用して GitHub に問題をオープンしてください。
制限事項 | 説明 |
---|---|
複雑なクエリ | 現在、パラレルレプリカは単純なクエリに対しては比較的うまく機能します。CTE、サブクエリ、JOIN、非フラットクエリなどの複雑さの層は、クエリパフォーマンスに悪影響を及ぼす可能性があります。 |
小さなクエリ | 処理する行数が少ないクエリを実行している場合、複数のレプリカで実行してもパフォーマンスが向上しない可能性があります。これは、レプリカ間の調整のためのネットワーク時間がクエリの実行に追加のサイクルをもたらす可能性があるためです。これらの問題を制限するには、設定で parallel_replicas_min_number_of_rows_per_replica を使用できます。 |
FINAL ではパラレルレプリカが無効 | |
高いカーディナリティデータと複雑な集約 | 高カルディナリティの集計では、多くのデータを送信する必要があり、クエリを著しく遅くする可能性があります。 |
新しいアナライザーとの互換性 | 新しいアナライザーは特定のシナリオでクエリ実行を著しく遅くしたり速くしたりする可能性があります。 |
パラレルレプリカに関連する設定
設定 | 説明 |
---|---|
enable_parallel_replicas | 0 : 無効1 : 有効 2 : パラレルレプリカの使用を強制し、使用されない場合は例外をスローします。 |
cluster_for_parallel_replicas | パラレルレプリケーションに使用するクラスター名。ClickHouse Cloud を使用している場合は、default を使用してください。 |
max_parallel_replicas | 複数のレプリカでのクエリ実行に使用する最大レプリカ数。クラスター内のレプリカ数より小さい値が指定された場合、ノードはランダムに選択されます。この値は、水平スケーリングを考慮するために過剰に設定することもできます。 |
parallel_replicas_min_number_of_rows_per_replica | 処理する必要がある行数に基づいて使用されるレプリカの数を制限するのに役立ちます。使用されるレプリカの数は、次のように定義されます:estimated rows to read / min_number_of_rows_per_replica 。 |
allow_experimental_analyzer | 0 : 古いアナライザーを使用1 : 新しいアナライザーを使用。パラレルレプリカの動作は、使用されるアナライザーに基づいて変化する可能性があります。 |
パラレルレプリカに関する問題の調査
各クエリで使用されている設定は、system.query_log
テーブルで確認できます。また、system.events
テーブルを参照することで、サーバー上で発生したすべてのイベントを確認することが可能です。さらに、clusterAllReplicas
テーブル関数を使って、すべてのレプリカ上のテーブルを確認できます(クラウドユーザーの場合は、default
を使用してください)。
レスポンス
system.text_log
テーブルには、パラレルレプリカを使用したクエリの実行に関する情報も含まれています:
レスポンス
最後に、EXPLAIN PIPELINE
を使用することもできます。これにより、ClickHouse がクエリをどのように実行するか、実行に使用されるリソースが強調表示されます。次のクエリを例として見てみましょう:
パラレルレプリカなしのクエリパイプラインを見てみましょう:

続いて、パラレルレプリカのある場合は以下のようになります:
