跳到主要内容
跳到主要内容

使用 Kafka 表引擎

Not supported in ClickHouse Cloud
备注

Kafka 表引擎不支持 ClickHouse Cloud。请考虑 ClickPipesKafka Connect

Kafka 到 ClickHouse

要使用 Kafka 表引擎,您应该对 ClickHouse 物化视图 有基本了解。

概述

最开始,我们关注最常见的用例:使用 Kafka 表引擎将数据从 Kafka 插入到 ClickHouse。

Kafka 表引擎允许 ClickHouse 直接读取 Kafka 主题。虽然对于查看主题上的消息很有用,但该引擎设计上仅允许一次性检索, 即当向表发出查询时,它会从队列中消耗数据并增加消费者偏移量,然后再将结果返回给调用者。在不重置这些偏移量的情况下,实际上无法重新读取数据。

为了持久化从表引擎读取的数据,我们需要一种方法来捕获数据并将其插入到另一个表中。基于触发器的物化视图本质上提供了这种功能。物化视图会启动对表引擎的读取,接收文档批次。TO 子句决定数据的去向——通常是 Merge Tree 家族 的一个表。这个过程如下图所示:

步骤

1. 准备

如果您在目标主题上有数据,您可以根据以下信息适应您的数据集。或者,提供了一个示例 Github 数据集 这里。该数据集在下面的示例中使用,采用了简化的模式和部分行(特别是,我们限制为与 ClickHouse 仓库 相关的 Github 事件),以便简洁。对于与 这里 提供的完整数据集相比,这仍然足以支持大多数与数据集相关的查询 published with the dataset 工作。

2. 配置 ClickHouse

如果您连接到安全的 Kafka,此步骤是必需的。这些设置不能通过 SQL DDL 命令传递,必须在 ClickHouse config.xml 中配置。我们假设您连接到一个使用 SASL 进行安全保护的实例。这是在与 Confluent Cloud 交互时最简单的方法。

将上述代码片段放入您的 conf.d/ 目录下的新文件中,或将其合并到现有的配置文件中。有关可配置设置,请参见 这里

我们还将创建一个名为 KafkaEngine 的数据库以供本教程使用:

创建数据库后,您需要切换到该数据库中:

3. 创建目标表

准备您的目标表。在下面的示例中,我们使用减少的 GitHub 模式以便简洁。请注意,尽管我们使用的是 MergeTree 表引擎,但此示例可以轻松适应 MergeTree 家族 的任何成员。

4. 创建并填充主题

接下来,我们将创建一个主题。有几种工具可以使用。如果我们在本地机器或 Docker 容器中运行 Kafka,RPK 很好用。我们可以通过运行以下命令创建一个名为 github 的主题,分为 5 个分区:

如果我们在 Confluent Cloud 上运行 Kafka,可能更希望使用 Confluent CLI

现在我们需要使用 kcat 向此主题填充一些数据。如果我们在本地运行 Kafka,并且身份验证已禁用,可以运行类似于以下内容的命令:

或者,如果我们的 Kafka 集群使用 SASL 进行身份验证:

该数据集包含 200,000 行,因此应该在几秒钟内完成摄取。如果您想处理更大的数据集,请查看 ClickHouse/kafka-samples GitHub 存储库中的 大型数据集部分

5. 创建 Kafka 表引擎

下面的示例创建一个与 Merge Tree 表具有相同架构的表引擎。这并不是严格要求的,因为在目标表中可以有别名或临时列。然而,设置非常重要;请注意使用 JSONEachRow 作为从 Kafka 主题消费 JSON 的数据类型。githubclickhouse 的值分别表示主题和消费者组的名称。主题实际上可以是值的列表。

我们在下面讨论引擎设置和性能调优。在这一点上,对表 github_queue 的简单选择应该读取一些行。请注意,这将向前移动消费者偏移量,阻止这些行在没有 重置 的情况下再次读取。请注意限制和所需参数 stream_like_engine_allow_direct_select.

6. 创建物化视图

物化视图将连接之前创建的两个表,从 Kafka 表引擎读取数据并将其插入到目标 merge tree 表中。我们可以进行多种数据转换。我们将进行简单的读取和插入。使用 * 假设列名是相同的(区分大小写)。

在创建时,物化视图连接到 Kafka 引擎并开始读取:将行插入目标表中。这个过程将无限期地继续,后续插入到 Kafka 的消息将被消费。请随意重新运行插入脚本以向 Kafka 插入更多消息。

7. 确认行已经插入

确认目标表中存在数据:

您应该看到 200,000 行:

常见操作

停止和重新启动消息消费

要停止消息消费,您可以断开 Kafka 引擎表的连接:

这不会影响消费者组的偏移量。要重新启动消费并从上一个偏移量继续,请重新附加表。

添加 Kafka 元数据

在将元数据跟踪原始 Kafka 消息引入 ClickHouse 之后,这可能会很有用。例如,我们可能想知道我们消费了特定主题或分区的多少。为此,Kafka 表引擎暴露了几个 虚拟列。通过修改我们的模式和物化视图的选择语句,这些可以作为列在我们的目标表中持久保存。

首先,我们在添加列到目标表之前执行上述的停止操作。

下面我们添加信息列以识别源主题和行来源的分区。

接下来,我们需要确保虚拟列按照要求进行映射。 虚拟列以 _ 为前缀。 虚拟列的完整列表可以在 这里 找到。

要使用虚拟列更新我们的表,我们需要删除物化视图,重新附加 Kafka 引擎表,并重新创建物化视图。

新消费的行应该具有元数据。

结果如下:

actor_loginevent_typecreated_attopicpartition
IgorMinarCommitCommentEvent2011-02-12 02:22:00github0
queeupCommitCommentEvent2011-02-12 02:23:23github0
IgorMinarCommitCommentEvent2011-02-12 02:23:24github0
IgorMinarCommitCommentEvent2011-02-12 02:24:50github0
IgorMinarCommitCommentEvent2011-02-12 02:25:20github0
dapiCommitCommentEvent2011-02-12 06:18:36github0
sourcerebelsCommitCommentEvent2011-02-12 06:34:10github0
jamierumbelowCommitCommentEvent2011-02-12 12:21:40github0
jpnCommitCommentEvent2011-02-12 12:24:31github0
OxoniumCommitCommentEvent2011-02-12 12:31:28github0
修改 Kafka 引擎设置

我们建议删除 Kafka 引擎表并使用新设置重新创建它。在此过程中,物化视图不需要修改——一旦 Kafka 引擎表被重新创建,消息消费将恢复。

调试问题

例如身份验证问题等错误不会在 Kafka 引擎 DDL 的响应中报告。为诊断问题,建议使用主 ClickHouse 日志文件 clickhouse-server.err.log。可以通过配置启用底层 Kafka 客户端库 librdkafka 的进一步跟踪日志。

处理格式错误的消息

Kafka 经常被用作数据的“垃圾场”。这会导致主题包含混合的消息格式和不一致的字段名称。避免这种情况,利用 Kafka 的功能如 Kafka Streams 或 ksqlDB 以确保消息在插入 Kafka 之前是良好格式和一致的。如果这些选项不可行,ClickHouse 有一些功能可以帮助。

  • 将消息字段视为字符串。在需要时,可以在物化视图语句中使用函数进行清理和转换。这不应代表生产解决方案,但可能有助于一次性摄取。
  • 如果您从主题消费 JSON,使用 JSONEachRow 格式,请使用设置 input_format_skip_unknown_fields。在写入数据时,默认情况下,如果输入数据包含目标表中不存在的列,ClickHouse 会抛出异常。然而,如果启用此选项,这些多余的列将被忽略。再次强调,这不是生产级解决方案,可能会混淆他人。
  • 考虑设置 kafka_skip_broken_messages。这要求用户为格式错误的消息指定每个块的容忍级别——在 kafka_max_block_size 的上下文中考虑。如果超过了此容忍度(以绝对消息计量),通常的异常行为将恢复,其他消息将被跳过。
递送语义和重复问题

Kafka 表引擎具有至少一次的语义。在几种已知的罕见情况下,可能会出现重复。例如,消息可以从 Kafka 读取并成功插入到 ClickHouse。在新偏移量可以提交之前与 Kafka 的连接丢失。在这种情况下需要重试该块。该块可以使用分布式表或 ReplicatedMergeTree 作为目标表进行 去重。虽然这降低了重复行的可能性,但它依赖于相同的块。像 Kafka 重新平衡这样的事件可能会使这一假设失效,从而在罕见情况下造成重复。

基于法定人数的插入

对于在 ClickHouse 中需要更高递送保证的情况,您可能需要 基于法定人数的插入。这不能在物化视图或目标表上设置。然而,它可以为用户配置文件设置,例如:

ClickHouse 到 Kafka

尽管是较少的用例,ClickHouse 数据也可以持久化到 Kafka。例如,我们将手动将行插入到 Kafka 表引擎中。该数据将由同一 Kafka 引擎读取,其物化视图会将数据放入 Merge Tree 表中。最后,我们演示在插入到 Kafka 中应用物化视图以从现有源表读取表的过程。

步骤

我们的初始目标最好呈现如下:

我们假设您在 Kafka 到 ClickHouse 步骤下创建了表和视图,并且主题已被完全消费。

1. 直接插入行

首先,确认目标表的行数。

您应该有 200,000 行:

现在从 GitHub 目标表将行插入回 Kafka 表引擎 github_queue。请注意我们如何利用 JSONEachRow 格式,并将选择限制为 100。

重新计算 GitHub 中的行以确认增加了 100。正如上图所示,行已经通过 Kafka 表引擎插入 Kafka,然后由同一引擎重新读取并通过我们的物化视图插入到 GitHub 目标表中!

您应该看到额外的 100 行:

2. 使用物化视图

当将文档插入表时,我们可以使用物化视图将消息推送到 Kafka 引擎(和主题)。当行插入到 GitHub 表时,触发了物化视图,导致行被插入回 Kafka 引擎并进入新的主题。再次展示如下:

创建一个新的 Kafka 主题 github_out 或等效主题。确保 Kafka 表引擎 github_out_queue 指向该主题。

现在创建一个新的物化视图 github_out_mv 指向 GitHub 表,当触发时将行插入上述引擎。结果,向 GitHub 表的添加将被推送到我们新的 Kafka 主题中。

如果您插入到原始的 github 主题,该主题是在 Kafka 到 ClickHouse 的一部分创建的,则文档将神奇地出现在 "github_clickhouse" 主题中。请使用原生 Kafka 工具确认这一点。例如,下面,我们使用 kcat 向 github 主题插入 100 行,适用于托管在 Confluent Cloud 上的主题:

github_out 主题的读取应确认消息的递送。

虽然这是一个复杂的示例,但这展示了在与 Kafka 引擎结合使用时物化视图的强大。

集群与性能

与 ClickHouse 集群一起工作

通过 Kafka 消费者组,多个 ClickHouse 实例可能会从同一主题读取。每个消费者将被分配到主题分区的 1:1 映射。使用 Kafka 表引擎扩展 ClickHouse 消费时,请考虑集群中的消费者总数不能超过主题上的分区数。因此,请确保主题的分区在事先适当地配置。

多个 ClickHouse 实例可以使用相同的消费者组 ID 配置以读取同一个主题——在创建 Kafka 表引擎时指定。因此,每个实例将从一个或多个分区读取,并向其本地目标表插入段。这些目标表可以再次配置为使用 ReplicatedMergeTree 来处理数据的重复。这种方法允许 Kafka 的读取与 ClickHouse 集群进行扩展,前提是有足够的 Kafka 分区。

性能调优

在考虑提高 Kafka 引擎表吞吐量性能时,请考虑以下几点:

  • 性能将根据消息大小、格式和目标表类型而有所不同。在单个表引擎上,每秒 100,000 行应被视为可达到的。默认情况下,消息以块的形式读取,由参数 kafka_max_block_size 控制。默认设置为 max_insert_block_size,默认值为 1,048,576。除非消息极大,否则几乎总是应增加此值。500k 到 1M 之间的值并不罕见。测试并评估对吞吐量性能的影响。
  • 可以使用 kafka_num_consumers 增加表引擎的消费者数量。然而,默认情况下,插入将在单线程中线性化,除非 kafka_thread_per_consumer 的默认值 (1) 被改变。将其设置为 1 以确保并行执行冲刷。请注意,创建具有 N 个消费者(和 kafka_thread_per_consumer=1)的 Kafka 引擎表在逻辑上等同于创建 N 个 Kafka 引擎,每个引擎都有物化视图和 kafka_thread_per_consumer=0。
  • 增加消费者不是免费的操作。每个消费者维护自己的缓冲区和线程,增加了服务器的开销。注意消费者的开销,并首先在集群中线性扩展,如果可能的话。
  • 如果 Kafka 消息的吞吐量变化且延迟是可以接受的,请考虑增加 stream_flush_interval_ms 以确保更大的块被冲刷。
  • background_message_broker_schedule_pool_size 设置执行后台任务的线程数。这些线程用于 Kafka 流媒体。此设置在 ClickHouse 服务器启动时应用,并且不能在用户会话中更改,默认值为 16。如果您在日志中看到超时,则可能适合增加此值。
  • 在与 Kafka 进行通信时,使用了 librdkafka 库,这本身会创建线程。因此,大量的 Kafka 表或消费者可能会导致大量的上下文切换。要么在集群中分配此负载,要么仅在可能的情况下复制目标表,或者考虑使用表引擎从多个主题读取——支持值的列表。多个物化视图可以从同一个表中读取,每个过滤来自特定主题的数据。

所有设置更改应经过测试。我们建议监视 Kafka 消费者滞后,以确保您得到合理的扩展。

附加设置

除了上述讨论的设置外,以下可能会引起您的兴趣:

  • Kafka_max_wait_ms - 阅读 Kafka 消息之前的等待时间(以毫秒为单位),然后重试。设置在用户配置文件级别,默认值为 5000。

底层 librdkafka 中的 所有设置 也可以放置在 ClickHouse 配置文件中的 kafka 元素内——设置名称应为 XML 元素,带有用下划线替换的句点,例如:

这些是专家设置,建议您参考 Kafka 文档以获取深入解释。