Коннектор Spark
Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое секционирование и проталкивание предикатов (predicate pushdown), чтобы повысить производительность запросов и эффективность обработки данных. Коннектор основан на официальном JDBC-коннекторе ClickHouse и управляет собственным каталогом.
До Spark 3.0 в Spark отсутствовало встроенное понятие каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать таблицы источника данных перед доступом к ним в Spark. Однако, поскольку в Spark 3.0 была введена концепция каталога, Spark теперь может автоматически обнаруживать таблицы посредством регистрации плагинов каталогов.
Каталог по умолчанию в Spark — spark_catalog, а таблицы идентифицируются как {catalog name}.{database}.{table}. С новой
функциональностью каталогов теперь можно добавлять и использовать несколько каталогов в рамках одного приложения Spark.
Требования
- Java 8 или 17
- Scala 2.12 или 2.13
- Apache Spark 3.3, 3.4 или 3.5
Матрица совместимости
| Версия | Совместимые версии Spark | Версия ClickHouse JDBC |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | Не зависит от версии JDBC-драйвера |
| 0.3.0 | Spark 3.2, 3.3 | Не зависит от версии JDBC-драйвера |
| 0.2.1 | Spark 3.2 | Не зависит от версии JDBC-драйвера |
| 0.1.2 | Spark 3.2 | Не зависит от версии JDBC-драйвера |
Установка и настройка
Для интеграции ClickHouse со Spark доступно несколько вариантов установки для различных конфигураций проектов.
Вы можете добавить коннектор ClickHouse Spark как зависимость непосредственно в файл сборки вашего проекта (например, в pom.xml
для Maven или build.sbt для SBT).
Также вы можете поместить необходимые JAR-файлы в каталог $SPARK_HOME/jars/ или передать их напрямую как параметр Spark
с помощью флага --jars в команде spark-submit.
Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.
Импорт как зависимости
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
Добавьте следующий репозиторий, если вы хотите использовать SNAPSHOT-версию.
Добавьте следующий репозиторий, если вы хотите использовать SNAPSHOT-версию:
При работе с параметрами оболочки Spark (Spark SQL CLI, Spark Shell CLI и команда Spark Submit) зависимости могут быть зарегистрированы путём передачи необходимых JAR-файлов:
Если вы хотите избежать копирования JAR-файлов на клиентский узел Spark, вы можете использовать следующий вариант:
Примечание: для сценариев, использующих только SQL, для production-среды рекомендуется Apache Kyuubi.
Загрузка библиотеки
Шаблон имени бинарного JAR-файла:
Все доступные релизные JAR-файлы можно найти в Maven Central Repository, а все JAR-файлы ежедневных сборок SNAPSHOT — в Sonatype OSS Snapshots Repository.
Крайне важно включить clickhouse-jdbc JAR с классификатором «all», так как коннектор полагается на clickhouse-http и clickhouse-client, оба из которых входят в состав clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если предпочитаете не использовать полный пакет JDBC.
В любом случае убедитесь, что версии пакетов совместимы в соответствии с матрицей совместимости.
Зарегистрируйте каталог (обязательно)
Чтобы получить доступ к таблицам ClickHouse, необходимо настроить новый каталог Spark со следующими параметрами:
| Property | Value | Default Value | Required |
|---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
spark.sql.catalog.<catalog_name>.protocol | http | http | No |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
spark.sql.catalog.<catalog_name>.database | <database> | default | No |
spark.<catalog_name>.write.format | json | arrow | No |
Эти параметры можно задать одним из следующих способов:
- Отредактируйте/создайте
spark-defaults.conf. - Передайте конфигурацию в команду
spark-submit(или в команды CLIspark-shell/spark-sql). - Добавьте конфигурацию при инициализации контекста.
При работе с кластером ClickHouse необходимо задать уникальное имя каталога для каждого экземпляра. Например:
Таким образом, вы сможете обращаться к таблице <ck_db>.<ck_table> в clickhouse1 из Spark SQL под именем clickhouse1.<ck_db>.<ck_table>, а к таблице <ck_db>.<ck_table> в clickhouse2 — под именем clickhouse2.<ck_db>.<ck_table>.
Настройки ClickHouse Cloud
При подключении к ClickHouse Cloud убедитесь, что включён SSL и задан соответствующий режим SSL. Например:
Чтение данных
- Java
- Scala
- Python
- Spark SQL
Запись данных
- Java
- Scala
- Python
При необходимости вы можете использовать любую другую комбинацию пакетов при условии, что она удовлетворяет матрице совместимости, приведённой выше.
packages = [ "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0", "com.clickhouse:clickhouse-client:0.7.0", "com.clickhouse:clickhouse-http-client:0.7.0", "org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder .config("spark.jars.packages", ",".join(packages)) .getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1") spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http") spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123") spark.conf.set("spark.sql.catalog.clickhouse.user", "default") spark.conf.set("spark.sql.catalog.clickhouse.password", "123456") spark.conf.set("spark.sql.catalog.clickhouse.database", "default") spark.conf.set("spark.clickhouse.write.format", "json")
Создание DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")] df = spark.createDataFrame(data)
Запись DataFrame в ClickHouse
df.writeTo("clickhouse.default.example_table").append()
Операции DDL
Вы можете выполнять операции DDL с экземпляром ClickHouse с помощью Spark SQL, при этом все изменения сразу сохраняются в ClickHouse. Spark SQL позволяет писать запросы так же, как в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие, без каких-либо изменений, например:
При использовании Spark SQL за один раз может быть выполнена только одна инструкция.
Приведённые выше примеры показывают запросы Spark SQL, которые вы можете выполнять в приложении, используя любой из API-интерфейсов — Java, Scala, PySpark или shell.
Конфигурации
Ниже перечислены параметры, которые можно настроить в коннекторе:
| Ключ | По умолчанию | Описание | Начиная с |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиционирования, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, неподдерживаемые выражения игнорируются, в противном случае выполнение немедленно завершается с исключением. Обратите внимание, что при включённой настройке spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | Кодек, используемый для распаковки данных при чтении. Поддерживаемые кодеки: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | При чтении таблицы Distributed использовать локальную таблицу вместо самой распределённой. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | двоичный | Считывать тип ClickHouse FixedString как заданный тип данных Spark. Поддерживаемые типы: binary, string | 0.8.0 |
| spark.clickhouse.read.format | json | Формат сериализации для чтения. Поддерживаемые форматы: JSON, Binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Включить динамический фильтр при чтении. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | Если установлено значение true, формировать входной фильтр партиций по виртуальному столбцу _partition_id, а не по значению партиции. Известны проблемы при построении SQL-предикатов по значению партиции. Для использования этой возможности требуется ClickHouse Server v21.6+. | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | Если true, помечать все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта настройка требует SPARK-43390 (доступно в Spark 3.5); без этого патча данная опция фактически всегда равна true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | Количество записей в одном пакете при записи в ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | Кодек, используемый для сжатия данных при их записи. Поддерживаемые кодеки: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | При записи в таблицу Distributed данные записываются в локальную таблицу, а не в саму распределённую таблицу. Если установлено значение true, параметр spark.clickhouse.write.distributed.useClusterNodes игнорируется. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | При записи в распределённую таблицу записывать данные на все узлы кластера. | 0.1.0 |
| spark.clickhouse.write.format | стрелка | Формат сериализации при записи. Поддерживаемые форматы: JSON, Arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | Если установлено значение true, выполнять локальную сортировку по ключам сортировки перед записью. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | значение параметра spark.clickhouse.write.repartitionByPartition | Если имеет значение true, выполняется локальная сортировка по разделу перед записью. Если не задано, используется значение spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | Максимальное количество повторных попыток записи для одной пакетной операции, завершившейся сбоем с кодами ошибок, допускающими повторную попытку. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Определяет, нужно ли переразбивать данные по ключам партиционирования ClickHouse, чтобы они соответствовали распределению данных в таблице ClickHouse перед записью. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Перед записью требуется перераспределить данные в соответствии с распределением таблицы ClickHouse. Используйте этот параметр конфигурации для указания количества переразбиений; значение меньше 1 означает, что перераспределение не требуется. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | Если true, Spark будет строго распределять входящие записи по разделам, чтобы обеспечить требуемое распределение перед записью данных в таблицу источника. В противном случае Spark может применять определённые оптимизации для ускорения запроса, но при этом нарушить требуемое распределение. Обратите внимание, что для этой конфигурации необходим патч SPARK-37523 (доступен в Spark 3.4); без этого патча она всегда ведёт себя как true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | Интервал в секундах между повторными попытками записи. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | Коды ошибок, допускающих повторную попытку, возвращаемые сервером ClickHouse при сбое записи. | 0.1.0 |
Поддерживаемые типы данных
В этом разделе описано сопоставление типов данных между Spark и ClickHouse. Таблицы ниже служат быстрой справкой по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.
Чтение данных из ClickHouse в Spark
| Тип данных ClickHouse | Тип данных Spark | Поддерживается | Примитивный | Примечания |
|---|---|---|---|---|
Nothing | NullType | ✅ | Да | |
Bool | BooleanType | ✅ | Да | |
UInt8, Int16 | ShortType | ✅ | Да | |
Int8 | ByteType | ✅ | Да | |
UInt16,Int32 | IntegerType | ✅ | Да | |
UInt32,Int64, UInt64 | LongType | ✅ | Да | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Да | |
Float32 | FloatType | ✅ | Да | |
Float64 | DoubleType | ✅ | Да | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Да | |
FixedString | BinaryType, StringType | ✅ | Да | Управляется конфигурацией READ_FIXED_STRING_AS |
Decimal | DecimalType | ✅ | Да | Точность и масштаб вплоть до Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Да | |
Decimal64 | DecimalType(18, scale) | ✅ | Да | |
Decimal128 | DecimalType(38, scale) | ✅ | Да | |
Date, Date32 | DateType | ✅ | Да | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Да | |
Array | ArrayType | ✅ | Нет | Тип элементов массива также преобразуется |
Map | MapType | ✅ | Нет | Ключи ограничены типом StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Да | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Да | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | Нет | Используется соответствующий тип интервала |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
Вставка данных из Spark в ClickHouse
| Тип данных Spark | Тип данных ClickHouse | Поддерживается | Примитивный тип | Примечания |
|---|---|---|---|---|
BooleanType | UInt8 | ✅ | Да | |
ByteType | Int8 | ✅ | Да | |
ShortType | Int16 | ✅ | Да | |
IntegerType | Int32 | ✅ | Да | |
LongType | Int64 | ✅ | Да | |
FloatType | Float32 | ✅ | Да | |
DoubleType | Float64 | ✅ | Да | |
StringType | String | ✅ | Да | |
VarcharType | String | ✅ | Да | |
CharType | String | ✅ | Да | |
DecimalType | Decimal(p, s) | ✅ | Да | Точность и масштаб вплоть до Decimal128 |
DateType | Date | ✅ | Да | |
TimestampType | DateTime | ✅ | Да | |
ArrayType (list, tuple, or array) | Array | ✅ | Нет | Тип элементов массива также преобразуется |
MapType | Map | ✅ | Нет | Ключи ограничены типом StringType |
Object | ❌ | |||
Nested | ❌ |
Участие и поддержка
Если вы хотите поучаствовать в развитии проекта или сообщить о проблеме, мы будем рады вашей помощи! Перейдите в наш репозиторий на GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы приветствуем любые вклады в проект! Перед началом, пожалуйста, ознакомьтесь с руководством по участию (contributing) в репозитории. Спасибо, что помогаете улучшать наш коннектор ClickHouse Spark!