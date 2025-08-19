Работа с JSON в ClickHouse

Этот руководств предоставляет общие методы работы с данными JSON, реплицированными из MongoDB в ClickHouse с помощью ClickPipes.

Предположим, мы создали коллекцию t1 в MongoDB для отслеживания заказов клиентов:

db.t1.insertOne({ "order_id": "ORD-001234", "customer_id": 98765, "status": "completed", "total_amount": 299.97, "order_date": new Date(), "shipping": { "method": "express", "city": "Seattle", "cost": 19.99 }, "items": [ { "category": "electronics", "price": 149.99 }, { "category": "accessories", "price": 24.99 } ] })

Коннектор MongoDB CDC реплицирует документы MongoDB в ClickHouse с использованием нативного типа данных JSON. Реплицированная таблица t1 в ClickHouse будет содержать следующую строку:

Row 1: ────── _id: "68a4df4b9fe6c73b541703b0" doc: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97} _peerdb_synced_at: 2025-08-19 20:50:42.005000000 _peerdb_is_deleted: 0 _peerdb_version: 0

Реплицированные таблицы используют эту стандартную схему:

┌─name───────────────┬─type──────────┐ │ _id │ String │ │ doc │ JSON │ │ _peerdb_synced_at │ DateTime64(9) │ │ _peerdb_version │ Int64 │ │ _peerdb_is_deleted │ Int8 │ └────────────────────┴───────────────┘

_id : Первичный ключ из MongoDB

: Первичный ключ из MongoDB doc : Документ MongoDB, реплицированный как тип данных JSON

: Документ MongoDB, реплицированный как тип данных JSON _peerdb_synced_at : Записывает, когда строка была в последний раз синхронизирована

: Записывает, когда строка была в последний раз синхронизирована _peerdb_version : Отслеживает версию строки; увеличивается при обновлении или удалении строки

: Отслеживает версию строки; увеличивается при обновлении или удалении строки _peerdb_is_deleted : Отмечает, была ли строка удалена

ClickPipes отображает коллекции MongoDB в ClickHouse, используя семейство движков таблиц ReplacingMergeTree . С помощью этого движка обновления моделируются как вставки с более новой версией ( _peerdb_version ) документа для данного первичного ключа ( _id ), что позволяет эффективно обрабатывать обновления, замены и удаления как версионные вставки.

ReplacingMergeTree асинхронно очищает дубликаты в фоновом режиме. Чтобы гарантировать отсутствие дубликатов для одной и той же строки, используйте модификатор FINAL . Например:

SELECT * FROM t1 FINAL;

Удаления из MongoDB передаются как новые строки, помеченные как удаленные с помощью столбца _peerdb_is_deleted . Обычно вы хотите отфильтровать их в своих запросах:

SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;

Вы также можете создать политику на уровне строк, чтобы автоматически фильтровать удаленные строки, вместо того чтобы указывать фильтр в каждом запросе:

CREATE ROW POLICY policy_name ON t1 FOR SELECT USING _peerdb_is_deleted = 0;

Вы можете напрямую запрашивать поля JSON, используя точечный синтаксис:

SELECT doc.order_id, doc.shipping.method FROM t1;

┌-─doc.order_id─┬─doc.shipping.method─┐ │ ORD-001234 │ express │ └───────────────┴─────────────────────┘

При запросе полей вложенных объектов с использованием точечного синтаксиса убедитесь, что вы добавили оператор ^ :

SELECT doc.^shipping as shipping_info FROM t1;

┌─shipping_info──────────────────────────────────────┐ │ {"city":"Seattle","cost":19.99,"method":"express"} │ └────────────────────────────────────────────────────┘

В ClickHouse каждый столбец в JSON имеет динамический тип. Динамический тип позволяет ClickHouse хранить значения любого типа, не зная тип заранее. Вы можете проверить это с помощью функции toTypeName :

SELECT toTypeName(doc.customer_id) AS type FROM t1;

┌─type────┐ │ Dynamic │ └─────────┘

Чтобы изучить подлежащие типы данных для столбца, вы можете проверить с помощью функции dynamicType . Обратите внимание, что для одного и того же имени столбца в разных строках могут быть разные типы данных:

SELECT dynamicType(doc.customer_id) AS type FROM t1;

┌─type──┐ │ Int64 │ └───────┘

Регулярные функции работают с динамическим типом так же, как и с обычными столбцами:

Пример 1: Парсинг даты

SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;

┌─order_date──────────┐ │ 2025-08-19 20:32:11 │ └─────────────────────┘

Пример 2: Условная логика

SELECT multiIf( doc.total_amount < 100, 'less_than_100', doc.total_amount < 1000, 'less_than_1000', '1000+') AS spendings FROM t1;

┌─spendings──────┐ │ less_than_1000 │ └────────────────┘

Пример 3: Операции с массивами

SELECT length(doc.items) AS item_count FROM t1;

┌─item_count─┐ │ 2 │ └────────────┘

Агрегатные функции в ClickHouse не работают с динамическим типом напрямую. Например, если вы попробуете напрямую использовать функцию sum на динамическом типе, вы получите следующую ошибку:

SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1; -- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)

Чтобы использовать агрегатные функции, приведите поле к подходящему типу с помощью функции CAST или синтаксиса :: :

SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;

┌─shipping_cost─┐ │ 19.99 │ └───────────────┘

примечание Приведение типов с динамического типа к подлежащему типу данных (определяемому dynamicType ) очень эффективно, так как ClickHouse уже хранит значение в своем подлежащем типе внутренне.

Вы можете создать обычные представления поверх таблицы JSON, чтобы инкапсулировать логику преобразования/приведения/трансформации для запроса данных, аналогично реляционной таблице. Обычные представления легковесны, так как они хранят только сам запрос, а не подлежащие данные. Например:

CREATE VIEW v1 AS SELECT CAST(doc._id, 'String') AS object_id, CAST(doc.order_id, 'String') AS order_id, CAST(doc.customer_id, 'Int64') AS customer_id, CAST(doc.status, 'String') AS status, CAST(doc.total_amount, 'Decimal64(2)') AS total_amount, CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date, doc.^shipping AS shipping_info, doc.items AS items FROM t1 FINAL WHERE _peerdb_is_deleted = 0;

Это представление будет иметь следующую схему:

┌─name────────────┬─type───────────┐ │ object_id │ String │ │ order_id │ String │ │ customer_id │ Int64 │ │ status │ String │ │ total_amount │ Decimal(18, 2) │ │ order_date │ DateTime64(3) │ │ shipping_info │ JSON │ │ items │ Dynamic │ └─────────────────┴────────────────┘

Теперь вы можете запрашивать представление аналогично тому, как запрашиваете плоскую таблицу:

SELECT customer_id, sum(total_amount) FROM v1 WHERE shipping_info.city = 'Seattle' GROUP BY customer_id ORDER BY customer_id DESC LIMIT 10;

Вы можете создать Обновляемые материализованные представления, которые позволяют вам планировать выполнение запросов для дедупликации строк и хранения результатов в плоской целевой таблице. С каждой запланированной перезагрузкой целевая таблица заменяется последними результатами запроса.

Ключевым преимуществом этого метода является то, что запрос с использованием ключевого слова FINAL выполняется только один раз во время обновления, устраняя необходимость в последующих запросах к целевой таблице с использованием FINAL .

Недостатком является то, что данные в целевой таблице актуальны только на момент последнего обновления. Для многих случаев использования интервалы обновления от нескольких минут до нескольких часов обеспечивают хороший баланс между свежестью данных и производительностью запросов.

CREATE TABLE flattened_t1 ( `_id` String, `order_id` String, `customer_id` Int64, `status` String, `total_amount` Decimal(18, 2), `order_date` DateTime64(3), `shipping_info` JSON, `items` Dynamic ) ENGINE = ReplacingMergeTree() PRIMARY KEY _id ORDER BY _id; CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS SELECT CAST(doc._id, 'String') AS _id, CAST(doc.order_id, 'String') AS order_id, CAST(doc.customer_id, 'Int64') AS customer_id, CAST(doc.status, 'String') AS status, CAST(doc.total_amount, 'Decimal64(2)') AS total_amount, CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date, doc.^shipping AS shipping_info, doc.items AS items FROM t1 FINAL WHERE _peerdb_is_deleted = 0;

Теперь вы можете напрямую запрашивать таблицу flattened_t1 без модификатора FINAL :

SELECT customer_id, sum(total_amount) FROM flattened_t1 WHERE shipping_info.city = 'Seattle' GROUP BY customer_id ORDER BY customer_id DESC LIMIT 10;

Если вы хотите получить доступ к плоским столбцам в реальном времени, вы можете создать Инкрементные материализованные представления. Если ваша таблица имеет частые обновления, не рекомендуется использовать модификатор FINAL в вашем материализованном представлении, так как каждое обновление запускает слияние. Вместо этого вы можете дедуплицировать данные во время запроса, создавая обычное представление поверх материализованного представления.

CREATE TABLE flattened_t1 ( `_id` String, `order_id` String, `customer_id` Int64, `status` String, `total_amount` Decimal(18, 2), `order_date` DateTime64(3), `shipping_info` JSON, `items` Dynamic, `_peerdb_version` Int64, `_peerdb_synced_at` DateTime64(9), `_peerdb_is_deleted` Int8 ) ENGINE = ReplacingMergeTree() PRIMARY KEY _id ORDER BY _id; CREATE MATERIALIZED VIEW imv TO flattened_t1 AS SELECT CAST(doc._id, 'String') AS _id, CAST(doc.order_id, 'String') AS order_id, CAST(doc.customer_id, 'Int64') AS customer_id, CAST(doc.status, 'String') AS status, CAST(doc.total_amount, 'Decimal64(2)') AS total_amount, CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date, doc.^shipping AS shipping_info, doc.items, _peerdb_version, _peerdb_synced_at, _peerdb_is_deleted FROM t1; CREATE VIEW flattened_t1_final AS SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;

Теперь вы можете запрашивать представление flattened_t1_final следующим образом: