并行副本
介绍
ClickHouse 可以非常快速地处理查询,但这些查询是如何在多个服务器之间分配和并行化的呢?
在本指南中,我们将首先讨论 ClickHouse 如何通过分布式表将查询分配到多个分片上,然后讨论查询如何利用多个副本来执行。
分片架构
在无共享架构中,集群通常被拆分为多个分片,每个分片包含整体数据的一个子集。一个分布式表位于这些分片之上,为完整数据提供统一视图。
读取请求可以发送到本地表。查询执行将仅在指定的分片上发生,或者可以将其发送到分布式表,在这种情况下,每个分片将执行给定的查询。发出查询的服务器将聚合数据并响应客户端:

上图可视化了客户查询分布式表时发生的情况:
选择查询通过负载均衡器以任意方式发送到节点上的分布式表(通过轮询策略或在被指向特定服务器后)。该节点将充当协调者。
节点将通过分布式表指定的信息找到需要执行查询的每个分片,并将查询发送到每个分片。
每个分片本地读取、过滤和聚合数据,然后将可合并的状态发送回协调者。
协调节点合并数据,然后将响应发送回客户端。
当我们将副本引入混合时,过程相似,唯一的区别是每个分片仅有一个副本会执行查询。这意味着可以并行处理更多查询。
非分片架构
ClickHouse Cloud 具有与上述不同的架构。(有关更多细节,请参见 "ClickHouse Cloud Architecture")。通过计算与存储的分离,以及几乎无限的存储,分片的需求变得不那么重要。
下图展示了 ClickHouse Cloud 架构:

这种架构使我们能够几乎瞬时地添加和移除副本,从而确保非常高的集群可扩展性。ClickHouse Keeper 集群(如右所示)确保我们拥有元数据的单一真实性来源。副本可以从 ClickHouse Keeper 集群获取元数据,并且所有副本维护相同的数据。数据本身存储在对象存储中,SSD 缓存使查询速度得以提升。
但我们如何在多个服务器之间分配查询执行呢?在分片架构中很明显,因为每个分片实际上可以在数据的子集上执行查询。当没有分片时,这会如何工作呢?
引入并行副本
为了通过多个服务器并行化查询执行,我们首先需要能够将我们的一个服务器指定为协调者。协调者负责创建需要执行的任务列表,确保所有任务都被执行、聚合并将结果返回给客户端。与大多数分布式系统一样,这将是接收初始查询的节点的角色。我们还需要定义工作单元。在分片架构中,工作单元是分片,即数据的子集。使用并行副本时,我们将使用表的一个小部分,称为 granules,作为工作单元。
现在,让我们看看实际操作中的工作流程,借助下图:

使用并行副本时:
客户端的查询在经过负载均衡器后发送到一个节点。该节点成为此查询的协调者。
节点分析每个部分的索引,并选择正确的部分和 granules 进行处理。
协调者将工作负载拆分为可以分配给不同副本的一组 granules。
每组 granules 由相应的副本处理,完成时将可合并的状态发送给协调者。
最后,协调者合并来自副本的所有结果,然后将响应返回给客户端。
上述步骤概述了并行副本在理论上的工作原理。然而,在实际中,有许多因素可能会阻止这种逻辑完美运行:
一些副本可能不可用。
ClickHouse 中的复制是异步的,一些副本在某些时间点可能没有相同的部分。
需要以某种方式处理副本之间的尾延迟。
文件系统缓存因每个副本的活动而异,这意味着随机的任务分配可能导致由于缓存局部性而性能不佳。
我们将在接下来的部分中探讨如何克服这些因素。
公告
为了解决上述列表中的(1)和(2),我们引入了公告的概念。让我们通过下图来可视化它的工作原理:

客户端的查询在经过负载均衡器后发送到一个节点。该节点成为此查询的协调者。
协调节点向集群中的所有副本发送请求以获取公告。副本可能对表的当前部分集有稍微不同的视图。因此,我们需要收集这些信息,以避免不正确的调度决策。
协调节点然后使用公告来定义可以分配给不同副本的一组 granules。在这里,例如,我们可以看到由于副本 2 在其公告中没有提供这个部分,因此没有来自部分 3 的 granules 被分配给副本 2。同时注意,由于副本没有提供公告,没有任务被分配给副本 3。
每个副本处理完其 granules 子集的查询后,将可合并的状态发送回协调者,协调者合并结果,响应发送给客户端。
动态协调
为了解决尾延迟的问题,我们添加了动态协调。这意味着所有 granules 不会在一次请求中发送给副本,而是每个副本将能够向协调者请求新的任务(要处理的一组 granules)。协调者将根据接收到的公告为副本提供一组 granules。
假设我们处于处理过程的阶段,此时所有副本都已发送包含所有部分的公告。
下图可视化了动态协调的工作方式:

