跳到主要内容
跳到主要内容

使用 Vector 与 Kafka 和 ClickHouse

使用 Vector 与 Kafka 和 ClickHouse

Vector 是一个与供应商无关的数据管道,能够从 Kafka 读取并将事件发送到 ClickHouse。

有关 Vector 与 ClickHouse 的 入门指南 重点介绍了日志使用案例以及从文件中读取事件。我们使用了 Github 示例数据集,该数据集的事件保存在 Kafka 主题中。

Vector 利用 sources 通过推送或拉取模型检索数据。与此同时,Sinks 则为事件提供目的地。因此,我们利用 Kafka 源和 ClickHouse 接收器。请注意,虽然 Kafka 被支持作为接收器,但 ClickHouse 源并不提供。因此,Vector 不适合希望从 ClickHouse 向 Kafka 传输数据的用户。

Vector 还支持数据的 transformation。这超出了本指南的范围。如需处理数据集上的此需求,用户可参考 Vector 文档。

请注意,当前实现的 ClickHouse 接收器利用 HTTP 接口。目前,ClickHouse 接收器不支持使用 JSON 架构。数据必须以纯 JSON 格式或字符串的形式发布到 Kafka。

许可证

Vector 根据 MPL-2.0 License 分发。

收集连接信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • HOST 和 PORT:通常情况下,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:默认情况下,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名和密码:默认情况下,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中找到。选择您要连接的服务并点击 Connect

选择 HTTPS,并且详细信息可以在一个示例 curl 命令中找到。

如果您使用自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

步骤

  1. 创建 Kafka github 主题并插入 Github 数据集

此数据集包含 200,000 行,集中于 ClickHouse/ClickHouse 存储库。

  1. 确保目标表已创建。以下我们使用默认数据库。
  1. 下载并安装 Vector。创建一个 kafka.toml 配置文件并修改您 Kafka 和 ClickHouse 实例的值。

关于此配置和 Vector 行为的一些重要注意事项:

  • 该示例已在 Confluent Cloud 上进行测试。因此,sasl.*ssl.enabled 安全选项可能不适合自管理的情况。
  • 对于配置参数 bootstrap_servers,不需要协议前缀,例如 pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • 源参数 decoding.codec = "json" 确保消息作为单个 JSON 对象传递到 ClickHouse 接收器。如果将消息处理为字符串并使用默认的 bytes 值,消息的内容将附加到字段 message。在大多数情况下,这将需要在 ClickHouse 中进行处理,如 Vector 入门指南 所述。
  • Vector 为消息添加了一些字段。在本示例中,我们通过配置参数 skip_unknown_fields = true 在 ClickHouse 接收器中忽略这些字段。这忽略不属于目标表架构的字段。请随意调整您的架构,以确保这些元字段如 offset 被添加。
  • 注意接收器如何通过参数 inputs 引用事件源。
  • 请注意 ClickHouse 接收器的行为,如 此处所述。为了优化吞吐量,用户可能希望调整 buffer.max_eventsbatch.timeout_secsbatch.max_bytes 参数。根据 ClickHouse 的 建议,单个批次中的事件数量应考虑最低值 1000。对于一致的高吞吐量用例,用户可能会增加参数 buffer.max_events。更多可变的吞吐量可能需要更改参数 batch.timeout_secs
  • 参数 auto_offset_reset = "smallest" 强制 Kafka 源从主题的开始处开始,从而确保我们消费步骤 (1) 中发布的消息。用户可能需要不同的行为。有关更多细节,请参见 这里
  1. 启动 Vector

默认情况下,在插入 ClickHouse 之前需要进行 健康检查。这确保可以建立连接并读取架构。如遇问题,请在前面加上 VECTOR_LOG=debug 以获取更多日志信息,这可能会很有帮助。

  1. 确认数据的插入。
count
200000