跳到主要内容
跳到主要内容

集成 dbt 和 ClickHouse

dbt (数据构建工具)使数据分析工程师能够通过简单编写选择语句来在其数据仓库中转换数据。 dbt 负责将这些选择语句物化为数据库中的对象,以表和视图的形式存在 - 执行 提取加载和转换 (ELT) 中的 T。用户可以创建由 SELECT 语句定义的模型。

在 dbt 中,这些模型可以相互引用,并分层构建更高层次的概念。连接模型所需的模板 SQL 会自动生成。此外,dbt 识别模型之间的依赖关系,并使用有向无环图(DAG)确保它们按适当顺序创建。

通过 ClickHouse 支持的插件,dbt 与 ClickHouse 兼容。我们用一个基于公开的 IMDB 数据集的简单示例描述连接 ClickHouse 的过程。同时,我们还强调当前连接器的一些限制。

概念

dbt 引入了模型的概念。这被定义为一个 SQL 语句,可能连接多个表。模型可以以多种方式“物化”。物化表示模型选择查询的构建策略。物化背后的代码是模板 SQL,它将您的 SELECT 查询包装在一个语句中,以创建一个新的或更新现有的关系。

dbt 提供 4 种类型的物化:

  • 视图(默认):模型以数据库中的视图构建。
  • :模型以数据库中的表构建。
  • 暂时:模型不是直接在数据库中构建,而是以公共表表达式的形式拉入依赖模型。
  • 增量:模型最初物化为表,在后续运行中,dbt 在表中插入新行并更新已更改的行。

附加的语法和子句定义了如果底层数据发生更改,这些模型应如何更新。一般来说,dbt 建议在性能成为问题之前从视图物化开始。表物化通过将模型查询的结果捕获为表提供查询时间性能提升,但以增加存储为代价。增量方法在此基础上进一步构建,以便在底层数据后续更新时能被捕获到目标表中。

目前的 插件 支持 ClickHouse 的 视图暂时增量 物化。插件还支持 dbt 的 快照种子,我们将在本指南中探讨这些内容。

在接下来的指南中,我们假设您可以使用 ClickHouse 实例。

dbt 和 ClickHouse 插件的设置

dbt

我们假设在以下示例中使用 dbt CLI。用户也可以考虑 dbt Cloud,该服务提供基于网络的集成开发环境(IDE),允许用户编辑和运行项目。

dbt 提供了一些 CLI 安装的选项。请按照此处描述的说明进行操作。在此阶段仅安装 dbt-core。我们建议使用 pip

重要:以下内容测试于 python 3.9。

ClickHouse 插件

安装 dbt ClickHouse 插件:

准备 ClickHouse

dbt 在建模高度关系型数据时表现出色。为了示范,我们提供一个小型的 IMDB 数据集,具有以下关系模式。该数据集来源于 关系型数据集库。与使用 dbt 的常见模式相比,这很简单,但代表了一个可管理的样本:

IMDB 表模式

我们使用这些表的一个子集,如下所示。

创建以下表:

备注

roles 的列 created_at 默认值为 now()。我们稍后将使用此列来识别模型的增量更新 - 请参阅 增量模型

我们使用 s3 函数从公共端点读取源数据以插入数据。运行以下命令以填充表:

这些操作的执行时间可能因带宽而异,但每个操作应仅需几秒钟即可完成。执行以下查询以计算每位演员的摘要,按电影出现次数降序排列,并确认数据已成功加载:

响应应如下所示:

在后续指南中,我们将把此查询转换为模型 - 以 dbt 视图和表的形式在 ClickHouse 中物化。

连接到 ClickHouse

  1. 创建 dbt 项目。在这种情况下,我们以我们的 imdb 源命名。当提示时,选择 clickhouse 作为数据库源。

  2. cd 进入您的项目文件夹:

  3. 在此时,您将需要您选择的文本编辑器。在下面的示例中,我们使用流行的 VS Code。打开 IMDB 目录,您应该会看到一组 yml 和 sql 文件:

    新 dbt 项目
  4. 更新您的 dbt_project.yml 文件,以指定我们的第一个模型 - actor_summary 并将配置设置为 clickhouse_imdb

    dbt 配置 dbt 配置
  5. 接下来,我们需要提供 dbt 连接我们的 ClickHouse 实例的详细信息。将以下内容添加到您的 ~/.dbt/profiles.yml 中。

    注意需要修改用户和密码。还有其他可用设置可以在此处找到。

  6. 从 IMDB 目录,执行 dbt debug 命令以确认 dbt 是否能够连接到 ClickHouse。

    确认响应中包含 Connection test: [OK connection ok],表明连接成功。

