跳到主要内容
跳到主要内容

回填数据

无论是刚刚接触 ClickHouse 还是负责现有部署的用户,都不可避免地需要回填带有历史数据的表。在某些情况下,这相对简单,但在需要填充物化视图时可能会变得更加复杂。本指南记录了一些用户可以应用于自身用例的处理流程。

备注

本指南假设用户已熟悉 增量物化视图使用表函数进行数据加载,比如 s3 和 gcs 的概念。我们还建议用户阅读我们关于 优化对象存储插入性能 的指南,其中的建议可以适用于本指南中的插入操作。

示例数据集

在本指南中,我们使用 PyPI 数据集。此数据集中每一行代表使用 pip 等工具下载的 Python 包。

例如,子集涵盖了一天的内容 - 2024-12-17,并在 https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/ 上公开可用。用户可以通过以下查询:

此存储桶的完整数据集包含超过 320 GB 的 parquet 文件。在下面的示例中,我们故意使用 glob 模式来定位子集。

我们假设用户正在消费此日期之后的数据流,例如来自 Kafka 或对象存储。此数据的模式如下所示:

备注

完整的 PyPI 数据集包含超过 1 万亿行,用户可以在我们的公共演示环境 clickpy.clickhouse.com 中访问。有关此数据集的更多详细信息,包括演示如何利用物化视图来提高性能以及数据如何每天填充,请参见 这里

回填场景

通常,当从某一时间点消费数据流时,需要进行回填。此数据被插入到带有 增量物化视图 的 ClickHouse 表中,随着数据块的插入而触发。这些视图可能在插入之前对数据进行转换或计算汇总,并将结果发送到目标表,以便在下游应用中稍后使用。

我们将尝试涵盖以下场景:

  1. 使用现有数据摄取回填数据 - 新数据正在加载,且历史数据需要回填。此历史数据已被识别。
  2. 向现有表添加物化视图 - 需要向其历史数据已被填充且数据已在流入的设置中添加新的物化视图。

我们假设数据将从对象存储回填。在所有情况下,我们旨在避免数据插入的暂停。

我们建议从对象存储回填历史数据。数据应该尽可能导出为 Parquet 格式,以实现最佳读取性能和压缩(减少网络传输)。通常,约 150MB 的文件大小最为理想,但 ClickHouse 支持超过 70 种文件格式,并能够处理各种大小的文件。

使用重复表和视图

在所有场景下,我们依赖“重复表和视图”的概念。这些表和视图表示用于实时流数据的副本,允许在隔离的情况下执行回填,如果发生故障则有简单的恢复手段。例如,我们有以下主要的 pypi 表和物化视图,计算每个 Python 项目的下载次数:

我们用数据的子集填充主要表和关联视图:

假设我们希望加载另一个子集 {101..200}。虽然我们可以直接插入到 pypi 中,但可以通过创建重复表在隔离状态下完成回填。

如果回填失败,我们没有影响到主要表,只需 截断 重复表并重新执行。

要创建这些视图的新副本,我们可以使用 CREATE TABLE AS 子句,后加后缀 _v2

我们用大约相同大小的第二个子集填充这一表,并确认成功加载。

如果在第二次加载期间的任何时刻经历了失败,我们可以简单地 截断 pypi_v2pypi_downloads_v2,然后重复数据加载。

完成数据加载后,我们可以使用 ALTER TABLE MOVE PARTITION 子句将数据从重复表移动到主要表。

分区名称

上述的 MOVE PARTITION 调用使用了分区名称 ()。这表示此表的单一分区(未分区)。对于分区表,用户需要调用多个 MOVE PARTITION 调用 - 每个分区一个分区。当前分区的名称可以通过 system.parts 表确定,例如 SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2')

我们现在可以确认 pypipypi_downloads 包含完整的数据。pypi_downloads_v2pypi_v2 可以安全删除。

