跳到主要内容
跳到主要内容

使用物化视图构建快速时间序列分析的汇总

本教程向您展示如何维护来自高流量事件表的预聚合汇总,使用 物化视图。您将创建三个对象:一个原始表、一个汇总表,以及自动写入汇总的物化视图。

何时使用此模式

在以下情况下使用此模式:

  • 您有一个 只追加事件流(点击、页面浏览、物联网、日志)。
  • 大多数查询是针对时间范围的 聚合(每分钟/小时/天)。
  • 您希望在不重新扫描所有原始行的情况下实现 一致的亚秒读取

创建原始事件表

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

注意

  • PARTITION BY toYYYYMM(event_time) 保持分区小且易于删除。
  • ORDER BY (event_time, user_id) 支持时间边界查询 + 二级过滤。
  • LowCardinality(String) 为分类维度节省内存。
  • TTL 在 90 天后清理原始数据(根据您的保留要求进行调整)。

设计汇总(聚合)表

我们将预聚合到 每小时 的粒度。 选择您的粒度以匹配最常见的分析窗口。

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- start of the hour
    country       LowCardinality(String),
    event_type    LowCardinality(String),
    users_uniq    AggregateFunction(uniqExact, UInt64),
    value_sum     AggregateFunction(sum, Float64),
    value_avg     AggregateFunction(avg, Float64),
    events_count  AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

我们存储 聚合状态(例如,AggregateFunction(sum, ...)),它紧凑地表示部分聚合,并可以在稍后合并或最终确定。

创建填充汇总的物化视图

这个物化视图在插入 events_raw 时自动触发,并将 聚合状态 写入汇总。

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

插入一些示例数据

插入一些示例数据:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

查询汇总

您可以在读取时 合并 状态,或 最终确定 它们:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

提示

如果您希望读取始终命中汇总,您可以创建第二个 物化视图,将 最终确定 的数字写入同一小时粒度的“普通” MergeTree 表。 状态提供更大的灵活性,而最终确定的数字提供稍微简化的读取。

针对主键中的字段过滤以获得最佳性能

您可以使用 EXPLAIN 命令查看索引如何用于修剪数据:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
    ┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
1.  │ Expression ((Project names + Projection))                                                                                          │
2.  │   Expression                                                                                                                       │
3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
4.  │     Indexes:                                                                                                                       │
5.  │       MinMax                                                                                                                       │
6.  │         Keys:                                                                                                                      │
7.  │           bucket_start                                                                                                             │
8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
9.  │         Parts: 1/1                                                                                                                 │
10. │         Granules: 1/1                                                                                                              │
11. │       Partition                                                                                                                    │
12. │         Keys:                                                                                                                      │
13. │           toYYYYMM(bucket_start)                                                                                                   │
14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
15. │         Parts: 1/1                                                                                                                 │
16. │         Granules: 1/1                                                                                                              │
17. │       PrimaryKey                                                                                                                   │
18. │         Keys:                                                                                                                      │
19. │           bucket_start                                                                                                             │
20. │           country                                                                                                                  │
21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
22. │         Parts: 1/1                                                                                                                 │
23. │         Granules: 1/1                                                                                                              │
    └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

上面的查询执行计划显示正在使用三种类型的索引: 一个 MinMax 索引、一个分区索引和一个主键索引。 每个索引使用我们主键中指定的字段:(bucket_start, country, event_type)。 为了获得最佳过滤性能,您需要确保查询利用主键字段来修剪数据。

常见变体

  • 不同的粒度:添加每日汇总:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

然后是第二个物化视图:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • 压缩:对大列应用编解码器(例如:Codec(ZSTD(3)))在原始表上。
  • 成本控制:将重的保留推向原始表,保持长寿命汇总。
  • 回填:在加载历史数据时,插入到 events_raw 并让物化视图自动构建汇总。对于现有行,如果适合,请在创建物化视图时使用 POPULATEINSERT SELECT

清理和保留

  • 增加原始数据的 TTL(例如,30/90 天),但保持汇总的数据时间更长(例如,1 年)。
  • 如果启用了分层,您还可以使用 TTL 移动 旧片段到更便宜的存储。

故障排除

  • 物化视图未更新?检查插入是否进入 events_raw(而不是汇总表),以及物化视图目标是否正确(TO events_rollup_1h)。
  • 查询缓慢?确认它们命中汇总(直接查询汇总表),并且时间过滤与汇总粒度对齐。
  • 回填不匹配?使用 SYSTEM FLUSH LOGS 并检查 system.query_log / system.parts 确认插入和合并。