跳到主要内容
跳到主要内容

将 Amazon Kinesis 与 ClickHouse Cloud 集成

前提条件

您已熟悉 ClickPipes 介绍 并设置了 IAM 凭证IAM 角色。请按照 Kinesis 基于角色的访问指南 获取关于如何设置与 ClickHouse Cloud 兼容的角色的信息。

创建您的第一个 ClickPipe

  1. 访问您的 ClickHouse Cloud 服务的 SQL 控制台。
  1. 在左侧菜单中选择 Data Sources 按钮,然后点击“设置 ClickPipe”
  1. 选择您的数据源。
  1. 填写表单,为您的 ClickPipe 提供名称、描述(可选)、您的 IAM 角色或凭证及其他连接详细信息。
  1. 选择 Kinesis 流和起始偏移量。用户界面将显示所选源(Kafka 主题等)的示例文档。您还可以为 Kinesis 流启用增强的扇出,以提高 ClickPipe 的性能和稳定性(有关增强扇出的更多信息,请见 这里)。
  1. 在下一步中,您可以选择是否将数据摄取到新的 ClickHouse 表中或重用现有的表。按照屏幕上的说明修改表名、模式和设置。您可以在顶部的示例表中实时预览更改。

您还可以使用提供的控件自定义高级设置

  1. 或者,您可以决定将数据摄取到现有的 ClickHouse 表中。在这种情况下,用户界面将允许您将源字段映射到所选目标表中的 ClickHouse 字段。
  1. 最后,您可以为内部 ClickPipes 用户配置权限。

权限: ClickPipes 将为写入数据到目标表创建一个专用用户。您可以使用自定义角色或预定义角色选择此内部用户的角色:

  • Full access:对集群具有完全访问权限。如果您使用目标表的物化视图或字典,这可能会很有用。
  • Only destination table:仅对目标表具有 INSERT 权限。
  1. 点击“完成设置”,系统将注册您的 ClickPipe,您将能够在摘要表中查看它。

摘要表提供控件,以显示来自源或 ClickHouse 中的目标表的示例数据

以及控件来删除 ClickPipe 并显示摄取作业的摘要。

  1. 恭喜! 您已成功设置您的第一个 ClickPipe。如果这是一个流式 ClickPipe,它将持续运行,从您的远程数据源实时摄取数据。否则将摄取批处理并完成。

支持的数据格式

支持的格式包括:

支持的数据类型

标准类型支持

当前 ClickHouse 数据库支持以下 ClickPipes 数据类型:

  • 基本数值类型 - [U]Int8/16/32/64 和 Float32/64
  • 大整数类型 - [U]Int128/256
  • 十进制类型
  • 布尔类型
  • 字符串
  • 固定字符串
  • 日期、Date32
  • 日期时间、DateTime64(仅 UTC 时区)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • 所有 ClickHouse 低基数类型
  • 使用上述任何类型(包括 Nullable)的键和值的 Map
  • 使用上述任何类型的元素(包括 Nullable,深度仅限一层)的 Tuple 和 Array

变体类型支持(实验性)

如果您的 Cloud 服务运行 ClickHouse 25.3 或更高版本,则变体类型支持是自动的。否则,您需要提交支持工单以在您的服务上启用它。

您可以手动为源数据流中的任何 JSON 字段指定变体类型(例如 Variant(String, Int64, DateTime))。由于 ClickPipes 确定使用正确的变体子类型的方式,因此变体定义中只能使用一种整数或日期时间类型 - 例如,Variant(Int64, UInt32) 不被支持。

JSON 类型支持(实验性)

如果您的 Cloud 服务运行 ClickHouse 25.3 或更高版本,则 JSON 类型支持是自动的。否则,您需要提交支持工单以在您的服务上启用它。

始终是 JSON 对象的 JSON 字段可以分配给 JSON 目标列。您需要手动将目标列更改为所需的 JSON 类型,包括任何固定或跳过的路径。

Kinesis 虚拟列

对于 Kinesis 流,支持以下虚拟列。在创建新目标表时,可以使用 Add Column 按钮添加虚拟列。

名称描述推荐数据类型
_keyKinesis 分区键字符串
_timestampKinesis 近似到达时间戳(毫秒精度)DateTime64(3)
_streamKinesis 流名称字符串
_sequence_numberKinesis 序列号字符串
_raw_message完整的 Kinesis 消息字符串

_raw_message 字段可用于仅需要完整 Kinesis JSON 记录的情况下(例如使用 ClickHouse JsonExtract* 函数填充下游物化视图)。对于此类管道,删除所有“非虚拟”列可以提高 ClickPipes 的性能。

限制

性能

批量

ClickPipes 以批量方式将数据插入 ClickHouse。这样可以避免在数据库中创建过多的部分,从而导致集群性能问题。

批量插入在满足以下条件之一时发生:

  • 批量大小达到最大值(100,000 行或 20MB)
  • 批量已打开的最长时间(5 秒)

延迟

延迟(定义为 Kinesis 消息发送到流中和消息在 ClickHouse 中可用之间的时间)将依赖于多个因素(即 Kinesis 延迟、网络延迟、消息大小/格式)。上述节中描述的 批量处理 也将影响延迟。我们总是建议测试您的特定用例,以了解您可以预期的延迟。

如果您有特定的低延迟需求,请 联系我们

扩展性

ClickPipes for Kinesis 旨在水平扩展。默认情况下,我们创建一个具有一个消费者的消费者组。 这可以通过 ClickPipe 详情视图中的扩展控件进行更改。

ClickPipes 提供高可用性和可用区分布式架构。 这需要将消费者数量扩展到至少两个。

无论运行的消费者数量如何,容错都是设计而成的。 如果一个消费者或其底层基础设施发生故障, ClickPipe 将自动重新启动该消费者并继续处理消息。

身份验证

要访问 Amazon Kinesis 流,您可以使用 IAM 凭证IAM 角色。有关如何设置 IAM 角色的更多详细信息,您可以 参考此指南,以获取与 ClickHouse Cloud 兼容的角色的设置信息。