メインコンテンツまでスキップ
メインコンテンツまでスキップ

並列レプリカ

Beta feature. Learn more.

Introduction

ClickHouseはクエリを非常に迅速に処理しますが、これらのクエリはどのように複数のサーバーに分散および並列処理されるのでしょうか?

このガイドでは、まずClickHouseがどのように分散テーブルを介してクエリを複数のシャードに分配するかを説明し、その後、クエリがどのように複数のレプリカを活用して実行されるかについて説明します。

Sharded architecture

共有無用アーキテクチャでは、クラスターは通常、複数のシャードに分割され、各シャードは全体のデータのサブセットを含んでいます。分散テーブルはこれらのシャードの上に位置し、完全なデータの統一ビューを提供します。

読み込みはローカルテーブルに送信できます。クエリの実行は指定されたシャードのみで発生するか、分散テーブルに送信され、その場合、各シャードが指定されたクエリを実行します。分散テーブルがクエリされたサーバーはデータを集約し、クライアントに応答します:

sharded archtiecture

上の図は、クライアントが分散テーブルをクエリしたときに何が起こるかを可視化しています:

  1. SELECTクエリは、ノード上の分散テーブルにランダムに送信されます (ラウンドロビン戦略を介して、または負荷分散装置によって特定のサーバーにルーティングされた後)。このノードは、コーディネーターとして機能します。

  2. ノードは、分散テーブルによって指定された情報を介してクエリを実行する必要がある各シャードを特定し、クエリを各シャードに送信します。

  3. 各シャードはローカルでデータを読み取り、フィルタリングし、集約し、マージ可能な状態をコーディネーターに送信します。

  4. コーディネーターはデータをマージし、クライアントに応答を返します。

レプリカを組み込むと、プロセスは非常に似ています。唯一の違いは、各シャードからの単一のレプリカのみがクエリを実行することです。これにより、より多くのクエリを並行して処理できるようになります。

Non-sharded architecture

ClickHouse Cloudは、上記のアーキテクチャとは非常に異なるアーキテクチャを持っています。 (詳しくは"ClickHouse Cloud Architecture"を参照してください)。計算とストレージが分離され、事実上無限のストレージがあるため、シャードの必要性はそれほど重要ではなくなります。

以下の図はClickHouse Cloudのアーキテクチャを示しています:

non sharded architecture

このアーキテクチャでは、レプリカをほぼ瞬時に追加および削除できるため、非常に高いクラスターのスケーラビリティを確保できます。ClickHouse Keeperクラスター(右側に示す)は、メタデータの単一の真実のソースを持つことを保証します。レプリカはClickHouse Keeperクラスターからメタデータを取得し、すべては同じデータを維持します。データ自体はオブジェクトストレージに格納されており、SSDキャッシュによりクエリを高速化することができます。

では、クエリの実行をどのように複数のサーバーに分散できるのでしょうか?シャードアーキテクチャでは、各シャードが実際にデータのサブセットに対してクエリを実行できるため、それはかなり明らかでした。シャーディングがない場合、どうすればよいのでしょうか?

Introducing parallel replicas

複数のサーバーでのクエリ実行を並列化するには、まずサーバーの1つをコーディネーターとして割り当てる必要があります。コーディネーターは実行する必要のあるタスクのリストを作成し、それらがすべて実行され、集約され、結果がクライアントに返されることを保証します。ほとんどの分散システムと同様に、初期クエリを受信するノードがこの役割を担います。また、作業の単位を定義する必要があります。シャードアーキテクチャでは、作業の単位はシャードであり、データのサブセットです。並列レプリカを使用すると、グラニュールと呼ばれるテーブルの小さな部分を作業の単位として使用します。

では、下の図を使って、実際にどのように機能するか見てみましょう:

Parallel replicas

並列レプリカを使用すると:

  1. クライアントからのクエリは負荷分散装置を通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。

  2. ノードは各パーツのインデックスを分析し、処理するための正しいパーツとグラニュールを選択します。

  3. コーディネーターは作業負荷を、異なるレプリカに割り当てることができるグラニュールのセットに分割します。

  4. 各グラニュールのセットは対応するレプリカによって処理され、終了するとマージ可能な状態がコーディネーターに送信されます。

  5. 最後に、コーディネーターはすべてのレプリカから結果をマージし、クライアントに応答を返します。

