跳到主要内容
跳到主要内容

集成 dbt 和 ClickHouse

ClickHouse Supported

dbt (数据构建工具) 使分析工程师能够仅通过编写选择语句来转换数据仓库中的数据。 dbt 负责将这些选择语句物化为数据库中的表和视图对象 - 执行 提取、加载和转换 (ELT) 的 T。 用户可以创建由 SELECT 语句定义的模型。

在 dbt 中,这些模型可以进行交叉引用和分层,以允许构建更高级的概念。 连接模型所需的样板 SQL 会自动生成。 此外,dbt 能够识别模型之间的依赖关系,并确保按照适当的顺序创建它们,使用有向无环图 (DAG)。

dbt 通过 ClickHouse 支持的插件 与 ClickHouse 兼容。 我们通过一个基于公开可用的 IMDB 数据集的简单示例描述连接 ClickHouse 的过程。 我们还强调了当前连接器的一些限制。

概念

dbt 引入了模型的概念。 这被定义为一个 SQL 语句,可能连接多个表。 模型可以以多种方式“物化”。 物化代表模型选择查询的构建策略。 物化背后的代码是包装您的 SELECT 查询的样板 SQL,以便创建一个新关系或更新一个现有关系。

dbt 提供 4 种类型的物化:

  • 视图 (默认):模型在数据库中构建为视图。
  • :模型在数据库中构建为表。
  • 临时:模型不是直接在数据库中构建,而是作为公共表表达式拉入依赖模型中。
  • 增量:模型初步物化为表,在后续运行中,dbt 向表中插入新行并更新已更改的行。

附加的语法和子句定义当其基础数据更改时如何更新这些模型。 dbt 通常建议在性能成为问题之前从视图物化开始。 表物化通过将模型查询的结果捕获为表提供了查询时的性能改进,但代价是增加存储。 增量方法进一步在此基础上构建,允许捕获对基础数据的后续更新,以反映在目标表中。

当前的插件 ClickHouse 支持的插件 支持 视图临时增量 物化。 该插件还支持 dbt 快照种子,我们将在本指南中探讨这些内容。

对于接下来的指南,我们假设您有一个可用的 ClickHouse 实例。

dbt 和 ClickHouse 插件的设置

dbt

我们假设在以下示例中使用 dbt CLI。 用户可能还希望考虑 dbt Cloud,它提供基于 Web 的集成开发环境 (IDE),允许用户编辑和运行项目。

dbt 提供多种 CLI 安装选项。 请按照 这里 中描述的说明进行操作。 此时仅安装 dbt-core。 我们推荐使用 pip

重要:以下内容在 Python 3.9 下进行了测试。

ClickHouse 插件

安装 dbt ClickHouse 插件:

准备 ClickHouse

当建模高度关系数据时,dbt 表现出色。 为了示例,我们提供一个小的 IMDB 数据集,具有以下关系模式。 该数据集源自 关系数据集仓库。 相较于常用的 dbt 模式,这显得微不足道,但代表了一个可管理的样本。

我们使用这些表的一部分,如下所示。

创建以下表:

备注

roles 的列 created_at,默认为值 now()。 我们稍后使用它来识别模型的增量更新 - 请参见 增量模型

我们使用 s3 函数从公共端点读取源数据以插入数据。 运行以下命令以填充表:

这些执行时间可能因带宽而异,但每个命令应该只需几秒即可完成。 执行以下查询以计算每位演员的摘要,按电影出场次数降序排列,并确认数据成功加载:

响应应该如下所示:

在后面的指南中,我们将把这个查询转换为一个模型 - 在 ClickHouse 中物化为一个 dbt 视图和表。

连接到 ClickHouse

  1. 创建一个 dbt 项目。在这种情况下,我们以 imdb 源命名。 在提示时,选择 clickhouse 作为数据库源。
  1. cd 到您的项目文件夹:
  1. 此时,您将需要所选的文本编辑器。在下面的示例中,我们使用流行的 VS Code。 打开 IMDB 目录,您应该会看到一组 yml 和 sql 文件:

  2. 更新您的 dbt_project.yml 文件,以指定我们的第一个模型 - actor_summary,并将配置文件设置为 clickhouse_imdb

  3. 接下来,我们需要向 dbt 提供 ClickHouse 实例的连接详细信息。 将以下内容添加到您的 ~/.dbt/profiles.yml

请注意需要修改用户和密码。 有关其他可用设置的文档,请参见 这里

  1. 从 IMDB 目录,执行 dbt debug 命令以确认 dbt 是否能够连接到 ClickHouse。

确认响应包含 Connection test: [OK connection ok],表示连接成功。

创建简单的视图物化

在使用视图物化时,模型在每次运行时都会通过 ClickHouse 中的 CREATE VIEW AS 语句重新构建。这不需要额外的数据存储,但查询的速度会比表物化慢。

  1. imdb 文件夹中删除目录 models/example
  1. models 文件夹内的 actors 中创建一个新文件。 在这里,我们创建每个代表演员模型的文件:
  1. models/actors 文件夹中创建 schema.ymlactor_summary.sql 文件。

文件 schema.yml 定义我们的表。 这些将在宏中随后可用。 编辑 models/actors/schema.yml 以包含以下内容:

actors_summary.sql 定义我们的实际模型。 请注意,在配置函数中,我们还请求该模型在 ClickHouse 中作为视图物化。 我们的表通过函数 sourceschema.yml 文件中引用,例如 source('imdb', 'movies') 指的是 imdb 数据库中的 movies 表。 编辑 models/actors/actors_summary.sql 以包含以下内容:

请注意我们如何在最终的 actor_summary 中包含列 updated_at。 我们稍后将用其进行增量物化。

  1. imdb 目录中执行命令 dbt run
  1. dbt 将按照请求将模型表示为 ClickHouse 中的视图。 我们现在可以直接查询这个视图。 该视图将已在 imdb_dbt 数据库中创建 - 这是由文件 ~/.dbt/profiles.ymlclickhouse_imdb 配置文件下的 schema 参数确定的。

查询此视图,我们可以用更简单的语法复制早先查询的结果:

创建表物化

在上一个示例中,我们的模型以视图的形式物化。 虽然这可能对某些查询提供了足够的性能,但更复杂的 SELECT 或频繁执行的查询可能更适合物化为表。 这种物化对于将由 BI 工具查询的模型非常有用,以确保用户获得更快的体验。 这有效地导致查询结果存储为新表,伴随相关的存储开销 - 实际上执行了 INSERT TO SELECT。 请注意,此表将在每次构建时重新构建,即它并不是增量的。因此,较大的结果集可能会导致长执行时间 - 请参见 dbt 限制

  1. 修改文件 actors_summary.sql,使 materialized 参数设置为 table。 注意如何定义 ORDER BY,并注意我们使用 MergeTree 表引擎:
  1. imdb 目录执行命令 dbt run。 此执行可能需要稍长时间 - 在大多数机器上约 10 秒。
  1. 确认创建了表 imdb_dbt.actor_summary

您应该看到具有适当数据类型的表:

  1. 确认这个表的结果与之前的响应一致。 现在模型是表格,响应时间显著改善:

随意对该模型发出其他查询。 例如,哪些演员的电影排名最高并且出场次数超过 5 次?

创建增量物化

上一个示例创建了一个表以物化模型。 该表将在每个 dbt 执行中重新构建。 对于较大的结果集或复杂转换,这可能不可行且极其昂贵。 为了解决这个问题并减少构建时间,dbt 提供增量物化。 这允许 dbt 自上次执行以来将记录插入或更新到表中,使其适用于事件样式数据。 在后台,会创建一个临时表以包含所有已更新记录,然后将所有未修改的记录以及更新的记录插入到新的目标表中。 这给大型结果集带来了与表模型类似的 限制

为克服大型集的这些限制,插件支持“insert_only”模式,其中所有更新直接插入目标表,而不会创建临时表(稍后详细介绍)。

为了说明这个例子,我们将添加演员 "Clicky McClickHouse",他将在令人难以置信的 910 部电影中亮相 - 确保他出现在的电影数量甚至超过了 Mel Blanc

  1. 首先,我们将模型修改为增量类型。 这个添加要求:

    1. unique_key - 为了确保插件能够唯一标识行,我们必须提供一个 unique_key - 在这种情况下,我们查询中的 id 字段就足够了。 这确保我们在物化表中不会有行重复。 有关独特性约束的更多详细信息,请参见 这里
    2. 增量过滤 - 我们还需要告诉 dbt 如何在增量运行中识别哪些行发生了变化。 这通过提供一个增量表达式来实现。 通常,这涉及到事件数据的时间戳;因此我们使用 updated_at 时间戳字段。 该列在插入行时默认为 now() 的值,允许识别新角色。此外,我们还需要识别新增演员的替代情况。 使用 {{this}} 变量,表示现有物化表,通过表达式 where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}}),我们可以给出这一条件。 我们将其嵌入到 {% if is_incremental() %} 条件内,确保它仅在增量运行时使用,而不是在首次构建表时使用。 有关增量模型行过滤的更多详细信息,请参阅 dbt 文档中的此讨论

    更新文件 actor_summary.sql 如下:

请注意,我们的模型将仅响应对 rolesactors 表的更新和添加。 为了响应所有表,建议用户将该模型拆分为多个子模型 - 每个模型都有自己的增量标准。 这些模型可以再相互引用和连接。 有关交叉引用模型的更多详细信息,请参见 这里

  1. 执行 dbt run 并确认结果表的结果:
  1. 现在我们将向模型添加数据以说明增量更新。 将我们的演员 "Clicky McClickHouse" 添加到 actors 表:
  1. 让我们让 "Clicky" 在 910 部随机电影中饰演角色:
  1. 确认他确实现在是出场次数最多的演员,方法是查询底层源表,并绕过任何 dbt 模型:
  1. 执行 dbt run,确认我们的模型已更新,并与上述结果匹配:

内部细节

我们可以通过查询 ClickHouse 的查询日志来识别为实现上述增量更新而执行的语句。

根据执行时期调整上述查询。 我们将结果检查留给用户,但强调插件用于执行增量更新的一般策略:

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。 更改的行被流入此表。
  2. 创建一个新表 actor_summary_new。 旧表的行反过来会流入新表,并进行检查,以确保行 ID 不存在于临时表。这有效地处理了更新和重复。
  3. 临时表中的结果流入新的 actor_summary 表:
  4. 最后,通过 EXCHANGE TABLES 语句原子地将新表与旧版本进行交换。 旧表和临时表相应被丢弃。

这可视化如下:

这个策略在非常大的模型上可能会遇到挑战。 有关更多详细信息,请参见 限制

追加策略 (insert-only 模式)

为了克服增量模型中大型数据集的限制,插件使用 dbt 配置参数 incremental_strategy。 这可以设置为 append 的值。 设置后,更新的行直接插入目标表 (即 imdb_dbt.actor_summary) 中,而不创建临时表。 注意:仅追加模式要求您的数据是不可变的或接受重复行。 如果您希望增量表模型支持更改的行,则不要使用此模式!

为了说明这一模式,我们将添加另一位新演员并重新执行带有 incremental_strategy='append' 的 dbt run。

  1. 在 actor_summary.sql 中配置仅追加模式:
  1. 我们再添加一位著名演员 - Danny DeBito
  1. 让我们让 Danny 在 920 部随机电影中饰演角色。
  1. 执行 dbt run 并确认 Danny 已添加到 actor-summary 表中。

请注意,增量更新的速度要比插入 "Clicky" 快得多。

再次检查查询_log 表以揭示 2 次增量运行之间的差异:

在此运行中,仅将新行添加到 imdb_dbt.actor_summary 表中,并且没有涉及表创建。

删除+插入模式 (实验性)

历史上,ClickHouse 对更新和删除的支持有限,采用异步 Mutations 的形式。这些操作可能非常耗费 IO,通常应该避免。

ClickHouse 22.8 引入了 轻量级删除。 这些目前仍处于实验阶段,但提供了一种更高效的删除数据的方式。

可以通过 incremental_strategy 参数为模型配置此模式,即:

该策略直接在目标模型的表上运行,因此如果在操作期间出现问题,则增量模型中的数据可能处于无效状态 - 并不进行原子更新。

总之,这种方法:

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。 更改的行被流入此表。
  2. 对当前 actor_summary 表执行 DELETE。 通过 id 从 actor_sumary__dbt_tmp 删除行。
  3. 使用 INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmp 将行从 actor_sumary__dbt_tmp 插入 actor_summary

此过程如下所示:

insert_overwrite 模式 (实验性)

执行以下步骤:

  1. 创建一个具有与增量模型关系相同结构的临时 (暂存) 表: CREATE TABLE {staging} AS {target}
  2. 仅将新记录(由 SELECT 生成)插入到暂存表中。
  3. 将仅新分区(存在于暂存表中)替换到目标表中。