副本向协调节点表明它们能够处理任务,同时它们还可以指定自己能够处理多少工作。
协调者将任务分配给副本。

副本 1 和 2 能够非常快地完成其任务。它们将向协调节点请求另一个任务。
协调者将新任务分配给副本 1 和 2。

所有副本现在都完成了对其任务的处理。它们请求更多任务。
协调者根据公告检查剩余需要处理的任务,但没有剩余任务。
协调者告诉副本一切都已处理。它将合并所有可合并的状态并对查询做出响应。
管理缓存局部性
最后一个可能存在的问题是我们如何处理缓存局部性。如果查询多次执行,我们如何确保相同的任务路由到相同的副本?在前面的例子中,我们有以下任务分配:
副本 1 | 副本 2 | 副本 3 | |
---|---|---|---|
部分 1 | g1, g6, g7 | g2, g4, g5 | g3 |
部分 2 | g1 | g2, g4, g5 | g3 |
部分 3 | g1, g6 | g2, g4, g5 | g3 |
为了确保相同的任务被分配给相同的副本并能够受益于缓存,会发生两件事。部分 + granules 集合(即一个任务)的哈希会被计算。然后应用一个模数来决定任务分配的副本数量。
在纸面上,这听起来不错,但在实际中,如果某个副本突然负载过重,或网络出现降级,则可能导致尾延迟,因为相同的副本持续用于执行某些任务。如果 max_parallel_replicas
小于副本的数量,则会随机选择副本进行查询执行。
任务窃取
如果某个副本处理任务的速度比其他副本慢,其他副本将尝试通过哈希“窃取”原则上属于该副本的任务,以减少尾延迟。
限制
此功能已知有一些限制,其中主要限制在本节中进行了文档说明。
如果您发现一个问题,并不属于下面列出的限制,并怀疑并行副本可能是原因,请在 GitHub 上打开一个问题,使用标签 comp-parallel-replicas
。
限制 | 描述 |
---|---|
复杂查询 | 当前并行副本对于简单查询功能良好。CTE、子查询、JOIN、非平铺查询等复杂性层将对查询性能产生负面影响。 |
小查询 | 如果您执行的查询处理的行数不多,在多个副本上执行可能不会提高性能,因为副本之间的协调网络时间可能导致查询执行中额外的周期。通过使用设置 parallel_replicas_min_number_of_rows_per_replica 可以减少这些问题的影响。 |
FINAL 时禁用并行副本 | |
投影与并行副本不一起使用 | |
高基数数据和复杂聚合 | 高基数聚合需要发送大量数据,会显著减慢查询速度。 |
与新分析器的兼容性 | 在特定情况下,新的分析器可能会显著减慢或加快查询执行。 |
与并行副本相关的设置
设置 | 描述 |
---|---|
enable_parallel_replicas | 0 : 禁用1 : 启用2 : 强制使用并行副本,如果不使用将抛出异常。 |
cluster_for_parallel_replicas | 用于并行复制的集群名称;如果您使用 ClickHouse Cloud,请使用 default 。 |
max_parallel_replicas | 用于在多个副本上执行查询的最大副本数,如果指定的数量低于集群中的副本数,则将随机选择节点。该值也可以被超分配以考虑水平扩展。 |
parallel_replicas_min_number_of_rows_per_replica | 帮助限制基于需要处理的行数的副本数,使用的副本数由以下公式定义:estimated rows to read / min_number_of_rows_per_replica 。 |
allow_experimental_analyzer | 0 : 使用旧分析器1 : 使用新分析器。并行副本的行为可能会根据使用的分析器而有所不同。 |
调查并行副本问题
您可以通过 system.query_log
表检查每个查询使用的设置。您还可以查看 system.events
表,以查看在服务器上发生的所有事件,您可以使用 clusterAllReplicas
表函数查看所有副本上的表(如果您是云用户,请使用 default
)。
响应
system.text_log
表还包含有关使用并行副本执行查询的信息:
响应
最后,您还可以使用 EXPLAIN PIPELINE
。这将突出显示 ClickHouse 将如何执行查询以及用于执行查询的资源。以以下查询为例:
让我们来看一下没有并行副本的查询管道:

现在有并行副本:
