Creating an Incremental Materialization

In the previous example, we created a table to materialize the model. This table is rebuilt on every dbt execution, which may be infeasible or extremely costly for larger result sets or complex transformations. To address this and reduce build time, dbt offers incremental materializations. These allow dbt to insert or update only the records that have changed since the last execution, making them appropriate for event-style data. While this is supported by the ClickHouse plugin, it comes with some limitations users should be aware of.

To illustrate this example, we will add the actor "Clicky McClickHouse", who will appear in an incredible 910 movies - ensuring he has appeared in more films than even Mel Blanc.

  1. First, we modify our model to be of type incremental. This addition requires:

    1. unique_key - To ensure the plugin can uniquely identify rows, we must provide a unique_key - in this case, the id field from our query will suffice. This ensures we will have no row duplicates in our materialized table. For more details on uniqueness constraints, see here.
    2. Incremental filter - We also need to tell dbt how to identify which rows have changed on an incremental run. This is achieved by providing a delta expression. Typically this involves a timestamp for event data; hence our updated_at timestamp field. This column, which defaults to the value of now() when rows are inserted, allows new roles to be identified. Additionally, we need to identify the alternative case where new actors are added. Using the {{this}} variable to denote the existing materialized table, this gives us the expression where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}}). We embed this inside the {% if is_incremental() %} condition, ensuring it is only used on incremental runs and not when the table is first constructed. For more details on filtering rows for incremental models, see this discussion in the dbt docs.

    Update the file actor_summary.sql with the following:

    {{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental',
    unique_key='id') }}

    Note that our model will only respond to updates and additions to the roles and actors tables. To respond to changes in all tables, users are encouraged to split this model into multiple sub-models, each with its own incremental criteria. These models can in turn be referenced and connected. For further details on cross-referencing models, see here.

  2. Execute a dbt run and confirm the results of the resulting table:

    ~/imdb$ dbt run
    15:33:34 Running with dbt=1.0.4
    15:33:34 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    15:33:34
    15:33:35 Concurrency: 1 threads (target='dev')
    15:33:35
    15:33:35 1 of 1 START incremental model imdb_dbt.actor_summary........................... [RUN]
    15:33:41 1 of 1 OK created incremental model imdb_dbt.actor_summary...................... [OK in 6.33s]
    15:33:41
    15:33:41 Finished running 1 incremental model in 7.30s.
    15:33:41
    15:33:41 Completed successfully
    15:33:41
    15:33:41 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 5;

    +------+------------+----------+------------------+------+---------+-------------------+
    |id    |name        |num_movies|avg_rank          |genres|directors|updated_at         |
    +------+------------+----------+------------------+------+---------+-------------------+
    |45332 |Mel Blanc   |832       |6.175853582979779 |18    |84       |2022-04-26 15:26:55|
    |621468|Bess Flowers|659       |5.57727638854796  |19    |293      |2022-04-26 15:26:57|
    |372839|Lee Phelps  |527       |5.032976449684617 |18    |261      |2022-04-26 15:26:56|
    |283127|Tom London  |525       |2.8721716524875673|17    |203      |2022-04-26 15:26:56|
    |356804|Bud Osborne |515       |2.0389507108727773|15    |149      |2022-04-26 15:26:56|
    +------+------------+----------+------------------+------+---------+-------------------+

  3. We will now add data to our model to illustrate an incremental update. Add our actor "Clicky McClickHouse" to the actors table:

    INSERT INTO imdb.actors VALUES (845466, 'Clicky', 'McClickHouse', 'M');
  4. Let's star Clicky in 910 random movies:

    INSERT INTO imdb.roles
    SELECT now() as created_at, 845466 as actor_id, id as movie_id, 'Himself' as role
    FROM imdb.movies
    LIMIT 910 OFFSET 10000;
  5. Confirm he is indeed now the actor with the most appearances by querying the underlying source table and bypassing any dbt models:

    SELECT id,
    any(actor_name) as name,
    uniqExact(movie_id) as num_movies,
    avg(rank) as avg_rank,
    uniqExact(genre) as genres,
    uniqExact(director_name) as directors,
    max(created_at) as updated_at
    FROM (
    SELECT imdb.actors.id as id,
    concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
    imdb.movies.id as movie_id,
    imdb.movies.rank as rank,
    genre,
    concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
    created_at
    FROM imdb.actors
    JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
    LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
    LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
    )
    GROUP BY id
    ORDER BY num_movies DESC
    LIMIT 2;

    +------+-------------------+----------+------------------+------+---------+-------------------+
    |id    |name               |num_movies|avg_rank          |genres|directors|updated_at         |
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |845466|Clicky McClickHouse|910       |1.4687938697032283|21    |662      |2022-04-26 16:20:36|
    |45332 |Mel Blanc          |909       |5.7884792542982515|19    |148      |2022-04-26 16:17:42|
    +------+-------------------+----------+------------------+------+---------+-------------------+

  6. Execute a dbt run and confirm our model has been updated and matches the above results:

    ~/imdb$ dbt run
    16:12:16 Running with dbt=1.0.4
    16:12:16 Found 1 model, 0 tests, 1 snapshot, 0 analyses, 181 macros, 0 operations, 0 seed files, 6 sources, 0 exposures, 0 metrics
    16:12:16
    16:12:17 Concurrency: 1 threads (target='dev')
    16:12:17
    16:12:17 1 of 1 START incremental model imdb_dbt.actor_summary........................... [RUN]
    16:12:24 1 of 1 OK created incremental model imdb_dbt.actor_summary...................... [OK in 6.82s]
    16:12:24
    16:12:24 Finished running 1 incremental model in 7.79s.
    16:12:24
    16:12:24 Completed successfully
    16:12:24
    16:12:24 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

    SELECT * FROM imdb_dbt.actor_summary ORDER BY num_movies DESC LIMIT 2;

    +------+-------------------+----------+------------------+------+---------+-------------------+
    |id    |name               |num_movies|avg_rank          |genres|directors|updated_at         |
    +------+-------------------+----------+------------------+------+---------+-------------------+
    |845466|Clicky McClickHouse|910       |1.4687938697032283|21    |662      |2022-04-26 16:20:36|
    |45332 |Mel Blanc          |909       |5.7884792542982515|19    |148      |2022-04-26 16:17:42|
    +------+-------------------+----------+------------------+------+---------+-------------------+
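
For reference, the model changes described in step 1 can be sketched as a complete actor_summary.sql. This is a minimal sketch rather than the project's actual file: it assumes the select body matches the compiled query logged by ClickHouse (shown under Internals) and that the imdb tables are referenced directly by name, whereas a real project may reference them through dbt sources instead.

```sql
{{ config(order_by='(updated_at, id, name)', engine='MergeTree()', materialized='incremental',
    unique_key='id') }}

with actor_summary as (
    SELECT id,
        any(actor_name) as name,
        uniqExact(movie_id) as num_movies,
        avg(rank) as avg_rank,
        uniqExact(genre) as genres,
        uniqExact(director_name) as directors,
        max(created_at) as updated_at
    FROM (
        -- Assumption: tables are referenced by name, as in the compiled query
        SELECT imdb.actors.id as id,
            concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
            imdb.movies.id as movie_id,
            imdb.movies.rank as rank,
            genre,
            concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
            created_at
        FROM imdb.actors
            JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
            LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
            LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
            LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
            LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
    )
    GROUP BY id
)

select *
from actor_summary

{% if is_incremental() %}

-- Applied only on incremental runs: pick up new actors (higher id)
-- or actors with roles added since the last run (newer updated_at)
where id > (select max(id) from {{ this }})
or updated_at > (select max(updated_at) from {{ this }})

{% endif %}
```

On the first run the is_incremental() block is skipped and the full result set is written; on later runs only rows matching the filter are selected, which is why the insert of Clicky McClickHouse in steps 3 and 4 is picked up without rebuilding every row.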

Internals

We can identify the statements executed to achieve the above incremental update by querying ClickHouse’s query log.

SELECT event_time, query  FROM system.query_log WHERE type='QueryStart' AND query LIKE '%dbt%'
AND event_time > subtractMinutes(now(), 15) ORDER BY event_time LIMIT 100;

Adjust the above query to the period of execution. We leave result inspection to the user but highlight the general strategy used by the plugin to perform incremental updates:

  1. The plugin creates a temporary table actor_summary__dbt_tmp using the Memory engine. Rows that have changed are streamed into this table.

    create temporary table actor_summary__dbt_tmp
    engine = Memory
    order by ((updated_at, id, name))
    as (with actor_summary as (SELECT id,
    any(actor_name) as name,
    uniqExact(movie_id) as num_movies,
    avg(rank) as avg_rank,
    uniqExact(genre) as genres,
    uniqExact(director_name) as directors,
    max(created_at) as updated_at
    FROM (
    SELECT imdb.actors.id as id,
    concat(imdb.actors.first_name, ' ', imdb.actors.last_name) as actor_name,
    imdb.movies.id as movie_id,
    imdb.movies.rank as rank,
    genre,
    concat(imdb.directors.first_name, ' ', imdb.directors.last_name) as director_name,
    created_at
    FROM imdb.actors
    JOIN imdb.roles ON imdb.roles.actor_id = imdb.actors.id
    LEFT OUTER JOIN imdb.movies ON imdb.movies.id = imdb.roles.movie_id
    LEFT OUTER JOIN imdb.genres ON imdb.genres.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.movie_directors ON imdb.movie_directors.movie_id = imdb.movies.id
    LEFT OUTER JOIN imdb.directors ON imdb.directors.id = imdb.movie_directors.director_id
    )
    GROUP BY id)

    select *
    from actor_summary

    where id > (select max(id) from imdb_dbt.actor_summary)
    or updated_at > (select max(updated_at) from imdb_dbt.actor_summary));
  2. The previous materialized table is renamed actor_summary__dbt_old, and a new table actor_summary is created. Rows are then streamed from the old table into the new one, skipping any row whose id exists in the temporary table. This effectively handles updates:

    insert into imdb_dbt.actor_summary ("id", "name", "num_movies", "avg_rank", "genres", "directors", "updated_at")
    select "id", "name", "num_movies", "avg_rank", "genres", "directors", "updated_at"
    from imdb_dbt.actor_summary__dbt_old
    where (id) not in (select (id)
    from actor_summary__dbt_tmp);
  3. Finally, results from the temporary table are streamed into the new actor_summary table:

    insert into imdb_dbt.actor_summary ("id", "name", "num_movies", "avg_rank", "genres", "directors", "updated_at")
    select "id", "name", "num_movies", "avg_rank", "genres", "directors", "updated_at"
    from actor_summary__dbt_tmp;
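
Because step 2 copies only rows whose id is absent from the temporary table, and step 3 then appends the temporary rows, each id should end up in the rebuilt table exactly once. A minimal check of that property, assuming the table name used above and that the incremental run has completed, might look like this:

```sql
-- Lists any id that appears more than once in the materialized table.
-- An empty result suggests the unique_key handling left one row per actor.
SELECT id, count() AS row_count
FROM imdb_dbt.actor_summary
GROUP BY id
HAVING row_count > 1
ORDER BY row_count DESC
LIMIT 10;
```

If this returns rows, the old and temporary tables overlapped on id at insert time, which would be worth investigating through the query log before trusting the aggregated counts.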

This strategy may encounter challenges on very large models. For further details see Limitations.