Blog / Engineering

Chaining Materialized Views in ClickHouse

author avatar
Mark Needham
Apr 16, 2024

Materialized views in ClickHouse are queries fired whenever a batch of rows arrives in a source table. They will operate on those rows, possibly transforming the data before writing to a destination table. The diagram below gives a high-level view of how this works:

mv-chain-1.png

Over the last couple of weeks, I’ve been learning about aggregation states. I created a small demo with two materialized views reading from the same Kafka table engine. One stored raw event data and the other stored aggregation states.

When I showed the example to Tom, he suggested that rather than have both materialized views read from the Kafka engine table, I could instead chain the materialized views together. The diagram below shows what he had in mind:

mv-chain-2.png

In other words, rather than having the aggregation state materialized view read from the Kafka engine table, I should rather have it read from the raw events that have already been extracted from Kafka.

In the rest of this blog post, we will go through a practical example of how to chain materialized views. We’ll use the Wiki recent changes feed, which provides a stream of events that represent changes made to various Wikimedia properties. The data is available as Server Side Events and the data property of an example message is shown below:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
    "request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
    "id": "41c73232-5922-4484-82f3-34d45f22ee7a",
    "dt": "2024-03-26T09:13:09Z",
    "domain": "en.wiktionary.org",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "partition": 0,
    "offset": 4974797626
  },
  "id": 117636935,
  "type": "edit",
  "namespace": 0,
  "title": "MP3播放器",
  "title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
  "comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
  "timestamp": 1711444389,
  "user": "WingerBot",
  "bot": true,
  "notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
  "minor": true,
  "patrolled": true,
  "length": {
    "old": 229,
    "new": 234
  },
  "revision": {
    "old": 50133194,
    "new": 78597416
  },
  "server_url": "https://en.wiktionary.org",
  "server_name": "en.wiktionary.org",
  "server_script_path": "/w",
  "wiki": "enwiktionary",
  "parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -&gt; {{head|zh|noun}}, {{zh-hanzi}} -&gt; {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}

Let’s imagine that we’re building a dashboard to track the changes being made. We aren’t interested in the individual changes but rather want to track, on a minute-by-minute basis, the unique number of users making changes, the unique number of pages being changed, and the total changes made.

We’ll start by creating and then using the wiki database:

CREATE DATABASE wiki;
USE wiki;

Create Kafka Table Engine

Next, let’s create a table called wikiQueue that will consume messages from Kafka. The broker is running locally on port 9092, and our topic is called wiki_events.

Note that if you're using ClickHouse Cloud, you'll instead need to use ClickPipes to handle ingestion of data from Kafka.

CREATE TABLE wikiQueue(
    id UInt32,
    type String,
    title String,
    title_url String,
    comment String,
    timestamp UInt64,
    user String,
    bot Boolean,
    server_url String,
    server_name String,
    wiki String,
    meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
  'localhost:9092', 
  'wiki_events', 
  'consumer-group-wiki', 
  'JSONEachRow'
);

The rawEvents table stores the dateTime, title_url, topic, and user.

CREATE TABLE rawEvents (
    dateTime DateTime64(3, 'UTC'),
    title_url String,
    topic String,
    user String
) 
ENGINE = MergeTree 
ORDER BY dateTime;

We’ll then write the following materialized view to write data to rawEvents:

CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS 
SELECT toDateTime(timestamp) AS dateTime,
       title_url, 
       tupleElement(meta, 'topic') AS topic, 
       user
FROM wikiQueue
WHERE title_url <> '';

We’re using the toDateTime function to convert from an epoch seconds timestamp to a DateTime object. We also use the tupleElement function to extract the topic property from the meta object.

Storing aggregate states

Next, let’s create a table that stores aggregate states to enable incremental aggregation. Aggregate states are stored in a column with the AggregateFunction(<aggregationType>, <dataType>) type.

To keep a unique count of String values, which we need to do to track unique users and unique pages, we would use the AggregateFunction(uniq, String) type. To keep a running total, which we need for total updates, we would use the AggregateFunction(sum, UInt32) type. The UInt32 type gives us a maximum value of 4294967295, which is way more than the number of updates we’ll receive in one minute.

We’ll call this table byMinute and its definition is below:

CREATE TABLE byMinute
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

The materialized view that populates this table will read from rawEvents and use -State combinators to extract the intermediate state. We’ll use the uniqState function for users and pages and sumState for updates.

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS 
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqState(user) as users,
       uniqState(title_url) as pages,
       sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;

The diagram below shows the chain of materialized views and tables that we’ve created so far:

mv-chain-3.png

We don’t have any data flowing into Kafka, so this table won’t have any data. Let’s fix that by running the following commands.

curl -N https://stream.wikimedia.org/v2/stream/recentchange  |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø

This command extracts the data property from the recent changes feed, constructs a key:value pair using jq, and then pipes it into Kafka using kcat.

If we leave that running for a little while, we can then write a query to see how many changes are being made: ​​

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
    ┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1.2024-03-26 15:53:00.00024875510022.2024-03-26 15:52:00.000429148121643.2024-03-26 15:51:00.000406141721594.2024-03-26 15:50:00.000392124018435.2024-03-26 15:49:00.000418134619106.2024-03-26 15:48:00.000422138818677.2024-03-26 15:47:00.000423144920158.2024-03-26 15:46:00.000409142019339.2024-03-26 15:45:00.0004021348182410.2024-03-26 15:44:00.00043216422142 │
    └─────────────────────────┴───────┴───────┴─────────┘

That all looks like it’s working well.

Adding another MV to the chain

Now, after running this for a while, we decide that it would be useful to group and chunk the data in 10-minute buckets rather than just 1-minute ones. We can do this by writing the following query against the byMinute table:

SELECT
    toStartOfTenMinutes(dateTime) AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

This will return something like the following, where the values in the dateTime column are now in increments of 10 minutes.

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

This works fine with the small data volumes we’re working with, but when we’re working with bigger data, we might want to have another table that stores the data bucketed by 10-minute intervals. Let’s create that table:

CREATE TABLE byTenMinutes
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

Next, let’s create a materialized view to populate that table. The materialized view will query the byMinute table using a query similar to the one we used to compute the 10-minute buckets above. The only change is that instead of using -Merge combinators, we’ll need to use -MergeState combinators to return the aggregation state from aggregating the byMinute data rather than the underlying result.

In theory, we will save some calculation time, as the byMinute MV already aggregated data in one-minute buckets. Now, instead of aggregating the raw by-second data from scratch into 10-minute buckets, we exploit the one-minute buckets instead.

The materialized view is shown below:

CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqMergeState(users) as users,
       uniqMergeState(pages) as pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

The following diagram shows the chaining of materialized views that we’ve now created:

mv-chain-4.png

If we query the byTenMinutes table it won’t have any data and once it does start populating, it will only pick up new data ingested into the byMinute table. But all is not lost, we can still write a query to backfill the old data:

INSERT INTO byTenMinutes 
SELECT toStartOfTenMinutes(dateTime),
       uniqMergeState(users) AS users, uniqMergeState(pages) AS pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

We can then write the following query against byTenMinutes to return the data grouped by 10-minute buckets:

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

We’ll get back the same results as we did when querying the byMinute table:

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘
Share this post

Subscribe to our newsletter

Stay informed on feature releases, product roadmap, support, and cloud offerings!
Loading form...
Follow us
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image
© 2024 ClickHouse, Inc. HQ in the Bay Area, CA and Amsterdam, NL.