集成 dbt 和 ClickHouse
dbt(数据构建工具)使分析工程师能够通过简单编写 SELECT 语句在其数据仓库中转换数据。 dbt 处理将这些 SELECT 语句物化为数据库中的对象,形成表和视图 - 执行 提取、加载和转换(ELT) 中的转化部分。用户可以创建由 SELECT 语句定义的模型。
在 dbt 中,这些模型可以相互引用并分层,以便构建更高层次的概念。连接模型所需的模板 SQL 会自动生成。此外,dbt 识别模型之间的依赖关系,并确保它们按照适当的顺序创建,使用有向无环图(DAG)。
dbt 通过一个 ClickHouse 支持的插件 与 ClickHouse 兼容。我们描述连接 ClickHouse 的过程,并通过一个基于公开可用的 IMDB 数据集的简单示例进行说明。我们还强调当前连接器的一些局限性。
概念
dbt 引入了模型的概念。这被定义为一个 SQL 语句,可能连接多个表。一个模型可以以多种方式“物化”。物化表示模型 SELECT 查询的构建策略。物化背后的代码是模板 SQL,它将您的 SELECT 查询包装在语句中,以便创建一个新关系或更新现有关系。
dbt 提供 4 种类型的物化:
- 视图(默认):模型作为数据库中的视图构建。
- 表:模型作为数据库中的表构建。
- 临时:模型不直接在数据库中构建,而是作为公共表表达式被拉入依赖模型中。
- 增量:模型最初作为表物化,在后续运行中,dbt 会在表中插入新行并更新更改的行。
额外的语法和子句定义了如果其基础数据发生更改,如何更新这些模型。 dbt 通常建议从视图物化开始,直到性能成为一个问题。表物化通过将模型查询的结果捕获为一个表来提供查询时间性能的提升,但以增加存储为代价。增量方法在此基础上进一步发展,允许随后对基础数据的更新被捕获到目标表中。
当前的插件支持 ClickHouse 的 视图、表、临时 和 增量 物化。该插件还支持 dbt 快照 和 种子,我们将在本指南中进行探讨。
在以下指南中,我们假设您有一个可用的 ClickHouse 实例。
dbt 和 ClickHouse 插件的设置
dbt
我们假设在以下示例中使用 dbt CLI。用户也可以考虑 dbt Cloud,它提供基于 Web 的集成开发环境(IDE),允许用户编辑和运行项目。
dbt 提供了多种 CLI 安装选项。请按照 这里 中描述的说明进行操作。在此阶段仅安装 dbt-core。我们推荐使用 pip
。
重要:以下在 python 3.9 下经过测试。
ClickHouse 插件
安装 dbt ClickHouse 插件:
准备 ClickHouse
dbt 在建模高度关系型数据时表现优秀。为了示范,我们提供了一个小的 IMDB 数据集,其关系模式如下。该数据集源自 关系数据集存储库。与常用的 dbt 模式相比,这非常简单,但代表了一个可管理的样本:

我们使用这些表的子集,如下所示。
创建以下表:
表 roles
的列 created_at
默认为 now()
的值。我们稍后使用这个值来识别模型的增量更新 - 请参阅 增量模型。
我们使用 s3
函数从公共端点读取源数据以插入数据。运行以下命令以填充表:
这些执行可能会因您的带宽而有所不同,但每个操作仅需几秒钟即可完成。执行以下查询以计算每个演员的摘要,按出演电影数量排序,并确认数据成功加载:
响应应如下所示:
在后续的指南中,我们将把这个查询转换成一个模型 - 在 ClickHouse中作为 dbt 视图和表进行物化。
连接到 ClickHouse
- 创建一个 dbt 项目。在这种情况下,我们以
imdb
数据源命名。当提示时,选择clickhouse
作为数据库源。
cd
进入您的项目文件夹:
-
此时,您需要所选择的文本编辑器。在下面的示例中,我们使用流行的 VS Code。打开 IMDB 目录,您应该会看到一系列 yml 和 sql 文件:
-
更新您的
dbt_project.yml
文件,以指定我们的第一个模型 -actor_summary
并设置配置文件为clickhouse_imdb
。 -
接下来,我们需要向 dbt 提供 ClickHouse 实例的连接详细信息。将以下内容添加到您的
~/.dbt/profiles.yml
。
注意需要修改用户和密码。有额外的可用设置文档记录在 这里。
- 从 IMDB 目录中,执行
dbt debug
命令,以确认 dbt 是否能够连接到 ClickHouse。
确认响应包含 Connection test: [OK connection ok]
,表示连接成功。
创建简单的视图物化
在使用视图物化时,模型在每次运行时都会通过 ClickHouse 中的 CREATE VIEW AS
语句重建。这不需要额外的数据存储,但查询速度会比表物化慢。
- 从
imdb
文件夹中,删除目录models/example
:
- 在
models
文件夹中的actors
创建一个新文件。在这里,我们创建每个代表演员模型的文件:
- 在
models/actors
文件夹中创建文件schema.yml
和actor_summary.sql
。
文件 schema.yml
定义了我们的表。这些表随后将在宏中可用。编辑 models/actors/schema.yml
,使其包含以下内容:
文件 actors_summary.sql
定义了我们实际的模型。请注意,在配置函数中,我们还请求将模型物化为 ClickHouse 中的视图。我们的表通过函数 source
从 schema.yml
文件中引用,例如 source('imdb', 'movies')
引用 imdb
数据库中的 movies
表。编辑 models/actors/actors_summary.sql
,使其包含以下内容:
请注意,我们在最终的 actor_summary 中包含了列 updated_at
。我们稍后将其用于增量物化。
- 从
imdb
目录执行命令dbt run
。
- dbt 将按照请求将该模型表示为 ClickHouse 中的视图。现在我们可以直接查询此视图。该视图将在
imdb_dbt
数据库中创建 - 这由~/.dbt/profiles.yml
文件中clickhouse_imdb
配置文件下的 schema 参数确定。
查询此视图时,我们可以使用更简洁的语法复制之前查询的结果:
创建表物化
在前面的示例中,我们的模型被物化为一个视图。虽然这可能对某些查询提供足够的性能,但更复杂的 SELECT 或频繁执行的查询可能更适合物化为表。这种物化对通过 BI 工具查询的模型非常有用,以确保用户有更快的体验。这实际导致查询结果被存储为新表,并具有相关的存储开销 - 有效地执行了 INSERT TO SELECT
。请注意,表将在每次时重建,即它不是增量的。因此,大的结果集可能导致长时间的执行时间 - 请参阅 dbt 限制。
- 修改
actors_summary.sql
文件,使materialized
参数设置为table
。请注意ORDER BY
的定义,并且我们使用MergeTree
表引擎:
- 从
imdb
目录执行命令dbt run
。此执行可能需要稍长时间 - 在大多数机器上约 10 秒。
- 确认创建表
imdb_dbt.actor_summary
:
您应看到具有适当数据类型的表:
- 确认该表中的结果与之前的响应一致。请注意,现在模型作为表存在,响应时间显著改善:
随意对该模型发出其他查询。例如,哪些演员有超过 5 次出演的高评分电影?
创建增量物化
前面的示例创建了一个表来物化模型。此表将为每次 dbt 执行重建。这对于较大的结果集或复杂的转换可能不可行且极其昂贵。为了解决这个挑战并减少构建时间,dbt 提供增量物化。这允许 dbt 插入或更新自上次执行以来表中的记录,使其适合事件风格的数据。在后台,会创建一个临时表,其中包含所有更新记录,然后将所有未修改的记录和已更新的记录插入到新的目标表中。这对于大型结果集会导致与表模型相似的 限制。
为了克服大型数据集的这些限制,插件支持 “inserts_only” 模式,其中所有更新都直接插入到目标表中,而无需创建临时表(下面会详细讨论)。
为了说明这个例子,我们将添加演员 “Clicky McClickHouse”,他将在 910 部电影中出现 - 确保他比 Mel Blanc 出现的电影还要多。
-
首先,我们修改我们的模型为增量类型。这一增加要求:
- unique_key - 为了确保插件能够唯一识别行,我们必须提供一个 unique_key - 在这种情况下,查询中的
id
字段就足够了。这确保我们在物化表中没有行重复。有关唯一性约束的更多详细信息,请参阅 这里。 - 增量过滤器 - 我们还需要告诉 dbt 如何在增量运行中识别哪些行已更改。这是通过提供增量表达式实现的。通常这涉及到事件数据的时间戳;因此我们使用
updated_at
时间戳字段。此列在插入行时默认为now()
的值,允许识别新角色。此外,我们需要标识添加新演员的替代情况。使用{{this}}
变量来表示现有物化表,给出表达式where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})
。我们将其嵌入{% if is_incremental() %}
条件中,以确保仅在增量运行时使用,而不在构建表时使用。有关增量模型过滤行的更多详细信息,请参见 dbt 文档中的讨论。
更新文件
actor_summary.sql
如下: - unique_key - 为了确保插件能够唯一识别行,我们必须提供一个 unique_key - 在这种情况下,查询中的
注意到我们的模型将仅响应对 roles
和 actors
表的更新和添加。为了响应所有表,用户应考虑将此模型拆分为多个子模型 - 每个模型具有自己的增量标准。这些模型可以相互引用和连接。有关交叉引用模型的更多详细信息,请参见 这里。
- 执行
dbt run
并确认结果表的结果:
- 现在我们将向模型中添加数据,以说明增量更新。将演员 "Clicky McClickHouse" 添加到
actors
表中:
- 让 "Clicky" 出现在 910 部随机电影中:
- 通过查询底层源表并跳过任何 dbt 模型,确认他确实是出镜次数最多的演员:
- 执行
dbt run
并确认我们的模型已更新,并与上述结果匹配:
内部机制
我们可以通过查询 ClickHouse 的查询日志来识别执行上述增量更新的语句。
调整上述查询以适应执行的时间段。我们将结果检查留给用户,但强调插件用来执行增量更新的一般策略:
- 插件创建一个临时表
actor_sumary__dbt_tmp
。发生更改的行会被流入此表。 - 创建一个新表
actor_summary_new
。旧表中的行会从旧表流向新表,同时检查行 ID 不存在于临时表中。这有效地处理了更新和重复。 - 从临时表中的结果将流入新的
actor_summary
表: - 最后,通过
EXCHANGE TABLES
语句以原子方式将新表与旧版本交换,旧表和临时表也会被丢弃。
这一过程如下所示:

该策略在非常大的模型上可能会遇到挑战。有关更多详细信息,请参阅 限制。
附加策略(仅插入模式)
为了克服增量模型中大数据集的限制,插件使用 dbt 配置参数 incremental_strategy
。此参数可以设置为 append
的值。设置后,更新的行将直接插入目标表(即 imdb_dbt.actor_summary
),并且不会创建临时表。
注意:仅附加模式要求您的数据为不可变或可以接受重复。如果您希望支持修改行的增量表模型,请不要使用此模式!
为了说明此模式,我们将添加另一个新演员,并使用 incremental_strategy='append'
重新执行 dbt run。
- 在 actor_summary.sql 中配置仅附加模式:
- 让我们添加另一个著名演员 - Danny DeVito。
- 让我们让 Danny 在 920 部随机电影中出镜。
- 执行 dbt run 并确认 Danny 被添加到了演员摘要表中。
请注意,与插入 "Clicky" 相比,这次增量处理的速度更快。
再次检查 query_log 表显示了两次增量运行之间的差异:
在此运行中,所有的新行都直接添加到 imdb_dbt.actor_summary
表中,且没有涉及任何表创建。
删除+插入模式(实验性)
历史上 ClickHouse 仅有限支持更新和删除,以异步 突变 的形式实现。这些操作可能极其消耗 IO,并且通常应该避免。
ClickHouse 22.8 引入了 轻量级删除。这些目前是实验性的,但提供了一种更高效的删除数据的手段。
此模式可以通过 incremental_strategy
参数为模型配置,即:
该策略直接在目标模型的表上操作,因此如果在操作期间出现问题,增量模型中的数据可能处于无效状态 - 不会进行原子更新。
总而言之,该方法:
- 插件创建一个临时表
actor_sumary__dbt_tmp
。发生更改的行会流入此表。 - 对当前的
actor_summary
表发出DELETE
。通过 id 从actor_sumary__dbt_tmp
删除行。 - 使用
INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmp
将actor_sumary__dbt_tmp
中的行插入actor_summary
。
此过程如下所示:

insert_overwrite 模式(实验性)
执行以下步骤:
- 创建一个具有与增量模型关系相同结构的暂存(临时)表:
CREATE TABLE {staging} AS {target}
。 - 仅将新记录(通过 SELECT 生成)插入到暂存表中。
- 仅将新分区(存在于暂存表中的)替换到目标表中。
这种方法有以下优点:
- 与默认策略相比更快,因为它不会复制整个表。
- 比其他策略更安全,因为在 INSERT 操作完成成功之前不会修改原始表:在中间失败的情况下,原始表不会被修改。
- 实现了“分区不可变性”的数据工程最佳实践。这简化了增量和并行数据处理、回滚等。

