并行副本
简介
ClickHouse 处理查询的速度极快,但这些查询是如何在多台服务器之间被分发并并行执行的呢?
在本指南中,我们将首先介绍 ClickHouse 如何通过分布式表在多个分片之间分发查询,然后说明查询如何利用多个副本来完成执行。
分片架构
在无共享(shared-nothing)架构中,集群通常被拆分为多个分片(shard),每个分片包含整体数据的一个子集。一个分布式表位于这些分片之上,为完整数据提供统一视图。
读取请求可以发送到本地表,此时查询只会在指定分片上执行;也可以发送到分布式表,此时每个分片都会执行该查询。发起对分布式表查询的服务器会聚合数据并将结果返回给客户端:

上图展示了客户端查询分布式表时发生的过程:
SELECT 查询被任意发送到某个节点上的分布式表 (通过轮询策略,或在经过负载均衡器路由到特定服务器之后)。 该节点随后将作为协调器。
节点会根据分布式表中指定的信息,定位需要执行查询的每个分片, 并将查询发送给各个分片。
每个分片在本地读取、过滤并聚合数据,然后将可合并的中间状态 返回给协调器。
协调器节点对数据进行合并,然后将响应返回给客户端。
当我们引入副本时,流程基本相同,唯一的区别是每个分片中只有一个副本会执行查询, 从而能够并行处理更多查询请求。
非分片架构
ClickHouse Cloud 的架构与上文所介绍的架构有很大差异。 (参见 "ClickHouse Cloud Architecture" 了解更多详情。)由于计算与存储的分离,以及几乎无限的存储容量,对分片的需求就不那么重要了。
下图展示了 ClickHouse Cloud 的架构:

这种架构使我们几乎可以瞬时添加和移除副本,从而确保集群具有极高的可扩展性。右侧所示的 ClickHouse Keeper 集群确保我们拥有元数据的唯一可信来源。各个副本可以从 ClickHouse Keeper 集群获取元数据,并都维护相同的数据。数据本身存储在对象存储中,而 SSD 缓存则可以加速查询。
但是,现在我们如何将查询执行分布到多台服务器上呢?在分片架构中,这一点相对显而易见,因为每个分片实际上都可以在其数据子集上执行查询。那么,在没有分片的情况下这是如何实现的呢?
并行副本简介
为了在多台服务器上实现查询的并行执行,我们首先需要能够将其中一台服务器指定为协调节点。协调节点负责创建需要执行的任务列表,确保这些任务全部执行、聚合,并将结果返回给客户端。与大多数分布式系统类似,这个角色由接收初始查询的节点承担。我们还需要定义工作单元。在分片架构中,工作单元是分片,即数据的一个子集。使用并行副本时,我们将使用表中的一小部分数据,称为粒度,作为工作单元。
现在,让我们借助下图来看看它在实践中是如何工作的:

使用并行副本时:
来自客户端的查询在经过负载均衡器后被发送到某个节点。该节点成为此查询的协调节点。
该节点分析每个 part 的索引,并选择需要处理的合适 part 和粒度。
协调节点将工作负载拆分成一组可以分配给不同副本的粒度。
每组粒度由相应的副本进行处理,完成后将可合并的中间状态发送给协调节点。
最后,协调节点合并来自各副本的所有结果,然后将响应返回给客户端。
上述步骤描述了并行副本在理论上的工作方式。 然而在实践中,存在许多因素会阻碍这套逻辑完美运行:
某些副本可能不可用。
ClickHouse 中的复制是异步的,在某些时间点上,一些副本可能不具有相同的 part。
副本之间的尾延迟需要以某种方式进行处理。
文件系统缓存会根据各副本上的活动在副本之间有所差异,这意味着随机分配任务在缓存局部性方面可能导致性能不够理想。
在接下来的小节中,我们将讨论如何克服这些因素。
公告
为了解决上面列表中的 (1) 和 (2),我们引入了“公告”(announcement)的概念。其工作方式如下图所示:

来自客户端的查询在经过负载均衡器后被发送到某个节点,该节点成为此查询的协调节点。
协调节点发送请求,从集群中所有副本获取公告。副本对于某个表当前分区(parts)集合的视图可能略有不同。因此,我们需要收集这些信息以避免做出错误的调度决策。
随后,协调节点使用这些公告来确定一组可分配给不同副本的粒度单元。例如,在这里我们可以看到,没有来自 part 3 的粒度被分配给副本 2,因为该副本在其公告中未声明该分区。还要注意,没有任务被分配给副本 3,因为该副本没有提供公告。
在每个副本都处理完其粒度子集上的查询,并将可合并状态发送回协调节点之后,协调节点会合并结果,并将响应返回给客户端。
动态协调
为了解决尾延迟问题,我们引入了动态协调机制。这意味着,所有 granule 不再在一次请求中全部发送到同一个副本,而是由每个副本向协调节点请求新的任务(要处理的一组 granule)。协调节点会根据收到的公告,为副本分配相应的一组 granule。
假设当前处于这样一个阶段:所有副本都已经发送了包含所有 part 的公告。
下图展示了动态协调的工作方式:

各副本向协调节点表明它们可以处理任务,同时还可以指定各自能够处理的工作量。
协调节点将任务分配给这些副本。

副本 1 和 2 能够非常快速地完成各自的任务,它们会向协调节点请求新的任务。
协调节点为副本 1 和 2 分配新的任务。

所有副本现在都已完成各自任务的处理,它们会请求更多任务。
协调节点利用公告检查还有哪些任务尚未被处理,但已经没有剩余任务了。
协调节点会告知副本所有内容都已处理完成。随后它会合并所有可合并的状态,并返回查询结果。
管理缓存局部性
最后一个潜在的问题是如何处理缓存局部性。如果多次执行同一个查询,如何确保相同的任务被路由到同一副本?在前面的示例中,我们有如下任务分配:
| 副本 1 | 副本 2 | 副本 3 | |
|---|---|---|---|
| Part 1 | g1, g6, g7 | g2, g4, g5 | g3 |
| Part 2 | g1 | g2, g4, g5 | g3 |
| Part 3 | g1, g6 | g2, g4, g5 | g3 |
为了确保相同的任务被分配到同一副本并能从缓存中获益,会进行两步操作:首先计算分片 + 一组 granule(即一个任务)的哈希值;然后对副本数量取模以进行任务分配。
理论上这听起来没问题,但在实际中,如果某个副本突然负载升高或网络质量下降,而该副本又持续用于执行某些任务,就会引入尾延迟。如果 max_parallel_replicas 小于副本数量,那么会随机选择副本来执行查询。
任务窃取
如果某个副本处理任务的速度比其他副本慢,其他副本会尝试从该副本那里“窃取”按哈希分配本应由它处理的任务,以降低尾部延迟。
限制
此功能存在一些已知限制,主要限制记录在本节中。
如果您发现的问题不在下面列出的限制之中,并且怀疑是由并行副本(parallel replica)导致的,请在 GitHub 上使用 comp-parallel-replicas 标签提交 issue。
| 限制 | 说明 |
|---|---|
| 复杂查询 | 目前,并行副本在处理简单查询时表现良好。增加复杂度的层级(例如 CTE、子查询、JOIN、非扁平查询等)可能会对查询性能产生负面影响。 |
| 小型查询 | 如果您执行的查询处理的行数不多,将其在多个副本上执行可能并不能带来更好的性能,因为副本之间协调所需的网络时间会在查询执行中引入额外开销。您可以通过使用以下设置来减少这些问题:parallel_replicas_min_number_of_rows_per_replica。 |
| 使用 FINAL 时禁用并行副本 | |
| 投影与并行副本不会同时使用 | |
| 高基数数据与复杂聚合 | 需要发送大量数据的高基数聚合会显著拖慢查询。 |
| 与新分析器的兼容性 | 在特定场景下,新分析器可能会显著减慢或加速查询执行。 |
与并行副本相关的设置
| Setting | Description |
|---|---|
enable_parallel_replicas | 0: 禁用1: 启用 2: 强制启用并行副本,如果无法使用并行副本则抛出异常。 |
cluster_for_parallel_replicas | 用于并行副本的集群名称;如果你使用的是 ClickHouse Cloud,请使用 default。 |
max_parallel_replicas | 在多个副本上执行查询时可使用的最大副本数;如果指定的值小于集群中的副本数,则会随机选择节点。此值也可以设置得高于实际副本数,以预留水平扩展空间。 |
parallel_replicas_min_number_of_rows_per_replica | 用于根据需要处理的行数来限制所使用的副本数量,实际使用的副本数定义为:estimated rows to read / min_number_of_rows_per_replica。 |
allow_experimental_analyzer | 0: 使用旧分析器1: 使用新分析器。 并行副本的行为可能会因所使用的分析器不同而发生变化。 |
排查并行副本相关问题
你可以在 system.query_log 表中检查每个查询实际使用的设置。你也可以查看 system.events 表,了解服务器上发生的所有事件,并使用 clusterAllReplicas 表函数查看所有副本上的表(如果你是云用户,请将集群名设为 default)。
响应
system.text_log 表中还包含有关使用并行副本执行查询的信息:
响应
最后,你还可以使用 EXPLAIN PIPELINE。它会直观展示 ClickHouse 将如何执行查询,以及在执行该查询时将使用哪些资源。比如下面这个查询:
我们先来看一下在没有并行副本时的查询流水线:

现在开启 parallel replica:
