本指南介绍在使用 ClickPipes 将 MongoDB 中的 JSON 数据复制到 ClickHouse 时的一些常见模式。
假设我们在 MongoDB 中创建了一个名为 t1 的集合用于跟踪客户订单:
db.t1.insertOne({
"order_id": "ORD-001234",
"customer_id": 98765,
"status": "completed",
"total_amount": 299.97,
"order_date": new Date(),
"shipping": {
"method": "express",
"city": "Seattle",
"cost": 19.99
},
"items": [
{
"category": "electronics",
"price": 149.99
},
{
"category": "accessories",
"price": 24.99
}
]
})
MongoDB CDC Connector 使用原生 JSON 数据类型将 MongoDB 文档复制到 ClickHouse。ClickHouse 中复制得到的表 t1 将包含如下这一行:
Row 1:
──────
_id: "68a4df4b9fe6c73b541703b0"
doc: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at: 2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version: 0
表结构
这些复制表采用以下标准表结构:
┌─name───────────────┬─type──────────┐
│ _id │ String │
│ doc │ JSON │
│ _peerdb_synced_at │ DateTime64(9) │
│ _peerdb_version │ Int64 │
│ _peerdb_is_deleted │ Int8 │
└────────────────────┴───────────────┘
_id: 来自 MongoDB 的主键
doc: 以 JSON 数据类型形式复制的 MongoDB 文档
_peerdb_synced_at: 记录该行上次同步的时间
_peerdb_version: 跟踪该行的版本;在该行被更新或删除时递增
_peerdb_is_deleted: 标记该行是否被删除
ReplacingMergeTree 表引擎
ClickPipes 使用 ReplacingMergeTree 表引擎族将 MongoDB 集合映射到 ClickHouse。借助该引擎,更新会被建模为:针对给定主键 (_id) 的文档插入一条具有较新版本 (_peerdb_version) 的记录,从而能够高效地将更新、替换和删除操作处理为带版本的插入。
ReplacingMergeTree 会在后台异步清理重复数据。要确保同一行不存在重复,请使用 FINAL 修饰符。例如:
处理删除操作
来自 MongoDB 的删除操作会以新插入行的形式写入,这些行通过 _peerdb_is_deleted 列标记为已删除。通常应当在查询中过滤掉这些行:
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
你也可以创建行级策略来自动过滤掉已删除的行,而无需在每个查询中单独指定过滤条件:
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;
查询 JSON 数据
你可以使用点语法直接查询 JSON 字段:
SELECT
doc.order_id,
doc.shipping.method
FROM t1;
┌-─doc.order_id─┬─doc.shipping.method─┐
│ ORD-001234 │ express │
└───────────────┴─────────────────────┘
在使用点号语法查询 嵌套对象字段 时,请务必添加 ^ 运算符:
SELECT doc.^shipping as shipping_info FROM t1;
┌─shipping_info──────────────────────────────────────┐
│ {"city":"Seattle","cost":19.99,"method":"express"} │
└────────────────────────────────────────────────────┘
Dynamic 类型
在 ClickHouse 中,JSON 中的每个字段都是 Dynamic 类型。Dynamic 类型允许 ClickHouse 在无需预先知道其具体类型的情况下存储任意类型的值。你可以使用 toTypeName 函数来验证这一点:
SELECT toTypeName(doc.customer_id) AS type FROM t1;
┌─type────┐
│ Dynamic │
└─────────┘
要查看某个字段的实际数据类型,可以使用 dynamicType 函数。请注意,在不同的行中,同一字段名可能具有不同的数据类型:
SELECT dynamicType(doc.customer_id) AS type FROM t1;
┌─type──┐
│ Int64 │
└───────┘
常规函数 在处理动态类型时的工作方式与处理常规列时相同:
示例 1:日期解析
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
┌─order_date──────────┐
│ 2025-08-19 20:32:11 │
└─────────────────────┘
示例 2:条件逻辑
SELECT multiIf(
doc.total_amount < 100, 'less_than_100',
doc.total_amount < 1000, 'less_than_1000',
'1000+') AS spendings
FROM t1;
┌─spendings──────┐
│ less_than_1000 │
└────────────────┘
示例 3:数组操作
SELECT length(doc.items) AS item_count FROM t1;
┌─item_count─┐
│ 2 │
└────────────┘
字段类型转换
ClickHouse 中的聚合函数不能直接作用于 dynamic 类型。例如,如果你尝试直接对 dynamic 类型使用 sum 函数,会得到如下错误:
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
要使用聚合函数,请使用 CAST 函数或 :: 语法将字段转换为合适的类型:
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
┌─shipping_cost─┐
│ 19.99 │
└───────────────┘
注意
从 dynamic 类型转换为其底层数据类型 (由 dynamicType 决定) 开销很小,因为 ClickHouse 在内部已经使用该底层类型存储了该值。
展平 JSON
普通视图
你可以在 JSON 表之上创建普通视图,用于封装扁平化、类型转换和转换逻辑,从而以类似关系型表的方式查询数据。普通视图非常轻量,因为它们只存储查询本身,而不存储底层数据。例如:
CREATE VIEW v1 AS
SELECT
CAST(doc._id, 'String') AS object_id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
此视图将具有以下结构:
┌─name────────────┬─type───────────┐
│ object_id │ String │
│ order_id │ String │
│ customer_id │ Int64 │
│ status │ String │
│ total_amount │ Decimal(18, 2) │
│ order_date │ DateTime64(3) │
│ shipping_info │ JSON │
│ items │ Dynamic │
└─────────────────┴────────────────┘
现在你可以像查询一张已扁平化的表一样来查询该视图:
SELECT
customer_id,
sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
可刷新materialized view
你可以创建可刷新materialized view,通过调度查询执行,对行进行去重,并将结果存储到扁平化的目标表中。每次按计划刷新时,目标表都会被替换为最新的查询结果。
这种方法的主要优势是,使用 FINAL 关键字的查询只在刷新过程中执行一次,从而不再需要在后续针对目标表的查询中使用 FINAL。
其缺点是,目标表中的数据只能保证更新到最近一次刷新的时间点。对于许多使用场景而言,从几分钟到几小时不等的刷新间隔,通常可以在数据新鲜度和查询性能之间取得良好平衡。
CREATE TABLE flattened_t1 (
`_id` String,
`order_id` String,
`customer_id` Int64,
`status` String,
`total_amount` Decimal(18, 2),
`order_date` DateTime64(3),
`shipping_info` JSON,
`items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;
CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
现在可以直接查询 flattened_t1 表,而无需使用 FINAL 修饰符:
SELECT
customer_id,
sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
增量materialized view
如果你希望实时访问已扁平化的列,可以创建增量materialized view。如果你的表经常有更新,不建议在materialized view 中使用 FINAL 修饰符,因为每次更新都会触发一次合并。相反,你可以在该 materialized view 之上构建一个普通视图,在查询时对数据进行去重。
CREATE TABLE flattened_t1 (
`_id` String,
`order_id` String,
`customer_id` Int64,
`status` String,
`total_amount` Decimal(18, 2),
`order_date` DateTime64(3),
`shipping_info` JSON,
`items` Dynamic,
`_peerdb_version` Int64,
`_peerdb_synced_at` DateTime64(9),
`_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;
CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items,
_peerdb_version,
_peerdb_synced_at,
_peerdb_is_deleted
FROM t1;
CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
现在可以按如下方式查询视图 flattened_t1_final:
SELECT
customer_id,
sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;