集成 dbt 和 ClickHouse
dbt (数据构建工具) 使分析工程师能够仅通过编写选择语句来转换数据仓库中的数据。 dbt 负责将这些选择语句物化为数据库中的表和视图对象 - 执行 提取、加载和转换 (ELT) 的 T。 用户可以创建由 SELECT 语句定义的模型。
在 dbt 中,这些模型可以进行交叉引用和分层,以允许构建更高级的概念。 连接模型所需的样板 SQL 会自动生成。 此外,dbt 能够识别模型之间的依赖关系,并确保按照适当的顺序创建它们,使用有向无环图 (DAG)。
dbt 通过 ClickHouse 支持的插件 与 ClickHouse 兼容。 我们通过一个基于公开可用的 IMDB 数据集的简单示例描述连接 ClickHouse 的过程。 我们还强调了当前连接器的一些限制。
概念
dbt 引入了模型的概念。 这被定义为一个 SQL 语句,可能连接多个表。 模型可以以多种方式“物化”。 物化代表模型选择查询的构建策略。 物化背后的代码是包装您的 SELECT 查询的样板 SQL,以便创建一个新关系或更新一个现有关系。
dbt 提供 4 种类型的物化:
- 视图 (默认):模型在数据库中构建为视图。
- 表:模型在数据库中构建为表。
- 临时:模型不是直接在数据库中构建,而是作为公共表表达式拉入依赖模型中。
- 增量:模型初步物化为表,在后续运行中,dbt 向表中插入新行并更新已更改的行。
附加的语法和子句定义当其基础数据更改时如何更新这些模型。 dbt 通常建议在性能成为问题之前从视图物化开始。 表物化通过将模型查询的结果捕获为表提供了查询时的性能改进,但代价是增加存储。 增量方法进一步在此基础上构建,允许捕获对基础数据的后续更新,以反映在目标表中。
当前的插件 ClickHouse 支持的插件 支持 视图、表、临时 和 增量 物化。 该插件还支持 dbt 快照 和 种子,我们将在本指南中探讨这些内容。
对于接下来的指南,我们假设您有一个可用的 ClickHouse 实例。
dbt 和 ClickHouse 插件的设置
dbt
我们假设在以下示例中使用 dbt CLI。 用户可能还希望考虑 dbt Cloud,它提供基于 Web 的集成开发环境 (IDE),允许用户编辑和运行项目。
dbt 提供多种 CLI 安装选项。 请按照 这里 中描述的说明进行操作。 此时仅安装 dbt-core。 我们推荐使用 pip
。
重要:以下内容在 Python 3.9 下进行了测试。
ClickHouse 插件
安装 dbt ClickHouse 插件:
准备 ClickHouse
当建模高度关系数据时,dbt 表现出色。 为了示例,我们提供一个小的 IMDB 数据集,具有以下关系模式。 该数据集源自 关系数据集仓库。 相较于常用的 dbt 模式,这显得微不足道,但代表了一个可管理的样本。

我们使用这些表的一部分,如下所示。
创建以下表:
表 roles
的列 created_at
,默认为值 now()
。 我们稍后使用它来识别模型的增量更新 - 请参见 增量模型。
我们使用 s3
函数从公共端点读取源数据以插入数据。 运行以下命令以填充表:
这些执行时间可能因带宽而异,但每个命令应该只需几秒即可完成。 执行以下查询以计算每位演员的摘要,按电影出场次数降序排列,并确认数据成功加载:
响应应该如下所示:
在后面的指南中,我们将把这个查询转换为一个模型 - 在 ClickHouse 中物化为一个 dbt 视图和表。
连接到 ClickHouse
- 创建一个 dbt 项目。在这种情况下,我们以
imdb
源命名。 在提示时,选择clickhouse
作为数据库源。
cd
到您的项目文件夹:
-
此时,您将需要所选的文本编辑器。在下面的示例中,我们使用流行的 VS Code。 打开 IMDB 目录,您应该会看到一组 yml 和 sql 文件:
-
更新您的
dbt_project.yml
文件,以指定我们的第一个模型 -actor_summary
,并将配置文件设置为clickhouse_imdb
。 -
接下来,我们需要向 dbt 提供 ClickHouse 实例的连接详细信息。 将以下内容添加到您的
~/.dbt/profiles.yml
。
请注意需要修改用户和密码。 有关其他可用设置的文档,请参见 这里。
- 从 IMDB 目录,执行
dbt debug
命令以确认 dbt 是否能够连接到 ClickHouse。
确认响应包含 Connection test: [OK connection ok]
,表示连接成功。
创建简单的视图物化
在使用视图物化时,模型在每次运行时都会通过 ClickHouse 中的 CREATE VIEW AS
语句重新构建。这不需要额外的数据存储,但查询的速度会比表物化慢。
- 从
imdb
文件夹中删除目录models/example
:
- 在
models
文件夹内的actors
中创建一个新文件。 在这里,我们创建每个代表演员模型的文件:
- 在
models/actors
文件夹中创建schema.yml
和actor_summary.sql
文件。
文件 schema.yml
定义我们的表。 这些将在宏中随后可用。 编辑 models/actors/schema.yml
以包含以下内容:
actors_summary.sql
定义我们的实际模型。 请注意,在配置函数中,我们还请求该模型在 ClickHouse 中作为视图物化。 我们的表通过函数 source
从 schema.yml
文件中引用,例如 source('imdb', 'movies')
指的是 imdb
数据库中的 movies
表。 编辑 models/actors/actors_summary.sql
以包含以下内容:
请注意我们如何在最终的 actor_summary 中包含列 updated_at
。 我们稍后将用其进行增量物化。
- 从
imdb
目录中执行命令dbt run
。
- dbt 将按照请求将模型表示为 ClickHouse 中的视图。 我们现在可以直接查询这个视图。 该视图将已在
imdb_dbt
数据库中创建 - 这是由文件~/.dbt/profiles.yml
中clickhouse_imdb
配置文件下的 schema 参数确定的。
查询此视图,我们可以用更简单的语法复制早先查询的结果:
创建表物化
在上一个示例中,我们的模型以视图的形式物化。 虽然这可能对某些查询提供了足够的性能,但更复杂的 SELECT 或频繁执行的查询可能更适合物化为表。 这种物化对于将由 BI 工具查询的模型非常有用,以确保用户获得更快的体验。 这有效地导致查询结果存储为新表,伴随相关的存储开销 - 实际上执行了 INSERT TO SELECT
。 请注意,此表将在每次构建时重新构建,即它并不是增量的。因此,较大的结果集可能会导致长执行时间 - 请参见 dbt 限制。
- 修改文件
actors_summary.sql
,使materialized
参数设置为table
。 注意如何定义ORDER BY
,并注意我们使用MergeTree
表引擎:
- 从
imdb
目录执行命令dbt run
。 此执行可能需要稍长时间 - 在大多数机器上约 10 秒。
- 确认创建了表
imdb_dbt.actor_summary
:
您应该看到具有适当数据类型的表:
- 确认这个表的结果与之前的响应一致。 现在模型是表格,响应时间显著改善:
随意对该模型发出其他查询。 例如,哪些演员的电影排名最高并且出场次数超过 5 次?
创建增量物化
上一个示例创建了一个表以物化模型。 该表将在每个 dbt 执行中重新构建。 对于较大的结果集或复杂转换,这可能不可行且极其昂贵。 为了解决这个问题并减少构建时间,dbt 提供增量物化。 这允许 dbt 自上次执行以来将记录插入或更新到表中,使其适用于事件样式数据。 在后台,会创建一个临时表以包含所有已更新记录,然后将所有未修改的记录以及更新的记录插入到新的目标表中。 这给大型结果集带来了与表模型类似的 限制。
为克服大型集的这些限制,插件支持“insert_only”模式,其中所有更新直接插入目标表,而不会创建临时表(稍后详细介绍)。
为了说明这个例子,我们将添加演员 "Clicky McClickHouse",他将在令人难以置信的 910 部电影中亮相 - 确保他出现在的电影数量甚至超过了 Mel Blanc。
-
首先,我们将模型修改为增量类型。 这个添加要求:
- unique_key - 为了确保插件能够唯一标识行,我们必须提供一个 unique_key - 在这种情况下,我们查询中的
id
字段就足够了。 这确保我们在物化表中不会有行重复。 有关独特性约束的更多详细信息,请参见 这里。 - 增量过滤 - 我们还需要告诉 dbt 如何在增量运行中识别哪些行发生了变化。 这通过提供一个增量表达式来实现。 通常,这涉及到事件数据的时间戳;因此我们使用
updated_at
时间戳字段。 该列在插入行时默认为now()
的值,允许识别新角色。此外,我们还需要识别新增演员的替代情况。 使用{{this}}
变量,表示现有物化表,通过表达式where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})
,我们可以给出这一条件。 我们将其嵌入到{% if is_incremental() %}
条件内,确保它仅在增量运行时使用,而不是在首次构建表时使用。 有关增量模型行过滤的更多详细信息,请参阅 dbt 文档中的此讨论。
更新文件
actor_summary.sql
如下: - unique_key - 为了确保插件能够唯一标识行,我们必须提供一个 unique_key - 在这种情况下,我们查询中的
请注意,我们的模型将仅响应对 roles
和 actors
表的更新和添加。 为了响应所有表,建议用户将该模型拆分为多个子模型 - 每个模型都有自己的增量标准。 这些模型可以再相互引用和连接。 有关交叉引用模型的更多详细信息,请参见 这里。
- 执行
dbt run
并确认结果表的结果:
- 现在我们将向模型添加数据以说明增量更新。 将我们的演员 "Clicky McClickHouse" 添加到
actors
表:
- 让我们让 "Clicky" 在 910 部随机电影中饰演角色:
- 确认他确实现在是出场次数最多的演员,方法是查询底层源表,并绕过任何 dbt 模型:
- 执行
dbt run
,确认我们的模型已更新,并与上述结果匹配:
内部细节
我们可以通过查询 ClickHouse 的查询日志来识别为实现上述增量更新而执行的语句。
根据执行时期调整上述查询。 我们将结果检查留给用户,但强调插件用于执行增量更新的一般策略:
- 插件创建一个临时表
actor_sumary__dbt_tmp
。 更改的行被流入此表。 - 创建一个新表
actor_summary_new
。 旧表的行反过来会流入新表,并进行检查,以确保行 ID 不存在于临时表。这有效地处理了更新和重复。 - 临时表中的结果流入新的
actor_summary
表: - 最后,通过
EXCHANGE TABLES
语句原子地将新表与旧版本进行交换。 旧表和临时表相应被丢弃。
这可视化如下:

这个策略在非常大的模型上可能会遇到挑战。 有关更多详细信息,请参见 限制。
追加策略 (insert-only 模式)
为了克服增量模型中大型数据集的限制,插件使用 dbt 配置参数 incremental_strategy
。 这可以设置为 append
的值。 设置后,更新的行直接插入目标表 (即 imdb_dbt.actor_summary
) 中,而不创建临时表。
注意:仅追加模式要求您的数据是不可变的或接受重复行。 如果您希望增量表模型支持更改的行,则不要使用此模式!
为了说明这一模式,我们将添加另一位新演员并重新执行带有 incremental_strategy='append'
的 dbt run。
- 在 actor_summary.sql 中配置仅追加模式:
- 我们再添加一位著名演员 - Danny DeBito
- 让我们让 Danny 在 920 部随机电影中饰演角色。
- 执行
dbt run
并确认 Danny 已添加到 actor-summary 表中。
请注意,增量更新的速度要比插入 "Clicky" 快得多。
再次检查查询_log 表以揭示 2 次增量运行之间的差异:
在此运行中,仅将新行添加到 imdb_dbt.actor_summary
表中,并且没有涉及表创建。
删除+插入模式 (实验性)
历史上,ClickHouse 对更新和删除的支持有限,采用异步 Mutations 的形式。这些操作可能非常耗费 IO,通常应该避免。
ClickHouse 22.8 引入了 轻量级删除。 这些目前仍处于实验阶段,但提供了一种更高效的删除数据的方式。
可以通过 incremental_strategy
参数为模型配置此模式,即:
该策略直接在目标模型的表上运行,因此如果在操作期间出现问题,则增量模型中的数据可能处于无效状态 - 并不进行原子更新。
总之,这种方法:
- 插件创建一个临时表
actor_sumary__dbt_tmp
。 更改的行被流入此表。 - 对当前
actor_summary
表执行DELETE
。 通过 id 从actor_sumary__dbt_tmp
删除行。 - 使用
INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmp
将行从actor_sumary__dbt_tmp
插入actor_summary
。
此过程如下所示:

insert_overwrite 模式 (实验性)
执行以下步骤:
- 创建一个具有与增量模型关系相同结构的临时 (暂存) 表:
CREATE TABLE {staging} AS {target}
。 - 仅将新记录(由 SELECT 生成)插入到暂存表中。
- 将仅新分区(存在于暂存表中)替换到目标表中。
这种方法具有以下优点:
- 它比默认策略更快,因为它不会复制整个表。
- 它比其他策略更安全,因为它在 INSERT 操作成功完成之前不会修改原始表:在中间出现失败的情况下,原始表不被修改。
- 它实现了数据工程最佳实践中的“分区不可变性”。 这简化了增量和并行数据处理、回滚等。

创建快照
dbt 快照允许记录对可变模型的更改。 这反过来允许对模型进行时间点查询,分析师可以“回顾过去”,查看模型的先前状态。 这通过 类型2 缓慢变化维度 来实现,其中从日期到日期列记录行的有效时间。 此功能由 ClickHouse 插件支持,下面进行了演示。
此示例假定您已完成 创建增量表模型。 请确保您的 actor_summary.sql 不设置 inserts_only=True。 您的 models/actor_summary.sql 应如下所示:
- 在快照目录中创建一个文件
actor_summary
。
- 用以下内容更新 actor_summary.sql 文件:
有关此内容的一些观察:
- 选择查询定义您希望随时间快照的结果。 函数 ref 用于引用我们之前创建的 actor_summary 模型。
- 我们需要一个时间戳列来指示记录更改。 我们的 updated_at 列(参见 创建增量表模型)可以在这里使用。 参数 strategy 指示我们使用时间戳来表示更新,参数 updated_at 指定要使用的列。如果在模型中没有此列,您可以改为使用 检查策略。 这显著低效,并要求用户指定要比较的列的列表。 dbt 比较这些列的当前值和历史值,记录任何更改(或在相同的情况下不采取任何行动)。
- 运行命令
dbt snapshot
。
注意,已在快照数据库中创建表 actor_summary_snapshot(由 target_schema 参数确定)。
- 取样此数据,您将看到 dbt 已包含列 dbt_valid_from 和 dbt_valid_to。 后者的值被设置为 null。 后续运行将更新此内容。
- 让我们最喜欢的演员 Clicky McClickHouse 再拍 10 部电影。
- 从
imdb
目录重新运行 dbt run 命令。 这将更新增量模型。 一旦完成,运行 dbt snapshot 以捕获更改。
- 如果我们现在查询快照,可以注意到 Clicky McClickHouse 有 2 行。 我们之前的条目现在具有 dbt_valid_to 值。 我们的新值在 dbt_valid_from 列中记录了相同的值,并且 dbt_valid_to 的值为 null。 如果我们确实有新行,这些也会被附加到快照中。
有关 dbt 快照的更多详细信息,请参见 这里。
使用种子
dbt 提供从 CSV 文件加载数据的能力。 这个功能不适合加载数据库的大型导出,更适合用于典型用于代码表和 字典 的小文件,例如,将国家代码映射到国家名称。 作为简单示例,我们生成然后上传一个流派代码列表,使用种子功能。
- 我们从现有数据集中生成一个流派代码列表。 从 dbt 目录中,使用
clickhouse-client
创建文件seeds/genre_codes.csv
:
- 执行
dbt seed
命令。 这将在我们的数据库imdb_dbt
中创建一个新表genre_codes
(由我们的 schema 配置定义),并填充来自 CSV 文件的行。
- 确认这些行已被加载:
限制
当前的 ClickHouse dbt 插件有几个用户应该注意的限制:
- 该插件当前使用
INSERT TO SELECT
将模型物化为表。 这实际上意味着数据重复。 非常大的数据集(PB)可能导致极长的运行时间,使某些模型无法使用。 力求最小化任何查询返回的行数,尽可能使用 GROUP BY。 优先选择总结数据的模型,而不是执行转换的模型,同时保持源的行数。 - 要使用分布式表表示模型,用户必须手动在每个节点上创建基础副本表。 分布式表可以在这些表之上创建。 该插件不管理集群创建。
- 当 dbt 在数据库中创建关系(表/视图)时,它通常以
{{ database }}.{{ schema }}.{{ table/view id }}
的形式创建。 ClickHouse 对架构没有概念。 因此,该插件使用{{schema}}.{{ table/view id }}
,其中schema
是 ClickHouse 数据库。
进一步信息
前面的指南只是触及了 dbt 功能的表面。 建议用户阅读优秀的 dbt 文档。
关于插件的其他配置,在 这里 中进行了描述。
Fivetran
dbt-clickhouse
连接器也可用于 Fivetran 转换,允许在 Fivetran 平台中直接实现无缝集成和转换能力。
相关内容
- 博客和网络研讨会:ClickHouse 和 dbt - 来自社区的礼物