Перейти к основному содержимому
Перейти к основному содержимому

Интеграция Apache Beam и ClickHouse

Apache Beam — это открытая, унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных, от ETL (Извлечение, Преобразование, Загрузка) операций до сложной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC соединитель ClickHouse для основного слоя вставки.

Пакет интеграции

Пакет интеграции, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций многих популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.

Установка пакета Apache Beam ClickHouse

Установка пакета

Добавьте следующую зависимость в вашу систему управления пакетами:

Рекомендуемая версия Beam

Соединитель ClickHouseIO рекомендуется использовать, начиная с версии Apache Beam 2.59.0. Более ранние версии могут не полностью поддерживать функциональность соединителя.

Артефакты можно найти в официальном репозитории maven.

Пример кода

Следующий пример считывает CSV файл с именем input.csv как PCollection, преобразует его в объект Row (используя заданную схему) и вставляет его в локальный экземпляр ClickHouse с помощью ClickHouseIO:

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий массив байтов фиксированной длины, расположенный по адресу org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Вы можете настроить конфигурацию ClickHouseIO.Write с помощью следующих функций-сеттеров:

Функция-сеттер параметровТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное количество попыток для неудачных вставок.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная кумулятивная продолжительность ожидания для повторных попыток.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная продолжительность ожидания перед первой попыткой.
withInsertDistributedSync(Boolean sync)trueЕсли true, синхронизирует операции вставки для распределенных таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли true, включена дедупликация для операций вставки.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

Пожалуйста, учтите следующие ограничения при использовании соединителя:

  • На данный момент поддерживается только операция Sink. Соединитель не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в ReplicatedMergeTree или в Distributed таблицу, построенную на основе ReplicatedMergeTree. Без репликации вставка в обычный MergeTree может привести к дубликатам, если операция вставки завершилась неудачно, а затем успешно повторилась. Однако каждый блок вставляется атомарно, и размер блока может быть настроен с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается за счет использования контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Deduplication и Конфигурация дедупликации вставок.
  • Соединитель не выполняет никаких DDL команд; следовательно, целевая таблица должна существовать до вставки.