上記のステップは、理論上の並列レプリカの動作を概説しています。しかし、実際には、その論理が完璧に機能するのを妨げる多くの要因があります:

  1. 一部のレプリカは利用できない場合があります。

  2. ClickHouseのレプリケーションは非同期であるため、一部のレプリカは時間的に異なるパーツを持つ可能性があります。

  3. レプリカ間の遅延ラテシーを何らかの方法で処理する必要があります。

  4. ファイルシステムキャッシュはレプリカごとにアクティビティに応じて異なるため、ランダムなタスク割り当てはキャッシュの局所性に基づいて最適性能を妨げる可能性があります。

これらの要因に対処する方法を次のセクションで探ります。

Announcements

上記のリストの(1)および(2)に対処するために、アナウンスメントの概念を導入しました。以下の図を使って、どのように機能するかを可視化しましょう:

Announcements
  1. クライアントからのクエリは負荷分散装置を通過した後、1つのノードに送信されます。このノードはこのクエリのコーディネーターになります。

  2. コーディネーターのノードは、クラスター内のすべてのレプリカからアナウンスメントを取得するリクエストを送信します。レプリカは、テーブルの現在のパーツセットに対してわずかに異なるビューを持つ可能性があります。そのため、不正確なスケジューリング決定を避けるために、この情報を収集する必要があります。

  3. コーディネーターのノードは、アナウンスメントを使用して、さまざまなレプリカに割り当てることができるグラニュールのセットを定義します。ここでは、パート3のグラニュールはレプリカ2に割り当てられていないことがわかります。なぜなら、このレプリカはそのアナウンスメントにこのパートを提供しなかったからです。また、レプリカ3にはアナウンスメントが提供されなかったため、タスクは割り当てられていません。

  4. 各レプリカは、自身のグラニュールサブセットでクエリを処理し、マージ可能な状態をコーディネーターに送信した後、コーディネーターは結果をマージし、応答がクライアントに送信されます。

Dynamic coordination

尾の遅延の問題に対処するために、動的コーディネーションを追加しました。これは、すべてのグラニュールを1回のリクエストでレプリカに送信するのではなく、各レプリカがコーディネーターに新しいタスク(処理するグラニュールのセット)をリクエストできることを意味します。コーディネーターは、受け取ったアナウンスメントに基づいてレプリカにグラニュールのセットを割り当てます。

すべてのレプリカがすべてのパーツを含むアナウンスメントを送信した段階であると仮定しましょう。

以下の図は、動的コーディネーションがどのように機能するかを可視化しています:

Dynamic Coordination - part 1
  1. レプリカはコーディネーターノードにタスクを処理できることを知らせ、処理できる作業量を指定することもできます。

  2. コーディネーターはレプリカにタスクを割り当てます。

Dynamic Coordination - part 2
  1. レプリカ1と2はタスクを非常に速く終了できます。それらはコーディネーターノードに別のタスクをリクエストします。

  2. コーディネーターはレプリカ1と2に新しいタスクを割り当てます。

Dynamic Coordination - part 3
  1. すべてのレプリカはタスクの処理を終了しました。彼らはさらにタスクを要求します。

  2. コーディネーターはアナウンスメントを使用して、処理される残りのタスクをチェックしますが、残っているタスクはありません。

  3. コーディネーターは、すべてが処理されたことをレプリカに通知します。現在、すべてのマージ可能な状態をマージし、クエリに応答します。

Managing cache locality

最後に残された潜在的な問題は、キャッシュの局所性をどのように処理するかです。クエリが複数回実行される場合、同じタスクが同じレプリカにルーティングされることをどのように保証できますか?前の例では、次のタスクが割り当てられました:

レプリカ1レプリカ2レプリカ3
パート1g1, g6, g7g2, g4, g5g3
パート2g1g2, g4, g5g3
パート3g1, g6g2, g4, g5g3

