并行副本
介绍
ClickHouse 可以极快地处理查询,但这些查询是如何在多个服务器之间分发和并行执行的呢?
在本指南中,我们将首先讨论 ClickHouse 如何通过分布式表将查询分发到多个分片,然后再讨论查询如何利用多个副本来执行。
分片架构
在无共享架构中,集群通常会被拆分成多个分片,每个分片包含总体数据的一个子集。一个分布式表位于这些分片之上,提供对完整数据的统一视图。
读取可以发送到本地表。查询的执行将仅在指定的分片上进行,或者可以发送到分布式表,在这种情况下,每个分片将执行给定的查询。查询分布式表的服务器将汇总数据并响应客户端:

上图可视化了客户端查询分布式表时发生的情况:
select 查询被发送到一个节点上的分布式表(可以通过轮询策略或在负载均衡器路由到特定服务器后进行)。这个节点现在将充当协调者。
节点将根据分布式表指定的信息定位需要执行查询的每个分片,并将查询发送到每个分片。
每个分片在本地读取、过滤和聚合数据,然后将可合并的状态发送回协调者。
协调节点合并数据,然后将响应发送回客户端。
当我们加入副本时,过程相似,唯一的区别是每个分片只有一个副本会执行查询。这意味着将能够并行处理更多查询。
非分片架构
ClickHouse Cloud 具有与上述架构非常不同的架构。(有关更多详细信息,请参见 "ClickHouse Cloud 架构")。由于计算和存储的分离,以及几乎无限的存储量,分片的重要性降低。
下图展示了 ClickHouse Cloud 的架构:

该架构使我们能够几乎瞬时地添加和移除副本,确保了非常高的集群可扩展性。ClickHouse Keeper 集群(右侧显示)确保我们对于元数据拥有单一的可信源。副本可以从 ClickHouse Keeper 集群获取元数据,并且所有副本维护相同的数据。数据本身存储在对象存储中,SSD 缓存允许我们加速查询。
但是,我们现在如何在多个服务器之间分发查询执行呢?在分片架构中,由于每个分片实际上可以在数据的子集上执行查询,这一点相当明显。没有分片时,它是如何工作的呢?
引入并行副本
为了通过多个服务器并行化查询执行,我们首先需要能够将我们的一个服务器指定为协调者。协调者负责创建需要执行的任务列表,确保所有任务都被执行、聚合并将结果返回给客户端。就像大多数分布式系统一样,这将是接收初始查询的节点的角色。我们还需要定义工作单元。在分片架构中,工作单元是分片,即数据的子集。在并行副本中,我们将使用称为 granules 的小部分表作为工作单元。
现在,让我们借助下图看看它在实践中的工作原理:

使用并行副本时:
客户端的查询在经过负载均衡器后发送到一个节点。这个节点成为此查询的协调者。
节点分析每个部分的索引,并选择合适的部分和 granules 进行处理。
协调者将工作负载分解为一组可以分配给不同副本的 granules。
每组 granules 由相应的副本处理,完成后将可合并状态发送到协调者。
最后,协调者合并来自副本的所有结果,然后将响应返回给客户端。
以上步骤概述了并行副本在理论上的工作方式。然而,在实践中,有许多因素可能会阻止这种逻辑完美运行:
一些副本可能不可用。
在 ClickHouse 中,复制是异步的,某些副本在某些时刻可能没有相同的部分。
需要以某种方式处理副本之间的尾延迟。
基于每个副本的活动,文件系统缓存在副本之间有所不同,这意味着随机任务分配可能由于缓存局部性导致性能不佳。
我们将在接下来的章节中探讨如何克服这些因素。
公告
为了解决上述列表中的 (1) 和 (2) 问题,我们引入了公告的概念。让我们使用下图可视化它的工作原理:

客户端的查询在经过负载均衡器后发送到一个节点。该节点成为此查询的协调者。
协调节点向集群中的所有副本发送请求以获取公告。副本可能对某个表的当前部分集有稍微不同的视图。因此,我们需要收集这些信息以避免不正确的调度决策。
协调节点然后使用公告来定义可以分配给不同副本的一组 granules。在这里,例如,我们可以看到没有来自部件 3 的 granules 被分配给副本 2,因为此副本在其公告中没有提供该部分。另外,请注意没有任务被分配给副本 3,因为该副本未提供公告。
每个副本在其 granules 子集上处理完查询后,将可合并的状态发送回协调者,协调者合并结果并将响应发送给客户端。
动态协调
为了解决尾延迟的问题,我们添加了动态协调。这意味着并非所有的 granules 都在一个请求中发送给一个副本,而每个副本能够向协调者请求新的任务(待处理的 granules 集)。协调者将根据接收到的公告向副本提供 granules 集。
假设我们处于所有副本都已发送公告的阶段。
下图可视化了动态协调的工作原理:

副本通知协调者节点它们能够处理任务,它们还可以指定可以处理的工作量。
协调者将任务分配给副本。

副本 1 和 2 能够非常快速地完成它们的任务。它们将向协调者请求另一个任务。
协调者向副本 1 和 2 分配新任务。

所有副本现在都已完成它们的任务处理。它们请求更多任务。
协调者使用公告检查待处理的任务,但没有剩余任务。
协调者告诉副本一切都已处理。它现在将合并所有可合并状态并响应查询。
管理缓存局部性
最后一个潜在问题是如何处理缓存局部性。如果查询被多次执行,我们如何确保相同的任务路由到相同的副本?在前面的示例中,我们有以下任务分配:
副本 1 | 副本 2 | 副本 3 | |
---|---|---|---|
部件 1 | g1, g6, g7 | g2, g4, g5 | g3 |
部件 2 | g1 | g2, g4, g5 | g3 |
部件 3 | g1, g6 | g2, g4, g5 | g3 |
为了确保相同的任务被分配给相同的副本并可以利用缓存,发生了两件事。计算部件 + granules 集合(任务)的哈希。应用任务分配的副本数量的模运算。
纸面上这听起来不错,但在现实中,某个副本的突然负载或网络劣化,如果始终使用同一副本执行某些任务,可能会导致尾延迟。如果 max_parallel_replicas
小于副本的数量,则随机选择副本进行查询执行。
任务窃取
如果某个副本处理任务的速度慢于其他副本,其他副本将尝试“窃取”原则上属于该副本的任务,以减少尾延迟。
限制
此功能有已知的限制,主要限制记录在本节中。
如果您发现一个不属于下面给出的限制的问题,并且怀疑并行副本是原因,请在 GitHub 上使用标签 comp-parallel-replicas
提交问题。
限制 | 描述 |
---|---|
复杂查询 | 当前并行副本对简单查询工作得相当不错。复杂层次如 CTEs、子查询、JOIN、非扁平查询等可能会对查询性能产生负面影响。 |
小查询 | 如果您正在执行一个不处理大量行的查询,在多个副本上执行可能不会提高性能,因为副本之间协调的网络时间可能导致查询执行中的额外周期。您可以通过使用设置 parallel_replicas_min_number_of_rows_per_replica 来限制这些问题。 |
使用 FINAL 时并行副本被禁用 | |
高基数数据和复杂聚合 | 高基数聚合需要发送大量数据,可能会显著减慢查询速度。 |
与新分析器的兼容性 | 在特定场景下,新分析器可能显著减慢或加速查询执行。 |
与并行副本相关的设置
设置 | 描述 |
---|---|
enable_parallel_replicas | 0 : 禁用1 : 启用 2 : 强制使用并行副本,如果未使用则抛出异常。 |
cluster_for_parallel_replicas | 用于并行复制的集群名称;如果您使用的是 ClickHouse Cloud,请使用 default 。 |
max_parallel_replicas | 用于查询在多个副本上执行的最大副本数,指定的数量若低于集群中的副本数,节点将随机选择。此值也可以过度承诺以适应水平扩展。 |
parallel_replicas_min_number_of_rows_per_replica | 帮助根据需要处理的行数限制使用的副本数,使用的副本数由:estimated rows to read / min_number_of_rows_per_replica 定义。 |
allow_experimental_analyzer | 0 : 使用旧分析器1 : 使用新分析器。并行副本的行为可能会根据使用的分析器而变化。 |
调查并行副本的问题
您可以检查 system.query_log
表中每个查询所使用的设置。您还可以查看 system.events
表,以查看服务器上发生的所有事件,并可以使用 clusterAllReplicas
表函数查看所有副本上的表(如果您是云用户,请使用 default
)。
响应
system.text_log
表还包含使用并行副本执行查询的信息:
响应
最后,您还可以使用 EXPLAIN PIPELINE
。它突出显示 ClickHouse 将如何执行查询以及将使用哪些资源来执行查询。让我们以以下查询为例:
让我们查看没有并行副本的查询管道:

现在使用并行副本:
