跳到主要内容
跳到主要内容

并行副本

Beta feature. Learn more.

介绍

ClickHouse 可以非常快速地处理查询,但这些查询是如何在多个服务器之间分配和并行化的呢?

在本指南中,我们将首先讨论 ClickHouse 如何通过分布式表将查询分配到多个分片上,然后讨论查询如何利用多个副本来执行。

分片架构

在无共享架构中,集群通常被拆分为多个分片,每个分片包含整体数据的一个子集。一个分布式表位于这些分片之上,为完整数据提供统一视图。

读取请求可以发送到本地表。查询执行将仅在指定的分片上发生,或者可以将其发送到分布式表,在这种情况下,每个分片将执行给定的查询。发出查询的服务器将聚合数据并响应客户端:

sharded architecture

上图可视化了客户查询分布式表时发生的情况:

  1. 选择查询通过负载均衡器以任意方式发送到节点上的分布式表(通过轮询策略或在被指向特定服务器后)。该节点将充当协调者。

  2. 节点将通过分布式表指定的信息找到需要执行查询的每个分片,并将查询发送到每个分片。

  3. 每个分片本地读取、过滤和聚合数据,然后将可合并的状态发送回协调者。

  4. 协调节点合并数据,然后将响应发送回客户端。

当我们将副本引入混合时,过程相似,唯一的区别是每个分片仅有一个副本会执行查询。这意味着可以并行处理更多查询。

非分片架构

ClickHouse Cloud 具有与上述不同的架构。(有关更多细节,请参见 "ClickHouse Cloud Architecture")。通过计算与存储的分离,以及几乎无限的存储,分片的需求变得不那么重要。

下图展示了 ClickHouse Cloud 架构:

non sharded architecture

这种架构使我们能够几乎瞬时地添加和移除副本,从而确保非常高的集群可扩展性。ClickHouse Keeper 集群(如右所示)确保我们拥有元数据的单一真实性来源。副本可以从 ClickHouse Keeper 集群获取元数据,并且所有副本维护相同的数据。数据本身存储在对象存储中,SSD 缓存使查询速度得以提升。

但我们如何在多个服务器之间分配查询执行呢?在分片架构中很明显,因为每个分片实际上可以在数据的子集上执行查询。当没有分片时,这会如何工作呢?

引入并行副本

为了通过多个服务器并行化查询执行,我们首先需要能够将我们的一个服务器指定为协调者。协调者负责创建需要执行的任务列表,确保所有任务都被执行、聚合并将结果返回给客户端。与大多数分布式系统一样,这将是接收初始查询的节点的角色。我们还需要定义工作单元。在分片架构中,工作单元是分片,即数据的子集。使用并行副本时,我们将使用表的一个小部分,称为 granules,作为工作单元。

现在,让我们看看实际操作中的工作流程,借助下图:

Parallel replicas

使用并行副本时:

  1. 客户端的查询在经过负载均衡器后发送到一个节点。该节点成为此查询的协调者。

  2. 节点分析每个部分的索引,并选择正确的部分和 granules 进行处理。

  3. 协调者将工作负载拆分为可以分配给不同副本的一组 granules。

  4. 每组 granules 由相应的副本处理,完成时将可合并的状态发送给协调者。

  5. 最后,协调者合并来自副本的所有结果,然后将响应返回给客户端。

上述步骤概述了并行副本在理论上的工作原理。然而,在实际中,有许多因素可能会阻止这种逻辑完美运行:

  1. 一些副本可能不可用。

  2. ClickHouse 中的复制是异步的,一些副本在某些时间点可能没有相同的部分。

  3. 需要以某种方式处理副本之间的尾延迟。

  4. 文件系统缓存因每个副本的活动而异,这意味着随机的任务分配可能导致由于缓存局部性而性能不佳。

我们将在接下来的部分中探讨如何克服这些因素。

公告

为了解决上述列表中的(1)和(2),我们引入了公告的概念。让我们通过下图来可视化它的工作原理:

Announcements
  1. 客户端的查询在经过负载均衡器后发送到一个节点。该节点成为此查询的协调者。

  2. 协调节点向集群中的所有副本发送请求以获取公告。副本可能对表的当前部分集有稍微不同的视图。因此,我们需要收集这些信息,以避免不正确的调度决策。

  3. 协调节点然后使用公告来定义可以分配给不同副本的一组 granules。在这里,例如,我们可以看到由于副本 2 在其公告中没有提供这个部分,因此没有来自部分 3 的 granules 被分配给副本 2。同时注意,由于副本没有提供公告,没有任务被分配给副本 3。

  4. 每个副本处理完其 granules 子集的查询后,将可合并的状态发送回协调者,协调者合并结果,响应发送给客户端。

动态协调

为了解决尾延迟的问题,我们添加了动态协调。这意味着所有 granules 不会在一次请求中发送给副本,而是每个副本将能够向协调者请求新的任务(要处理的一组 granules)。协调者将根据接收到的公告为副本提供一组 granules。

假设我们处于处理过程的阶段,此时所有副本都已发送包含所有部分的公告。

下图可视化了动态协调的工作方式:

Dynamic Coordination - part 1
  1. 副本向协调节点表明它们能够处理任务,同时它们还可以指定自己能够处理多少工作。

  2. 协调者将任务分配给副本。

Dynamic Coordination - part 2
  1. 副本 1 和 2 能够非常快地完成其任务。它们将向协调节点请求另一个任务。

  2. 协调者将新任务分配给副本 1 和 2。

Dynamic Coordination - part 3
  1. 所有副本现在都完成了对其任务的处理。它们请求更多任务。

  2. 协调者根据公告检查剩余需要处理的任务,但没有剩余任务。

  3. 协调者告诉副本一切都已处理。它将合并所有可合并的状态并对查询做出响应。

管理缓存局部性

最后一个可能存在的问题是我们如何处理缓存局部性。如果查询多次执行,我们如何确保相同的任务路由到相同的副本?在前面的例子中,我们有以下任务分配:

副本 1副本 2副本 3
部分 1g1, g6, g7g2, g4, g5g3
部分 2g1g2, g4, g5g3
部分 3g1, g6g2, g4, g5g3

为了确保相同的任务被分配给相同的副本并能够受益于缓存,会发生两件事。部分 + granules 集合(即一个任务)的哈希会被计算。然后应用一个模数来决定任务分配的副本数量。

在纸面上,这听起来不错,但在实际中,如果某个副本突然负载过重,或网络出现降级,则可能导致尾延迟,因为相同的副本持续用于执行某些任务。如果 max_parallel_replicas 小于副本的数量,则会随机选择副本进行查询执行。

任务窃取

如果某个副本处理任务的速度比其他副本慢,其他副本将尝试通过哈希“窃取”原则上属于该副本的任务,以减少尾延迟。

限制

此功能已知有一些限制,其中主要限制在本节中进行了文档说明。

备注

如果您发现一个问题,并不属于下面列出的限制,并怀疑并行副本可能是原因,请在 GitHub 上打开一个问题,使用标签 comp-parallel-replicas

限制描述
复杂查询当前并行副本对于简单查询功能良好。CTE、子查询、JOIN、非平铺查询等复杂性层将对查询性能产生负面影响。
小查询如果您执行的查询处理的行数不多,在多个副本上执行可能不会提高性能,因为副本之间的协调网络时间可能导致查询执行中额外的周期。通过使用设置 parallel_replicas_min_number_of_rows_per_replica 可以减少这些问题的影响。
FINAL 时禁用并行副本
投影与并行副本不一起使用
高基数数据和复杂聚合高基数聚合需要发送大量数据,会显著减慢查询速度。
与新分析器的兼容性在特定情况下,新的分析器可能会显著减慢或加快查询执行。
设置描述
enable_parallel_replicas0: 禁用
1: 启用
2: 强制使用并行副本,如果不使用将抛出异常。
cluster_for_parallel_replicas用于并行复制的集群名称;如果您使用 ClickHouse Cloud,请使用 default
max_parallel_replicas用于在多个副本上执行查询的最大副本数,如果指定的数量低于集群中的副本数,则将随机选择节点。该值也可以被超分配以考虑水平扩展。
parallel_replicas_min_number_of_rows_per_replica帮助限制基于需要处理的行数的副本数,使用的副本数由以下公式定义:
estimated rows to read / min_number_of_rows_per_replica
allow_experimental_analyzer0: 使用旧分析器
1: 使用新分析器。

并行副本的行为可能会根据使用的分析器而有所不同。

调查并行副本问题

您可以通过 system.query_log 表检查每个查询使用的设置。您还可以查看 system.events 表,以查看在服务器上发生的所有事件,您可以使用 clusterAllReplicas 表函数查看所有副本上的表(如果您是云用户,请使用 default)。

SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
响应
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘

system.text_log 表还包含有关使用并行副本执行查询的信息:

SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
响应
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica
                                                                                                   │
│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica
                                                                                                   │
│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica
                                                                                                   │
│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica
                                                                                                   │
│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica
                                                                                                   │
│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica
                                                                                                   │
│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

最后,您还可以使用 EXPLAIN PIPELINE。这将突出显示 ClickHouse 将如何执行查询以及用于执行查询的资源。以以下查询为例:

SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10

让我们来看一下没有并行副本的查询管道:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
EXPLAIN without parallel replica

现在有并行副本:

EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
EXPLAIN with parallel replica