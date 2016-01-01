Подключение Streamkap к ClickHouse

Streamkap — это платформа интеграции данных в реальном времени, специализирующаяся на потоковой CDC (фиксация изменений данных) и обработке потоков данных. Она построена на высокопроизводительном, масштабируемом стеке с использованием Apache Kafka, Apache Flink и Debezium и предлагается как полностью управляемый сервис в формате SaaS или в вариантах развертывания BYOC (Bring your own Cloud).

Streamkap позволяет в режиме потока передавать каждую операцию INSERT , UPDATE и DELETE из исходных баз данных, таких как PostgreSQL, MySQL, SQL Server, MongoDB и других, непосредственно в ClickHouse с задержкой в миллисекунды.

Это делает её идеальным решением для построения аналитических дашбордов в реальном времени, операционной аналитики и подачи данных в реальном времени в модели машинного обучения.

Потоковая CDC (фиксация изменений данных) в реальном времени: Streamkap считывает изменения напрямую из журналов вашей базы данных, гарантируя, что данные в ClickHouse являются репликой источника в режиме реального времени.

Упрощённая потоковая обработка: преобразуйте, обогащайте, маршрутизируйте, форматируйте данные и создавайте эмбеддинги в реальном времени до их записи в ClickHouse. Решение работает на базе Flink, но без присущей ему сложности.

Полностью управляемое и масштабируемое решение: Обеспечивает готовый к продакшну конвейер без обслуживания, устраняя необходимость управлять собственной инфраструктурой Kafka, Flink, Debezium или реестром схем. Платформа рассчитана на высокую пропускную способность и может линейно масштабироваться для обработки миллиардов событий.

Автоматическая эволюция схемы: Streamkap автоматически обнаруживает изменения схемы в исходной базе данных и распространяет их в ClickHouse. Может обрабатывать добавление новых столбцов или изменение типов столбцов без ручного вмешательства.

Оптимизировано для ClickHouse: Интеграция создана для эффективной работы с возможностями ClickHouse. По умолчанию используется движок ReplacingMergeTree, который прозрачно обрабатывает обновления и удаления из исходной системы.

Отказоустойчивая доставка: Платформа предоставляет гарантию доставки как минимум один раз, обеспечивая согласованность данных между источником и ClickHouse. Для операций upsert выполняется дедупликация на основе первичного ключа.

В этом руководстве даётся общий высокоуровневый обзор настройки конвейера Streamkap для загрузки данных в ClickHouse.

Учётная запись Streamkap.

Параметры подключения к вашему кластеру ClickHouse: Hostname, Port, Username и Password.

Исходная база данных (например, PostgreSQL, SQL Server), настроенная для поддержки CDC. Подробные руководства по настройке доступны в документации Streamkap.

Войдите в свою учетную запись Streamkap. В боковой панели перейдите в Connectors и выберите вкладку Sources. Нажмите + Add и выберите тип базы данных-источника (например, SQL Server RDS). Заполните параметры подключения, включая endpoint, порт, имя базы данных и учетные данные пользователя. Сохраните коннектор.

В разделе Connectors откройте вкладку Destinations. Нажмите + Add и выберите ClickHouse из списка. Укажите параметры подключения к вашему сервису ClickHouse: Hostname: Хост (имя узла) экземпляра ClickHouse (например, abc123.us-west-2.aws.clickhouse.cloud )

Хост (имя узла) экземпляра ClickHouse (например, ) Port: Защищённый HTTPS-порт, обычно 8443

Защищённый HTTPS-порт, обычно Username and Password: Учётные данные пользователя ClickHouse

Учётные данные пользователя ClickHouse Database: Имя целевой базы данных в ClickHouse Сохраните назначение.

Перейдите в раздел Pipelines в боковой панели и нажмите + Create. Выберите источник (Source) и приёмник (Destination), которые вы только что настроили. Выберите схемы и таблицы, которые вы хотите передавать в потоковом режиме. Задайте имя конвейеру и нажмите Save.

После создания конвейер станет активным. Streamkap сначала создаст снимок (snapshot) существующих данных, а затем начнёт передавать в потоковом режиме все новые изменения по мере их появления.

Подключитесь к своему кластеру ClickHouse и выполните запрос, чтобы убедиться, что данные поступают в целевую таблицу.

SELECT * FROM your_table_name LIMIT 10;

Интеграция Streamkap разработана для эффективной работы с данными CDC в ClickHouse.

По умолчанию Streamkap использует режим upsert-ингестии. При создании таблицы в ClickHouse применяется движок ReplacingMergeTree. Этот движок оптимален для обработки событий CDC:

Первичный ключ исходной таблицы используется как ключ ORDER BY в определении таблицы ReplacingMergeTree.

Обновления в источнике записываются как новые строки в ClickHouse. В процессе фонового слияния ReplacingMergeTree схлопывает эти строки, сохраняя только последнюю версию на основе ключа сортировки.

Удаления обрабатываются с помощью метаданных, записываемых в параметр ReplacingMergeTree is_deleted . Строки, удалённые в источнике, не удаляются немедленно, а помечаются как удалённые. При необходимости удалённые записи могут сохраняться в ClickHouse для аналитических целей



Streamkap добавляет несколько столбцов метаданных в каждую таблицу для управления состоянием данных:

Имя столбца Описание _STREAMKAP_SOURCE_TS_MS Метка времени (в миллисекундах) события в исходной базе данных. _STREAMKAP_TS_MS Метка времени (в миллисекундах), когда Streamkap обработал событие. __DELETED Булевый флаг ( true / false ), указывающий, была ли строка удалена в источнике. _STREAMKAP_OFFSET Значение смещения из внутренних логов Streamkap, полезное для упорядочивания и отладки.

Поскольку ReplacingMergeTree обрабатывает обновления и удаления в фоновом режиме, простой запрос SELECT * может отображать исторические или удалённые строки до завершения операции слияния. Чтобы получить наиболее актуальное состояние ваших данных, необходимо отфильтровать удалённые записи и выбрать только последнюю версию каждой строки.

Это можно сделать с помощью модификатора FINAL, который удобен, но может повлиять на производительность запроса:

-- Using FINAL to get the correct current state SELECT * FROM your_table_name FINAL WHERE __DELETED = 'false'; SELECT * FROM your_table_name FINAL LIMIT 10; SELECT * FROM your_table_name FINAL WHERE <filter by keys in ORDER BY clause>; SELECT count(*) FROM your_table_name FINAL;

Для повышения производительности при работе с большими таблицами, особенно если вам не нужно читать все столбцы и при разовых аналитических запросах, вы можете использовать функцию argMax, чтобы вручную выбрать последнюю запись для каждого первичного ключа:

SELECT key, argMax(col1, version) AS col1, argMax(col2, version) AS col2 FROM t WHERE <your predicates> GROUP BY key;

Для продакшн-сценариев и многократно выполняемых параллельных запросов конечных пользователей можно использовать Materialized Views, чтобы смоделировать данные так, чтобы они лучше соответствовали последующим паттернам доступа.