创建简单的视图物化

在使用视图物化时,模型在每次运行时通过 CREATE VIEW AS 语句在 ClickHouse 中重建。这不需要额外的数据存储,但相较于表物化,查询速度会更慢。

  1. imdb 文件夹内,删除目录 models/example

  2. models 文件夹内的 actors 中创建一个新的文件。在这里,我们为每个演员模型创建文件:

  3. models/actors 文件夹中创建 schema.ymlactor_summary.sql 文件。

    文件 schema.yml 定义了我们的表。这些后来将可在宏中使用。编辑 models/actors/schema.yml 以包含以下内容:

    actors_summary.sql 定义了我们的实际模型。在配置函数中,我们还请求将模型在 ClickHouse 中物化为视图。我们的表通过 source 函数从 schema.yml 文件中引用,例如 source('imdb', 'movies') 是指 imdb 数据库中的 movies 表。编辑 models/actors/actors_summary.sql 以包含以下内容:

    注意我们在最终的 actor_summary 中包含了列 updated_at。我们稍后将为增量物化使用此列。

  4. imdb 目录执行命令 dbt run

  5. dbt 会将模型呈现为 ClickHouse 中的视图,如请求的那样。我们现在可以直接查询该视图。该视图将在 imdb_dbt 数据库中创建 - 这由文件 ~/.dbt/profiles.ymlclickhouse_imdb 配置下的 schema 参数决定。

    查询该视图时,我们可以用更简单的语法重现之前查询的结果:

创建表物化

在上一个示例中,我们将模型物化为视图。虽然这可能对某些查询提供足够的性能,但更复杂的 SELECT 或频繁执行的查询可能更适合物化为表。这种物化对于将被 BI 工具查询的模型非常有用,以确保用户拥有更快的体验。实际上,这会导致查询结果作为一个新表存储,伴随着相关的存储开销 - 实际上,执行了 INSERT TO SELECT。请注意,这个表每次都会重建,即它不是增量的。大结果集可能会导致较长的执行时间 - 请参见 dbt 限制

  1. 修改文件 actors_summary.sql,将 materialized 参数设置为 table。注意 ORDER BY 的定义,以及我们使用 MergeTree 表引擎:

  2. imdb 目录执行命令 dbt run。此执行可能需要稍长的时间 - 大约在大多数机器上需要 10 秒。

  3. 确认表 imdb_dbt.actor_summary 是否已创建:

    您应该会看到具有适当数据类型的表:

  4. 确认此表的结果与之前的响应一致。注意,由于模型现在是表,响应时间有了明显提升:

    随意针对该模型发出其他查询。例如,哪些演员拥有超过 5 次出现的高评分电影?

创建增量物化视图

前面的例子创建了一个表来物化模型。这个表将在每次 dbt 执行时重建。对于较大的结果集或复杂的转换,这可能是不切实际且非常昂贵的。为了解决这一挑战并减少构建时间,dbt 提供了增量物化功能。这允许 dbt 从上一次执行以来将记录插入或更新到表中,使其适合事件风格的数据。在内部,创建了一个临时表以包含所有更新的记录,然后将所有未修改的记录以及更新的记录插入到一个新的目标表中。这导致大结果集与表模型类似的限制

为了克服这些针对大数据集的限制,该插件支持“inserts_only”模式,在该模式下,所有更新直接插入目标表,而无需创建临时表(下面会详细介绍)。

为说明这个例子,我们将添加演员“Clicky McClickHouse”,他将出现在令人惊讶的910部电影中——确保他出现在比Mel Blanc 更多的影片中。

  1. 首先,我们将我们的模型修改为增量类型。该添加要求:

    1. unique_key - 为确保插件能够唯一识别行,我们必须提供一个 unique_key - 在这种情况下,查询中的 id 字段就足够了。这确保我们的物化表中不会有行的重复。有关唯一性约束的更多细节,请见这里
    2. 增量过滤器 - 我们还需要告知 dbt 应如何识别在增量运行中哪些行已更改。这是通过提供 delta 表达式来实现的。通常这涉及事件数据的时间戳;因此,我们使用更新的时间戳字段 updated_at。当插入行时,该列的默认值为 now(),使新角色得以识别。此外,我们需要识别添加新演员的另一种情况。使用 {{this}} 变量来表示现有的物化表,得到的表达式为 where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}})。我们将其嵌入到 {% if is_incremental() %} 条件中,确保它仅在增量运行时使用,而不是在首次构建表时使用。有关增量模型的行过滤的更多详细信息,请参见dbt 文档中的此讨论

    按如下方式更新文件 actor_summary.sql

    请注意,我们的模型将仅响应对 rolesactors 表的更新和新增。要响应所有表,建议用户将该模型拆分为多个子模型 - 每个子模型具有自己增量标准。这些模型可以相互引用和连接。有关交叉引用模型的进一步细节,请参见这里

  2. 执行 dbt run 并确认结果表的结果:

  3. 现在我们将向模型添加数据以说明增量更新。将我们的演员“Clicky McClickHouse”添加到 actors 表:

  4. 让我们让“Clicky”在910部随机电影中出演:

  5. 确认他确实是出现次数最多的演员,通过查询底层源表并绕过任何 dbt 模型:

  6. 执行 dbt run 并确认我们的模型已被更新并与上述结果匹配:

内部实现

我们可以通过查询 ClickHouse 的查询日志来识别为实现上述增量更新而执行的语句。

调整以上查询以适应执行时间范围。我们将结果检查留给用户,但强调该插件用于执行增量更新的一般策略:

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。已更改的行流入此表。
  2. 创建一个新表 actor_summary_new。旧表的行则通过从旧表流向新表,确保行 ID 不存在于临时表中,从而有效处理更新和重复。
  3. 从临时表中获取的结果流入新的 actor_summary 表。
  4. 最后,通过 EXCHANGE TABLES 语句原子地替换新表与旧版本。旧表和临时表被依次删除。

如下图所示:

增量更新 dbt

此策略在非常大的模型上可能会遇到挑战。有关更多详细信息,请参见限制

附加策略(只插入模式)

为了克服增量模型中大数据集的限制,该插件使用 dbt 配置参数 incremental_strategy。可以将其设置为值 append。设置后,更新的行将直接插入目标表(即 imdb_dbt.actor_summary),并且不会创建临时表。 注意:仅附加模式要求您的数据是不可变的,或者可以接受重复。如果您想要一个支持更改行的增量表模型,则不要使用此模式!

为说明这种模式,我们将添加另一位新演员并使用 incremental_strategy='append' 重新执行 dbt run。

  1. 在 actor_summary.sql 中配置仅附加模式:

  2. 让我们添加另一位著名演员 - 丹尼·德维托

  3. 让我们让丹尼在920部随机电影中出演。

  4. 执行一个 dbt run 并确认丹尼已被添加到演员摘要表

注意,与 “Clicky” 的插入相比,这次的增量更新速度快了很多。

再次检查 query_log 表显示两个增量运行之间的差异:

在这次运行中,只有新行被直接添加到 imdb_dbt.actor_summary 表中,没有涉及表的创建。

删除+插入模式(实验性)

历史上,ClickHouse 对更新和删除的支持有限,形式为异步的Mutations。这些操作可能非常耗费 IO,通常应尽量避免。

ClickHouse 22.8 引入了轻量级删除。这些操作目前是实验性的,但提供了一种性能更好的数据删除方式。

此模式可以通过 incremental_strategy 参数为模型配置,例如:

该策略直接操作目标模型的表,因此如果在操作过程中出现问题,增量模型中的数据可能处于无效状态——没有原子的更新。

总之,该方法:

  1. 插件创建一个临时表 actor_sumary__dbt_tmp。已更改的行流入此表。
  2. 针对当前 actor_summary 表发出 DELETE。通过行 id 从 actor_sumary__dbt_tmp 中删除行。
  3. actor_sumary__dbt_tmp 中插入行到 actor_summary,使用 INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmp

此过程如下图所示:

轻量级删除增量

插入覆盖模式(实验性)