同じタスクが同じレプリカに割り当てられてキャッシュの恩恵を受けることを保証するために、2つのことが起こります。パート+グラニュールのセット(タスク)のハッシュが計算されます。タスク割り当てにはレプリカの数でモジュロが適用されます。

紙の上ではこれはうまく見えるかもしれませんが、実際には、一つのレプリカへの突然の負荷やネットワークの劣化が、特定のタスクを実行するために同じレプリカが一貫して使用される場合に遅延ラテシーを導入する可能性があります。max_parallel_replicasがレプリカの数より少ない場合、クエリ実行のためにランダムなレプリカが選択されます。

Task stealing

あるレプリカが他よりもタスクを遅く処理している場合、他のレプリカはそのレプリカに本来属するべきタスクをハッシュで「盗もう」として、尾の遅延を減少させます。

Limitations

この機能には知られた制限がありますが、その主なものはこのセクションに文書化されています。

注記

以下に示す制限のいずれでもない問題を見つけ、並列レプリカが原因であると疑う場合は、comp-parallel-replicasラベルを使用してGitHubに問題を開いてください。

制限説明
複雑なクエリ現在、並列レプリカは単純なクエリに対してかなり良好に機能します。CTE、サブクエリ、JOIN、非平坦クエリなどの複雑さの層は、クエリパフォーマンスに悪影響を与える可能性があります。
小さなクエリ多くの行を処理しないクエリを実行している場合、複数のレプリカで実行してもパフォーマンスが向上しない可能性があります。これは、レプリカ間の調整のためのネットワーク時間が追加のサイクルをクエリ実行にもたらす可能性があるからです。これらの問題を制限するには、設定を使用します: parallel_replicas_min_number_of_rows_per_replica
FINALでの並列レプリカの無効化
プロジェクションと並列レプリカの同時使用は不可
高いカーディナリティデータと複雑な集約多くのデータを送信する必要がある高カーディナリティ集約は、クエリを著しく遅くする可能性があります。
新しいアナライザーとの互換性新しいアナライザーは、特定のシナリオでクエリ実行を著しく遅くしたり、逆に高速化したりする可能性があります。
設定説明
enable_parallel_replicas0: 無効
1: 有効
2: 並列レプリカの使用を強制し、使用されなければ例外をスローします。
cluster_for_parallel_replicas並列レプリケーションに使用するクラスタ名; ClickHouse Cloudを使用している場合は、defaultを使用してください。
max_parallel_replicas複数のレプリカでのクエリ実行に使用する最大レプリカ数。一部のレプリカの数より少ない数が指定されると、ノードはランダムに選択されます。この値も水平方向のスケーリングに対応できるように過剰な値を書くことができます。
parallel_replicas_min_number_of_rows_per_replica処理する必要のある行数に基づいて使用されるレプリカの数を制限するのに役立ちます。使用されるレプリカの数は次のように定義されます:
estimated rows to read / min_number_of_rows_per_replica
allow_experimental_analyzer0: 古いアナライザーを使用
1: 新しいアナライザーを使用します。

並列レプリカの動作は使用されるアナライザーに基づいて変わる可能性があります。

Investigating issues with parallel replicas

system.query_logテーブルで、各クエリに使用されている設定を確認できます。また、system.eventsテーブルを確認してサーバーで発生したすべてのイベントを確認でき、clusterAllReplicasテーブル関数を使用してすべてのレプリカ上のテーブルを確認できます (クラウドユーザーの場合はdefaultを使用します)。

SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘

system.text_logテーブルには、並列レプリカを使用したクエリの実行に関する情報も含まれています:

SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica
                                                                                                   │
│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica
                                                                                                   │
│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica
                                                                                                   │
│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica
                                                                                                   │
│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica
                                                                                                   │
│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica
                                                                                                   │
│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

最後に、EXPLAIN PIPELINEも使用できます。これにより、ClickHouseがどのようにクエリを実行し、クエリの実行にどのリソースが必要かを強調表示します。以下のクエリを例としてみましょう:

SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10

並列レプリカなしのクエリパイプラインを見てみましょう:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
EXPLAIN without parallel_replica

そして今、並列レプリカありでは:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
EXPLAIN with parallel_replica