重要的是,MOVE PARTITION 操作是轻量级的(利用硬链接)和原子性的,即它要么成功,要么失败,没有中间状态。

我们在下面的回填场景中大量利用这一过程。

请注意,该过程要求用户选择每次插入操作的大小。

较大的插入,即更多的行,将意味着需要更少的 MOVE PARTITION 操作。然而,这必须与因插入失败(例如由于网络中断)而恢复的成本相平衡。用户可以通过批量处理文件来降低风险。这可以通过范围查询,例如 WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00 或者 glob 模式来完成。例如,

备注

ClickPipes 在从对象存储加载数据时使用此方法,自动创建目标表及其物化视图的副本,避免用户执行上述步骤的需要。通过使用多个工作线程,每个线程处理不同子集(通过 glob 模式)及其自己的重复表,数据可以快速加载,并确保正确一次性语义。对此感兴趣的用户可以在 这篇博客 中找到更多详细信息。

场景 1: 使用现有数据摄取回填数据

在此场景中,我们假设要回填的数据不在一个孤立的存储桶中,因此需要过滤。数据已经在插入,并且可以识别出某个时间戳或单调递增的列,从而需要回填历史数据。

这个过程遵循以下步骤:

  1. 确定检查点 - 识别需要恢复的时间戳或列值。
  2. 创建主要表和物化视图目标表的重复副本。
  3. 创建指向在步骤 (2) 中创建的目标表的任何物化视图的副本。
  4. 插入到在步骤 (2) 中创建的重复主要表中。
  5. 将所有分区从重复表移动到它们的原始版本中。删除重复表。

例如,在我们的 PyPI 数据中,假设我们已经加载了数据。我们可以识别出最小时间戳,因此我们的“检查点”。

从上述内容,我们知道我们需要加载在 2024-12-17 09:00:00 之前的数据。使用我们之前的过程,我们创建重复表和视图,并使用时间戳上的过滤加载子集。

备注

在 Parquet 中对时间戳列进行过滤可以非常高效。ClickHouse 只会读取时间戳列以识别出要加载的完整数据范围,从而最大限度地减少网络流量。ClickHouse 查询引擎还可以利用 Parquet 索引,如最小-最大。

一旦此插入完成,我们可以移动相关的分区。

如果历史数据在一个孤立的存储桶中,则不需要上述时间过滤。如果没有时间或单调列,请隔离您的历史数据。

只需在 ClickHouse Cloud 中使用 ClickPipes

ClickHouse Cloud 用户应在数据可以孤立到自己的存储桶中(且不需要过滤)时使用 ClickPipes 来恢复历史备份。通过并行加载多个工作线程,从而减少加载时间,ClickPipes 自动化了上述过程 - 为主表和物化视图创建重复表。

场景 2: 向现有表添加物化视图

在需要向已填充大量数据以及正在插入数据的设置中添加新物化视图并不罕见。这里可使用时间戳或单调递增的列来识别流中的某个点,这有助于避免数据插入的暂停。在下面的示例中,我们假设这两种情况,倾向于选择避免暂停的插入方法。

避免使用 POPULATE

我们不建议使用 POPULATE 命令来回填物化视图,除非是暂停插入的小数据集。此操作符可能会错过插入到其源表的行,物化视图是在 populate hash 完成后创建的。此外,此 populate 将对所有数据运行,且在大数据集上容易受到中断或内存限制的影响。

可用时间戳或单调递增列

在这种情况下,我们建议新的物化视图包含一个过滤器,将行限制为未来任意数据的值。然后可以使用来自主表的历史数据从此日期回填物化视图。回填方法依赖于数据大小和相关查询的复杂性。

我们最简单的方法包括以下步骤:

  1. 创建我们的物化视图,并加上一个仅考虑任意近期时间之后的行的过滤器。
  2. 运行一个 INSERT INTO SELECT 查询,该查询向物化视图的目标表插入数据,读取源表与视图的聚合查询。

这可以进一步增强以在步骤 (2) 中针对数据子集,和/或使用物化视图的重复目标表(在插入完成后将分区附加到原始表)以便于故障后的恢复。

考虑以下物化视图,它计算每小时最受欢迎的项目。

虽然我们可以添加目标表,但在添加物化视图之前,我们修改其 SELECT 子句,以包含一个仅考虑任意未来时间后行的过滤器 - 在这种情况下我们假设 2024-12-17 09:00:00 是几分钟后的时间。

一旦添加了此视图,我们就可以在此数据之前回填物化视图的所有数据。

完成此操作最简单的方法是简单地在主表上运行物化视图的查询,并加上过滤以忽略最近添加的数据,通过 INSERT INTO SELECT 将结果插入到我们视图的目标表。例如,对于上面的视图:

备注

在上述示例中,我们的目标表是 SummingMergeTree。在这种情况下,我们可以直接使用原始聚合查询。对于更复杂的用例,这些用例利用 AggregatingMergeTree,用户将使用 -State 函数进行汇总。对此的示例可以在 这里 找到。

在我们的案例中,这是相对轻量级的聚合,完成时间少于 3 秒,使用内存少于 600MiB。对于更复杂或运行时间较长的聚合,用户可以通过使用早期的重复表方法来增强此过程,即创建一个影子目标表,例如 pypi_downloads_per_day_v2,将数据插入这个表,并将其生成的分区附加到 pypi_downloads_per_day

物化视图的查询往往可以更复杂(这并不少见,否则用户就不会使用视图了!)并消耗资源。在更少见的情况下,查询所需的资源超出了服务器的承载能力。这凸显了 ClickHouse 物化视图的一大优势 - 它们是增量的,不会一次性处理整个数据集!

在这种情况下,用户有几种选择:

  1. 修改查询以回填范围,例如 WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00 等。
  2. 使用 Null 表引擎 来填充物化视图。这模拟了物化视图的典型增量填充,在数据块(可配置大小)上执行查询。

(1)代表了最简单的方法,通常就足够了。为了简洁起见,我们不提供示例。

我们下面进一步探讨(2)。

使用 Null 表引擎填充物化视图

Null 表引擎 提供了一种不持久化数据的存储引擎(想象它是表引擎世界的 /dev/null)。尽管这似乎矛盾,但物化视图仍会在插入到此表引擎的数据上执行。这允许在不持久化原始数据的情况下构建物化视图,避免了输入输出及相关存储。

重要的是,任何附着到表引擎的物化视图仍然会在其插入的数据块上执行 - 将结果发送到目标表。这些块的大小是可配置的。虽然较大的块可能在效率上更高(处理更快),但它们消耗的资源(主要是内存)也更多。使用此表引擎意味着我们可以逐步增量构建物化视图,即一次一个块,避免将整个聚合保持在内存中。


考虑以下示例:

在此,我们创建一个 Null 表 pypi_v2,以接收将用于构建物化视图的行。请注意,我们将模式限制为仅包括所需的列。我们的物化视图对插入到此表的数据(一次一个块)执行聚合,将结果发送到我们的目标表 pypi_downloads_per_day

备注

我们在此使用 pypi_downloads_per_day 作为目标表。为了增加弹性,用户可以创建重复表 pypi_downloads_per_day_v2,并将其用作视图的目标表,如前面示例中所示。在插入完成后,pypi_downloads_per_day_v2 中的分区可以反过来移动到 pypi_downloads_per_day。这将允许在由于内存问题或服务器中断而导致插入失败的情况下进行恢复,即我们只需截断 pypi_downloads_per_day_v2,调整设置,然后重试。

要填充这个物化视图,我们只需将相关数据从 pypi. 插入到 pypi_v2 中进行回填。

请注意,我们的内存使用为 639.47 MiB

调整性能和资源