创建快照
dbt 快照允许记录随时间对可变模型的更改。这反过来允许在模型上进行时间点查询,分析师可以“回顾过去”模型的先前状态。这是通过 类型-2 缓慢变化维 实现的,其中从和到日期列记录行何时有效。该功能由 ClickHouse 插件支持,下面演示了这一点。
此示例假设您已完成 创建增量表模型。确保您的 actor_summary.sql 不设置 inserts_only=True。您的 models/actor_summary.sql 应如下所示:
- 在快照目录中创建一个文件
actor_summary
。
- 使用以下内容更新 actor_summary.sql 文件的内容:
关于该内容的几点观察:
- SELECT 查询定义了您希望随着时间快照过的结果。函数 ref 用于引用我们之前创建的 actor_summary 模型。
- 我们需要一个时间戳列来指示记录更改。我们可以在这里使用我们的 updated_at 列(请参见 创建增量表模型)。parameter strategy 指示我们使用时间戳来表示更新,parameter updated_at 指定要使用的列。如果模型中不存在此列,您也可以使用 check strategy。这在效率上大大减弱,并要求用户指定要比较的列的列表。 dbt 比较这些列的当前值和历史值,记录任何变化(或在相同情况下不进行任何操作)。
- 运行命令
dbt snapshot
。
注意如何在目标架构中创建了 actor_summary_snapshot 表(由 target_schema 参数决定)。
- 抽样这些数据,您将看到 dbt 包含了 dbt_valid_from 和 dbt_valid_to 列。后者值被设置为 null。后续运行会更新此值。
- 让我们让我们最喜欢的演员 Clicky McClickHouse 在另外 10 部电影中出现。
- 从
imdb
目录重新运行 dbt run 命令。这将更新增量模型。一旦完成,运行 dbt snapshot 以捕捉更改。
- 如果我们现在查询快照,请注意我们有 2 行 Clicky McClickHouse。我们之前的条目现在具有 dbt_valid_to 值。我们的新值以 dbt_valid_from 列中的相同值记录,并且具有 dbt_valid_to 值为 null。如果我们有新行,这些行也将附加到快照中。
有关 dbt 快照的更多详细信息,请参见 这里。
使用种子
dbt 提供从 CSV 文件加载数据的能力。此功能不适合用于加载数据库的大型导出,更适合用于通常用于代码表和 字典 的小文件,例如将国家代码映射到国家名称。为了简单示范,我们生成并上传一个使用 seed 功能的类型代码列表。
- 我们从现有数据集中生成类型代码列表。从 dbt 目录中,使用
clickhouse-client
创建文件seeds/genre_codes.csv
:
- 执行
dbt seed
命令。这将在我们的imdb_dbt
数据库中创建一个新表genre_codes
(如我们的模式配置所定义),表中包含来自 CSV 文件的行。
- 确认这些行已加载:
限制
当前的 ClickHouse 插件对于 dbt 有几个用户应注意的限制:
- 该插件当前将模型物化为使用
INSERT TO SELECT
的表。这实际上意味着数据重复。非常大的数据集(PB)可能导致极长的运行时间,使某些模型不可行。旨在最小化任何查询返回的行数,利用 GROUP BY 进行优化。优先选择汇总数据的模型,而不是单纯执行转换同时保持源行计数的模型。 - 使用分布式表表示模型时,用户必须手动在每个节点上创建底层的副本表。然后,可以在其上创建分布式表。该插件不管理集群创建。
- 当 dbt 在数据库中创建关系(表/视图)时,通常会创建为:
{{ database }}.{{ schema }}.{{ table/view id }}
。ClickHouse 对 schema 的理解是有限的。因此,插件使用{{schema}}.{{ table/view id }}
,其中schema
是 ClickHouse 数据库。
进一步信息
之前的指南仅触及 dbt 功能的表面。建议用户阅读优秀的 dbt 文档。
关于插件的其他配置描述在 这里。
Fivetran
dbt-clickhouse
连接器也可用于 Fivetran 转换,允许直接在 Fivetran 平台内使用 dbt
进行无缝集成和转换能力。