执行以下步骤:

  1. 创建一个与增量模型关系具有相同结构的暂存(临时)表:CREATE TABLE {staging} AS {target}
  2. 仅将新记录(通过 SELECT 生成)插入暂存表。
  3. 替换目标表中的新分区(在暂存表中存在的分区)。

这种方法具有以下优点:

  • 它比默认策略更快,因为它不复制整个表。
  • 它比其他策略更安全,因为在 INSERT 操作成功完成之前不会修改原始表:在中间失败的情况下,原始表未被修改。
  • 它实现了“分区不可变性”的数据工程最佳实践,简化了增量和并行数据处理、回滚等。
插入覆盖增量

创建快照

dbt 快照允许记录对可变模型随时间的变化。这反过来允许对模型进行时间点查询,分析师可以“回顾”模型的前一个状态。这是通过类型 2 缓慢变化维度来实现的,其中 from 和 to 日期列记录了一行有效的时间。这一功能得到了 ClickHouse 插件的支持,并在下面演示。

该示例假定您已完成创建增量表模型。确保您的 actor_summary.sql 没有将 inserts_only 设置为 True。您的 models/actor_summary.sql 应如下所示:

  1. 在快照目录中创建一个文件 actor_summary

  2. 使用以下内容更新 actor_summary.sql 文件:

有关此内容的一些观察:

  • 选择查询定义了您希望随时间快照的结果。使用 ref 函数引用我们之前创建的 actor_summary 模型。
  • 我们需要一个时间戳列来指示记录的变化。我们的 updated_at 列(请参见创建增量表模型)可以在此处使用。参数 strategy 指示我们使用时间戳来表示更新,参数 updated_at 指定要使用的列。如果您的模型中没有此列,您可以替代性地使用检查策略。这显著低效,并且要求用户指定要比较的列列表。 dbt 比较这些列的当前值和历史值,记录任何变化(如果相同,则不做任何操作)。
  1. 运行命令 dbt snapshot

请注意,已在快照数据库中创建了一个 actor_summary_snapshot 表(由 target_schema 参数确定)。

  1. 取样此数据,您将看到 dbt 包含了列 dbt_valid_from 和 dbt_valid_to。这些列的后一个值设置为 null。随后的运行将更新此值。

  2. 让我们最喜欢的演员 Clicky McClickHouse 再出演另外10部电影。

  3. 从 imdb 目录重新运行 dbt run 命令。此操作将更新增量模型。完成后,运行 dbt snapshot 以捕获更改。

  4. 如果我们现在查询快照,请注意 Clicky McClickHouse 现在有 2 行。我们之前的条目现在有一个 dbt_valid_to 值。我们的新值在 dbt_valid_from 列中记录了相同的值,并且 dbt_valid_to 值为 null。如果我们确实有新行,这些行也会附加到快照中。

有关 dbt 快照的更多详细信息,请参见此处

  1. 执行 dbt seed 命令。这个命令将在我们的数据库 imdb_dbt 中创建一个新表 genre_codes(按照我们的模式配置定义)并使用来自 CSV 文件的行填充它。

  2. 确认这些数据已被加载:

限制

当前的 ClickHouse dbt 插件有几个用户应注意的限制:

  1. 该插件当前通过 INSERT TO SELECT 的方式将模型物化为表。这实际上意味着数据重复。非常大的数据集(PB)可能导致极长的运行时间,使某些模型不可行。尽量减少任何查询返回的行数,尽可能使用 GROUP BY。优先选择汇总数据的模型,而不是那些简单执行转换而保持源行数的模型。
  2. 要使用分布式表表示模型,用户必须手动在每个节点上创建基础复制表。然后,可以在这些表之上创建分布式表。插件不管理集群创建。
  3. 当 dbt 在数据库中创建关系(表/视图)时,它通常以 {{ database }}.{{ schema }}.{{ table/view id }} 的形式创建。ClickHouse 没有模式的概念。因此,插件使用 {{schema}}.{{ table/view id }},其中 schema 是 ClickHouse 数据库。

进一步信息

之前的指南仅触及 dbt 功能的表面。建议用户阅读优秀的 dbt 文档

插件的额外配置描述在 这里

Fivetran

dbt-clickhouse 连接器也可用于 Fivetran 转换,允许在 Fivetran 平台内直接使用 dbt 进行无缝集成和转换能力。