几个因素将决定上述场景中的性能和资源使用情况。在尝试调整之前,我们建议读者了解在 优化 S3 插入和读取性能指南使用线程进行读取 部分详细记录的插入机制。简而言之:

  • 读取并行性 - 用于读取的线程数。通过 max_threads 控制。在 ClickHouse Cloud 中,这取决于实例大小,默认为 vCPUs 的数量。增加该值可能会提高读取性能,但会增加内存使用。
  • 插入并行性 - 用于插入的线程数。通过 max_insert_threads 控制。在 ClickHouse Cloud 中,这由实例大小(在 2 到 4 之间)确定,在 OSS 中设置为 1。增加该值可能会提高性能,但会增加内存使用。
  • 插入块大小 - 数据在一个循环中被处理,在此过程中数据被拉取、解析并形成内存中的插入块,基于 分区键。这些块会被排序、优化、压缩,并作为新的 数据部分 写入存储。插入块的大小由设置 min_insert_block_size_rowsmin_insert_block_size_bytes(未压缩)控制,影响内存使用和磁盘 I/O。较大的块使用更多内存,但创建更少的部分,从而减少 I/O 和后台合并。这些设置表示最小阈值(达到第一个的触发刷新)。
  • 物化视图块大小 - 除了上述主插入机制,在插入到物化视图之前,块也会被压缩以提高处理效率。这些块的大小由设置 min_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_views 决定。较大的块会以较大的内存使用换取更高效的处理。默认情况下,这些设置会恢复到源表设置 min_insert_block_size_rowsmin_insert_block_size_bytes 的值。

为了提高性能,用户可以遵循在 优化 S3 插入和读取性能指南调整线程和插入块大小 部分中概述的准则。在大多数情况下,不需要同时修改 min_insert_block_size_bytes_for_materialized_viewsmin_insert_block_size_rows_for_materialized_views 来提高性能。如果必须修改,则使用与 min_insert_block_size_rowsmin_insert_block_size_bytes 讨论的相同最佳实践。

为了最小化内存使用,用户可能希望尝试调整这些设置。这将不可避免地降低性能。使用之前的查询,我们在下面展示示例。

max_insert_threads 降低到 1 可以减少我们的内存开销。

我们可以通过将 max_threads 设置降低到 1,进一步减少内存。

最后,我们可以通过将 min_insert_block_size_rows 设置为 0(禁用它作为块大小的决定因素)和将 min_insert_block_size_bytes 设置为 10485760(10MiB),进一步减少内存使用。

最后,请注意,降低块大小会产生更多部分并造成更大的合并压力。如 这里 讨论的,这些设置应谨慎修改。

没有时间戳或单调递增列

上述过程依赖于用户拥有时间戳或单调递增的列。在某些情况下,这些信息根本不可用。在这种情况下,我们建议采用以下过程,该过程利用了之前概述的许多步骤,但需要用户暂停插入。

  1. 暂停主表的插入。
  2. 使用 CREATE AS 语法创建主目标表的重复表。
  3. 使用 ALTER TABLE ATTACH 将原始目标表的分区附加到重复表中。注意: 此附加操作与之前使用的移动操作不同。虽然依赖于硬链接,但保留了原始表中的数据。
  4. 创建新的物化视图。
  5. 重新启动插入。注意: 插入只会更新目标表,而不会更新重复表,后者仅引用原始数据。
  6. 回填物化视图,使用上述针对时间戳的数据使用的相同流程,以重复表作为源。

考虑使用 PyPI 和我们之前的新物化视图 pypi_downloads_per_day 的以下示例(我们假设不能使用时间戳):

在倒数第二个步骤中,我们使用简单的 INSERT INTO SELECT 方法回填 pypi_downloads_per_day,如 之前 所述。这也可以使用上述文档中的 Null 表方法增强,并可选择使用重复表以增加弹性。

尽管此操作确实需要暂停插入,但中间操作通常可以快速完成 - 最小化任何数据中断。