将 Amazon Kinesis 集成到 ClickHouse Cloud
前提条件
你已经熟悉了 ClickPipes 介绍,并且已配置好 IAM 凭证 或 IAM 角色。请参阅 Kinesis 基于角色的访问指南,了解如何配置可与 ClickHouse Cloud 配合使用的角色。
创建你的第一个 ClickPipe
- 访问你的 ClickHouse Cloud 服务的 SQL Console。

- 在左侧菜单中选择
Data Sources按钮,然后点击 “Set up a ClickPipe”。

- 选择你的数据源。

- 填写表单,为你的 ClickPipe 提供名称、描述(可选)、IAM 角色或凭证以及其他连接详情。

- 选择 Kinesis Stream 和起始偏移量(offset)。界面会显示来自所选源(Kafka topic 等)的示例记录。你还可以为 Kinesis 流启用 Enhanced Fan-out,以提升 ClickPipe 的性能和稳定性(有关 Enhanced Fan-out 的更多信息可在此处找到)。

- 在下一步中,你可以选择将数据摄取到新的 ClickHouse 表中,或复用现有表。按照界面中的说明修改表名、schema 和 settings。你可以在顶部的示例表中实时预览你的更改。

- 你也可以使用提供的控件自定义高级设置。

- 或者,你也可以选择将数据摄取到现有的 ClickHouse 表中。在这种情况下,界面将允许你将源字段映射到所选目标表中的 ClickHouse 字段。

- 最后,你可以为内部 ClickPipes 用户配置权限。
Permissions: ClickPipes 会创建一个专用用户,用于向目标表写入数据。你可以为该内部用户选择角色,使用自定义角色或预定义角色之一:
Full access:对集群具有完全访问权限。如果你在目标表中使用 materialized view 或字典(Dictionary),这可能会很有用。Only destination table:仅对目标表拥有INSERT权限。

- 点击 “Complete Setup” 后,系统会注册你的 ClickPipe,你就可以在汇总表中看到它的条目。

- 你也可以随时删除 ClickPipe。

- 汇总表提供了控件,用于显示 ClickHouse 中来源或目标表的示例数据。

以及用于显示摄取作业摘要和其他操作的控件。

- **恭喜!**你已成功完成第一个 ClickPipe 的配置。如果这是一个流式 ClickPipe,它会持续运行,从远程数据源实时摄取数据。否则,它会在完成一次批量摄取后结束。
支持的数据格式
支持的格式如下:
受支持的数据类型
标准类型支持
当前 ClickPipes 支持以下 ClickHouse 数据类型:
- 基础数值类型 - [U]Int8/16/32/64、Float32/64 和 BFloat16
- 大整数类型 - [U]Int128/256
- Decimal 类型
- Boolean
- String
- FixedString
- Date、Date32
- DateTime、DateTime64(仅支持 UTC 时区)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- 所有 ClickHouse LowCardinality 类型
- 键和值使用上述任一类型(包括 Nullable)的 Map
- 元素使用上述任一类型(包括 Nullable,且仅支持一层嵌套)的 Tuple 和 Array
- SimpleAggregateFunction 类型(用于 AggregatingMergeTree 或 SummingMergeTree 目标表)
Variant 类型支持
您可以为源数据流中的任意 JSON 字段手动指定一个 Variant 类型(例如 Variant(String, Int64, DateTime))。由于 ClickPipes 确定应使用的正确 Variant 子类型的工作方式所限,在 Variant 定义中只能使用一种整数或一种 DateTime 类型——例如,不支持 Variant(Int64, UInt32)。
JSON 类型支持
始终为 JSON 对象的 JSON 字段可以映射到 JSON 目标列。需要手动将该目标列更改为所需的 JSON 类型,包括任何固定路径或跳过路径。
Kinesis 虚拟列
以下虚拟列可用于 Kinesis 流。在创建新的目标表时,可以通过 Add Column 按钮添加这些虚拟列。
| Name | Description | Recommended Data Type |
|---|---|---|
| _key | Kinesis Partition Key | String |
| _timestamp | Kinesis Approximate Arrival Timestamp (millisecond precision) | DateTime64(3) |
| _stream | Kinesis Stream Name | String |
| _sequence_number | Kinesis Sequence Number | String |
| _raw_message | Full Kinesis Message | String |
在仅需要完整 Kinesis JSON 记录的场景中(例如使用 ClickHouse JsonExtract* 函数来填充下游物化视图),可以使用 _raw_message 字段。对于此类 ClickPipe 管道,删除所有“非虚拟”列可能会提升 ClickPipes 服务的性能。
限制
- 不支持 DEFAULT。
- 默认情况下,在使用最小 (XS) 副本规格运行时,单条消息限制为 8MB(未压缩),使用更大副本规格时为 16MB(未压缩)。超出该限制的消息会被拒绝并返回错误。如果需要更大的消息大小,请联系支持团队。
性能
批处理
ClickPipes 会以批处理的方式向 ClickHouse 插入数据。这样可以避免在数据库中创建过多的分区片段,从而导致集群性能下降。
在满足以下任一条件时,会将当前批次插入:
- 批大小达到最大限制(每 1GB 副本内存对应 100,000 行或 32MB)
- 批次处于打开状态的时间达到上限(5 秒)
延迟
延迟(定义为 Kinesis 消息发送到流与该消息在 ClickHouse 中可用之间的时间)取决于多个因素(例如 Kinesis 自身的延迟、网络延迟、消息大小/格式)。上文中描述的批处理也会影响延迟。我们始终建议针对您的特定用例进行测试,以了解可以预期的延迟水平。
如果您有特定的低延迟需求,请联系我们。
Active Shards
我们强烈建议根据吞吐量需求限制同时处于活动状态的分片数量。对于 "On Demand" Kinesis 流,AWS 会根据吞吐量自动分配匹配数量的分片, 但对于 "Provisioned" 流,如果预置了过多分片,则会导致如下所述的延迟,并且还会增加成本,因为此类流的 Kinesis 定价是按 "per shard"(每个分片)计费的。
如果生产者应用持续向大量活动分片写入数据,而管道的扩容不足以高效处理这些分片,就可能导致延迟。基于 Kinesis 吞吐量限制, ClickPipes 会为每个副本分配特定数量的 "workers" 来读取分片数据。比如,在最小规格下,一个 ClickPipes 副本将拥有 4 个这样的工作线程。如果生产者同时向 超过 4 个分片写入,来自这些“额外”分片的数据将要等到有工作线程空闲时才会被处理。特别是,如果管道使用 "enhanced fanout",每个工作线程会在 5 分钟内订阅单个分片, 并且在此期间无法读取任何其他分片。这会导致延迟出现以 5 分钟为单位的“尖峰”。
扩展
用于 Kinesis 的 ClickPipes 被设计为既可以水平扩展也可以垂直扩展。默认情况下,我们会创建一个包含一个 consumer 的 consumer group。此配置可以在创建 ClickPipe 时进行设置,或者在之后通过 Settings -> Advanced Settings -> Scaling 进行修改。
ClickPipes 通过跨可用区的分布式架构提供高可用性。 这要求扩展到至少两个 consumers。
无论正在运行的 consumers 数量是多少,ClickPipes 在设计上都具备容错能力。 如果某个 consumer 或其底层基础设施发生故障, ClickPipe 会自动重启该 consumer 并继续处理消息。
身份验证
要访问 Amazon Kinesis 流,可以使用 IAM 凭证或 IAM 角色。有关如何设置 IAM 角色的更多详细信息,请参阅本指南,了解如何配置可与 ClickHouse Cloud 配合使用的角色。