这种方法具有以下优点:

  • 它比默认策略更快,因为它不会复制整个表。
  • 它比其他策略更安全,因为它在 INSERT 操作成功完成之前不会修改原始表:在中间出现失败的情况下,原始表不被修改。
  • 它实现了数据工程最佳实践中的“分区不可变性”。 这简化了增量和并行数据处理、回滚等。

创建快照

dbt 快照允许记录对可变模型的更改。 这反过来允许对模型进行时间点查询,分析师可以“回顾过去”,查看模型的先前状态。 这通过 类型2 缓慢变化维度 来实现,其中从日期到日期列记录行的有效时间。 此功能由 ClickHouse 插件支持,下面进行了演示。

此示例假定您已完成 创建增量表模型。 请确保您的 actor_summary.sql 不设置 inserts_only=True。 您的 models/actor_summary.sql 应如下所示:

  1. 在快照目录中创建一个文件 actor_summary
  1. 用以下内容更新 actor_summary.sql 文件:

有关此内容的一些观察:

  • 选择查询定义您希望随时间快照的结果。 函数 ref 用于引用我们之前创建的 actor_summary 模型。
  • 我们需要一个时间戳列来指示记录更改。 我们的 updated_at 列(参见 创建增量表模型)可以在这里使用。 参数 strategy 指示我们使用时间戳来表示更新,参数 updated_at 指定要使用的列。如果在模型中没有此列,您可以改为使用 检查策略。 这显著低效,并要求用户指定要比较的列的列表。 dbt 比较这些列的当前值和历史值,记录任何更改(或在相同的情况下不采取任何行动)。
  1. 运行命令 dbt snapshot

注意,已在快照数据库中创建表 actor_summary_snapshot(由 target_schema 参数确定)。

  1. 取样此数据,您将看到 dbt 已包含列 dbt_valid_from 和 dbt_valid_to。 后者的值被设置为 null。 后续运行将更新此内容。
  1. 让我们最喜欢的演员 Clicky McClickHouse 再拍 10 部电影。
  1. imdb 目录重新运行 dbt run 命令。 这将更新增量模型。 一旦完成,运行 dbt snapshot 以捕获更改。
  1. 如果我们现在查询快照,可以注意到 Clicky McClickHouse 有 2 行。 我们之前的条目现在具有 dbt_valid_to 值。 我们的新值在 dbt_valid_from 列中记录了相同的值,并且 dbt_valid_to 的值为 null。 如果我们确实有新行,这些也会被附加到快照中。

有关 dbt 快照的更多详细信息,请参见 这里

使用种子

dbt 提供从 CSV 文件加载数据的能力。 这个功能不适合加载数据库的大型导出,更适合用于典型用于代码表和 字典 的小文件,例如,将国家代码映射到国家名称。 作为简单示例,我们生成然后上传一个流派代码列表,使用种子功能。

  1. 我们从现有数据集中生成一个流派代码列表。 从 dbt 目录中,使用 clickhouse-client 创建文件 seeds/genre_codes.csv
  1. 执行 dbt seed 命令。 这将在我们的数据库 imdb_dbt 中创建一个新表 genre_codes(由我们的 schema 配置定义),并填充来自 CSV 文件的行。
  1. 确认这些行已被加载:

限制

当前的 ClickHouse dbt 插件有几个用户应该注意的限制:

  1. 该插件当前使用 INSERT TO SELECT 将模型物化为表。 这实际上意味着数据重复。 非常大的数据集(PB)可能导致极长的运行时间,使某些模型无法使用。 力求最小化任何查询返回的行数,尽可能使用 GROUP BY。 优先选择总结数据的模型,而不是执行转换的模型,同时保持源的行数。
  2. 要使用分布式表表示模型,用户必须手动在每个节点上创建基础副本表。 分布式表可以在这些表之上创建。 该插件不管理集群创建。
  3. 当 dbt 在数据库中创建关系(表/视图)时,它通常以 {{ database }}.{{ schema }}.{{ table/view id }} 的形式创建。 ClickHouse 对架构没有概念。 因此,该插件使用 {{schema}}.{{ table/view id }},其中 schema 是 ClickHouse 数据库。

进一步信息

前面的指南只是触及了 dbt 功能的表面。 建议用户阅读优秀的 dbt 文档

关于插件的其他配置,在 这里 中进行了描述。

Fivetran

dbt-clickhouse 连接器也可用于 Fivetran 转换,允许在 Fivetran 平台中直接实现无缝集成和转换能力。