Интеграция Apache Beam и ClickHouse
Apache Beam — это открытая, унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных, от ETL (Извлечение, Преобразование, Загрузка) операций до сложной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC соединитель ClickHouse для основного слоя вставки.
Пакет интеграции
Пакет интеграции, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций многих популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO
находится в репозитории Apache Beam.
Установка пакета Apache Beam ClickHouse
Установка пакета
Добавьте следующую зависимость в вашу систему управления пакетами:
Соединитель ClickHouseIO
рекомендуется использовать, начиная с версии Apache Beam 2.59.0
. Более ранние версии могут не полностью поддерживать функциональность соединителя.
Артефакты можно найти в официальном репозитории maven.
Пример кода
Следующий пример считывает CSV файл с именем input.csv
как PCollection
, преобразует его в объект Row (используя заданную схему) и вставляет его в локальный экземпляр ClickHouse с помощью ClickHouseIO
:
Поддерживаемые типы данных
ClickHouse | Apache Beam | Поддерживается | Примечания |
---|---|---|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes — это LogicalType , представляющий массив байтов фиксированной длины, расположенный по адресу org.apache.beam.sdk.schemas.logicaltypes |
Schema.TypeName#DECIMAL | ❌ | ||
Schema.TypeName#MAP | ❌ |
Параметры ClickHouseIO.Write
Вы можете настроить конфигурацию ClickHouseIO.Write
с помощью следующих функций-сеттеров:
Функция-сеттер параметров | Тип аргумента | Значение по умолчанию | Описание |
---|---|---|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | Максимальный размер блока строк для вставки. |
withMaxRetries | (int maxRetries) | 5 | Максимальное количество попыток для неудачных вставок. |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | Максимальная кумулятивная продолжительность ожидания для повторных попыток. |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | Начальная продолжительность ожидания перед первой попыткой. |
withInsertDistributedSync | (Boolean sync) | true | Если true, синхронизирует операции вставки для распределенных таблиц. |
withInsertQuorum | (Long quorum) | null | Количество реплик, необходимых для подтверждения операции вставки. |
withInsertDeduplicate | (Boolean deduplicate) | true | Если true, включена дедупликация для операций вставки. |
withTableSchema | (TableSchema schema) | null | Схема целевой таблицы ClickHouse. |
Ограничения
Пожалуйста, учтите следующие ограничения при использовании соединителя:
- На данный момент поддерживается только операция Sink. Соединитель не поддерживает операцию Source.
- ClickHouse выполняет дедупликацию при вставке в
ReplicatedMergeTree
или вDistributed
таблицу, построенную на основеReplicatedMergeTree
. Без репликации вставка в обычный MergeTree может привести к дубликатам, если операция вставки завершилась неудачно, а затем успешно повторилась. Однако каждый блок вставляется атомарно, и размер блока может быть настроен с помощьюClickHouseIO.Write.withMaxInsertBlockSize(long)
. Дедупликация достигается за счет использования контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Deduplication и Конфигурация дедупликации вставок. - Соединитель не выполняет никаких DDL команд; следовательно, целевая таблица должна существовать до вставки.
Связанный контент
- Документация класса
ClickHouseIO
документация. - Репозиторий
Github
примеров clickhouse-beam-connector.