Соединитель Spark
Этот соединитель использует оптимизации, специфичные для ClickHouse, такие как продвинутое разбиение и подавление предикатов, чтобы улучшить производительность запросов и обработку данных. Соединитель основан на официальном JDBC соединителе ClickHouse и управляет собственным каталогом.
До Spark 3.0 в Spark не было концепции встроенного каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать таблицы источников данных перед тем, как получить к ним доступ в Spark. Тем не менее, с введением концепции каталога в Spark 3.0, Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталогов.
Стандартный каталог Spark — это spark_catalog
, а таблицы идентифицируются по формату {catalog name}.{database}.{table}
. С новой
функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.
Требования
- Java 8 или 17
- Scala 2.12 или 2.13
- Apache Spark 3.3 или 3.4 или 3.5
Матрица совместимости
Версия | Совместимые версии Spark | Версия ClickHouse JDBC |
---|---|---|
main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
0.6.0 | Spark 3.3 | 0.3.2-patch11 |
0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
0.4.0 | Spark 3.2, 3.3 | Не зависит от |
0.3.0 | Spark 3.2, 3.3 | Не зависит от |
0.2.1 | Spark 3.2 | Не зависит от |
0.1.2 | Spark 3.2 | Не зависит от |
Установка и настройка
Для интеграции ClickHouse с Spark существует несколько вариантов установки, подходящих для различных настроек проектов.
Вы можете добавить соединитель ClickHouse Spark как зависимость напрямую в файл сборки вашего проекта (например, в pom.xml
для Maven или build.sbt
для SBT).
В качестве альтернативы, вы можете поместить необходимые JAR-файлы в папку $SPARK_HOME/jars/
или передать их напрямую в качестве параметра Spark, используя флаг --jars
в команде spark-submit
.
Оба подхода обеспечивают доступность соединителя ClickHouse в вашей среде Spark.
Импорт как зависимость
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.
Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT:
При работе с опциями оболочки Spark (Spark SQL CLI, Spark Shell CLI и командой Spark Submit) зависимости могут быть зарегистрированы, передавая необходимые JAR:
Если вы хотите избежать копирования файлов JAR на узел клиента Spark, вы можете использовать следующее:
Примечание: Для случаев использования только SQL рекомендуется Apache Kyuubi для продакшна.
Скачивание библиотеки
Шаблон имени бинарного JAR:
Вы можете найти все доступные выпущенные файлы JAR в Maven Central Repository и все ежедневные сборки JAR файлов SNAPSHOT в Sonatype OSS Snapshots Repository.
Важно включить clickhouse-jdbc JAR с классификатором "all", так как соединитель зависит от clickhouse-http и clickhouse-client — оба из которых упакованы в clickhouse-jdbc:all. В качестве альтернативы, вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если вы предпочитаете не использовать полный JDBC пакет.
В любом случае, убедитесь, что версии пакетов совместимы в соответствии с Матрицей совместимости.
Регистрация каталога (обязательно)
Чтобы получить доступ к вашим таблицам ClickHouse, вам необходимо настроить новый каталог Spark со следующими параметрами:
Свойство | Значение | Значение по умолчанию | Обязательно |
---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Да |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | Нет |
spark.sql.catalog.<catalog_name>.protocol | http | http | Нет |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | Нет |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | Нет |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (пустая строка) | Нет |
spark.sql.catalog.<catalog_name>.database | <database> | default | Нет |
spark.<catalog_name>.write.format | json | arrow | Нет |
Эти настройки могут быть установлены через один из следующих методов:
- Редактировать или создать
spark-defaults.conf
. - Передать конфигурацию в вашу команду
spark-submit
(или в ваши командыspark-shell
/spark-sql
CLI). - Добавить конфигурацию при инициализации вашего контекста.
При работе с кластером ClickHouse необходимо установить уникальное имя каталога для каждого экземпляра. Например:
Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table>
из Spark SQL по
clickhouse1.<ck_db>.<ck_table>
, а к таблице clickhouse2 <ck_db>.<ck_table>
по clickhouse2.<ck_db>.<ck_table>
.
Чтение данных
- Java
- Scala
- Python
- Spark SQL
Запись данных
- Java
- Scala
- Python
- Spark SQL
Операции DDL
Вы можете выполнять операции DDL на вашем экземпляре ClickHouse с помощью Spark SQL, все изменения будут немедленно сохранены в ClickHouse. Spark SQL позволяет вам писать запросы точно так, как вы делали бы это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие - без модификаций, например:
Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете запускать в вашем приложении, используя любой API — Java, Scala, PySpark или оболочку.
Конфигурации
Следующие настройки доступны в коннекторе:
Ключ | Значение по умолчанию | Описание | С версии |
---|---|---|---|
spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиции, например cityHash64(col_1, col_2) , которые в настоящее время не поддерживаются Spark. Если true , игнорировать неподдерживаемые выражения, иначе быстро завершить с исключением. Обратите внимание, что при включенном spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может повредить данные. | 0.4.0 |
spark.clickhouse.read.compression.codec | lz4 | Кодек, используемый для декомпрессии данных при чтении. Поддерживаемые кодеки: none, lz4. | 0.5.0 |
spark.clickhouse.read.distributed.convertLocal | true | При чтении распределенной таблицы, читайте локальную таблицу вместо нее самой. Если true , игнорируйте spark.clickhouse.read.distributed.useClusterNodes . | 0.1.0 |
spark.clickhouse.read.fixedStringAs | binary | Читайте тип FixedString ClickHouse как указанный тип данных Spark. Поддерживаемые типы: binary, string | 0.8.0 |
spark.clickhouse.read.format | json | Формат сериализации для чтения. Поддерживаемые форматы: json, binary | 0.6.0 |
spark.clickhouse.read.runtimeFilter.enabled | false | Включить фильтр времени выполнения для чтения. | 0.8.0 |
spark.clickhouse.read.splitByPartitionId | true | Если true , создайте фильтр входной партиции по виртуальной колонке _partition_id , вместо значения партиции. Известны проблемы с составлением SQL-предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+ | 0.4.0 |
spark.clickhouse.useNullableQuerySchema | false | Если true , отметьте все поля схемы запроса как nullable при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступно в Spark 3.5), без этого патча, она всегда будет действовать как true . | 0.8.0 |
spark.clickhouse.write.batchSize | 10000 | Количество записей на пакет при записи в ClickHouse. | 0.1.0 |
spark.clickhouse.write.compression.codec | lz4 | Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4. | 0.3.0 |
spark.clickhouse.write.distributed.convertLocal | false | При записи в распределенную таблицу, записывайте локальную таблицу вместо нее самой. Если true , игнорируйте spark.clickhouse.write.distributed.useClusterNodes . | 0.1.0 |
spark.clickhouse.write.distributed.useClusterNodes | true | Записывать на все узлы кластера при записи в распределенную таблицу. | 0.1.0 |
spark.clickhouse.write.format | arrow | Формат сериализации для записи. Поддерживаемые форматы: json, arrow | 0.4.0 |
spark.clickhouse.write.localSortByKey | true | Если true , выполните локальную сортировку по ключам сортировки перед записью. | 0.3.0 |
spark.clickhouse.write.localSortByPartition | значение spark.clickhouse.write.repartitionByPartition | Если true , выполните локальную сортировку по партиции перед записью. Если не задано, это равно spark.clickhouse.write.repartitionByPartition . | 0.3.0 |
spark.clickhouse.write.maxRetry | 3 | Максимальное количество повторных попыток записи для одной пакетной записи, завершившейся с ошибками. | 0.1.0 |
spark.clickhouse.write.repartitionByPartition | true | Нужно ли перераспределять данные по ключам партиции ClickHouse для соблюдения распределений таблицы ClickHouse перед записью. | 0.3.0 |
spark.clickhouse.write.repartitionNum | 0 | Перераспределение данных для соответствия распределениям таблицы ClickHouse требуется перед записью. Используйте эту конфигурацию для указания количества перераспределения, значение меньше 1 означает отсутствие требования. | 0.1.0 |
spark.clickhouse.write.repartitionStrictly | false | Если true , Spark будет строго распределять входящие записи по партициям для соблюдения требуемого распределения перед передачей записей в таблицу источника данных при записи. В противном случае Spark может применить определенные оптимизации для ускорения запроса, но нарушить требования распределения. Обратите внимание, что эта конфигурация требует SPARK-37523(доступно в Spark 3.4), без этого патча она всегда будет действовать как true . | 0.3.0 |
spark.clickhouse.write.retryInterval | 10s | Интервал в секундах между повторной попыткой записи. | 0.1.0 |
spark.clickhouse.write.retryableErrorCodes | 241 | Код ошибки, которые можно повторно попытаться, возвращаемые сервером ClickHouse при неудаче записи. | 0.1.0 |
Поддерживаемые типы данных
Этот раздел описывает соответствие типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые ссылки для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.
Чтение данных из ClickHouse в Spark
Тип данных ClickHouse | Тип данных Spark | Поддерживается | Является примитивным | Заметки |
---|---|---|---|---|
Nothing | NullType | ✅ | Да | |
Bool | BooleanType | ✅ | Да | |
UInt8 , Int16 | ShortType | ✅ | Да | |
Int8 | ByteType | ✅ | Да | |
UInt16 , Int32 | IntegerType | ✅ | Да | |
UInt32 , Int64 , UInt64 | LongType | ✅ | Да | |
Int128 , UInt128 , Int256 , UInt256 | DecimalType(38, 0) | ✅ | Да | |
Float32 | FloatType | ✅ | Да | |
Float64 | DoubleType | ✅ | Да | |
String , JSON , UUID , Enum8 , Enum16 , IPv4 , IPv6 | StringType | ✅ | Да | |
FixedString | BinaryType , StringType | ✅ | Да | Контролируется конфигурацией READ_FIXED_STRING_AS |
Decimal | DecimalType | ✅ | Да | Точность и масштаб до Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Да | |
Decimal64 | DecimalType(18, scale) | ✅ | Да | |
Decimal128 | DecimalType(38, scale) | ✅ | Да | |
Date , Date32 | DateType | ✅ | Да | |
DateTime , DateTime32 , DateTime64 | TimestampType | ✅ | Да | |
Array | ArrayType | ✅ | Нет | Тип элемента массива также преобразуется |
Map | MapType | ✅ | Нет | Ключи ограничены типом StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Да | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Да | |
IntervalDay , IntervalHour , IntervalMinute , IntervalSecond | DayTimeIntervalType | ✅ | Нет | Используется конкретный тип интервала |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
Вставка данных из Spark в ClickHouse
Тип данных Spark | Тип данных ClickHouse | Поддерживается | Является примитивным | Заметки |
---|---|---|---|---|
BooleanType | UInt8 | ✅ | Да | |
ByteType | Int8 | ✅ | Да | |
ShortType | Int16 | ✅ | Да | |
IntegerType | Int32 | ✅ | Да | |
LongType | Int64 | ✅ | Да | |
FloatType | Float32 | ✅ | Да | |
DoubleType | Float64 | ✅ | Да | |
StringType | String | ✅ | Да | |
VarcharType | String | ✅ | Да | |
CharType | String | ✅ | Да | |
DecimalType | Decimal(p, s) | ✅ | Да | Точность и масштаб до Decimal128 |
DateType | Date | ✅ | Да | |
TimestampType | DateTime | ✅ | Да | |
ArrayType (список, кортеж или массив) | Array | ✅ | Нет | Тип элемента массива также преобразуется |
MapType | Map | ✅ | Нет | Ключи ограничены типом StringType |
Object | ❌ | |||
Nested | ❌ |
Участие и поддержка
Если вы хотите внести свой вклад в проект или сообщить о любых проблемах, мы приветствуем ваши идеи! Посетите наш репозиторий GitHub, чтобы открыть проблему, предложить улучшения или отправить пул-запрос. Ваши вклады приветствуются! Пожалуйста, ознакомьтесь с рекомендациями по внесению изменений в репозитории перед началом. Спасибо за помощь в улучшении нашего ClickHouse Spark коннектора!