Перейти к основному содержимому
Перейти к основному содержимому

Соединитель Spark

Этот соединитель использует специфические оптимизации ClickHouse, такие как продвинутое партиционирование и выталкивание предикатов, чтобы улучшить производительность запросов и обработку данных. Соединитель основан на официальном JDBC соединителе ClickHouse и управляет своим собственным каталогом.

До версии Spark 3.0 в Spark отсутствовала концепция встроенного каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователи должны были вручную регистрировать таблицы своих источников данных перед доступом к ним в Spark. Однако с введением концепции каталога в Spark 3.0 Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталога.

Дефолтный каталог Spark — это spark_catalog, и таблицы идентифицируются по {catalog name}.{database}.{table}. С новой функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.

Требования

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3 или 3.4 или 3.5

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от
0.3.0Spark 3.2, 3.3Не зависит от
0.2.1Spark 3.2Не зависит от
0.1.2Spark 3.2Не зависит от

Установка и настройка

Для интеграции ClickHouse с Spark существует несколько вариантов установки, подходящих для различных настроек проектов. Вы можете добавить соединитель ClickHouse Spark в качестве зависимости непосредственно в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). В качестве альтернативы вы можете положить необходимые JAR-файлы в папку $SPARK_HOME/jars/, или передать их напрямую в качестве опции Spark, используя флаг --jars в команде spark-submit. Оба подхода обеспечивают доступность соединителя ClickHouse в вашей среде Spark.

Импорт как зависимость

Добавьте следующий репозиторий, если хотите использовать версию SNAPSHOT.

Скачайте библиотеку

Шаблон имени бинарного JAR:

Вы можете найти все доступные выпущенные JAR-файлы в Maven Central Repository и все JAR-файлы ежедневной сборки SNAPSHOT в Sonatype OSS Snapshots Repository.

к сведению

Важно включить clickhouse-jdbc JAR с классификатором "all", так как соединитель зависит от clickhouse-http и clickhouse-client — оба из которых упакованы в clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если вы предпочитаете не использовать полный пакет JDBC.

В любом случае убедитесь, что версии пакетов совместимы согласно Матрице совместимости.

Зарегистрировать каталог (обязательно)

Чтобы получить доступ к вашим таблицам ClickHouse, вы должны настроить новый каталог Spark с помощью следующих конфигураций:

СвойствоЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogН/ДДа
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostНет
spark.sql.catalog.<catalog_name>.protocolhttphttpНет
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123Нет
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultНет
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)Нет
spark.sql.catalog.<catalog_name>.database<database>defaultНет
spark.<catalog_name>.write.formatjsonarrowНет

Эти настройки могут быть установлены с помощью одного из следующих способов:

  • Редактировать/создать spark-defaults.conf.
  • Передать конфигурацию в вашу команду spark-submit (или в ваши команды spark-shell/spark-sql CLI).
  • Добавить конфигурацию при инициализации вашего контекста.
к сведению

При работе с кластером ClickHouse нужно установить уникальное имя каталога для каждого экземпляра. Например:

Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table> из Spark SQL с помощью clickhouse1.<ck_db>.<ck_table>, и получить доступ к таблице clickhouse2 <ck_db>.<ck_table> с помощью clickhouse2.<ck_db>.<ck_table>.

Настройки ClickHouse Cloud

При подключении к ClickHouse Cloud убедитесь, что SSL включен и установлен соответствующий режим SSL. Например:

Чтение данных

Запись данных

Операции DDL

Вы можете выполнять операции DDL на вашем экземпляре ClickHouse, используя Spark SQL, и все изменения сразу сохраняются в ClickHouse. Spark SQL позволяет вам писать запросы точно так же, как вы бы делали в ClickHouse, поэтому вы можете выполнять команды, такие как CREATE TABLE, TRUNCATE и другие - без модификаций, например:

Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своем приложении с использованием любого API — Java, Scala, PySpark или оболочки.

Конфигурации

Следующие конфигурации настраиваемы и доступны в коннекторе:


КлючЗначение по умолчаниюОписаниеС версии
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиции, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, игнорировать неподдерживаемые выражения, в противном случае быстро завершить с исключением. Обратите внимание, что когда spark.clickhouse.write.distributed.convertLocal включен, игнорирование неподдерживаемых ключей шардирования может испортить данные.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для декомпрессии данных для чтения. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении распределенной таблицы, читать локальную таблицу вместо самой себя. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧитать тип ClickHouse FixedString как указанный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить фильтр времени выполнения для чтения.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли true, конструкция фильтра входной партиции по виртуальной колонке _partition_id, вместо значения партиции. Известны проблемы с составлением SQL предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, пометить все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступно в Spark 3.5), без этой патча, она всегда ведет себя как true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей на пакет при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в распределенную таблицу записывать локальную таблицу вместо самой себя. Если true, игнорировать spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗаписывать на все узлы кластера при записи в распределённую таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации для записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли true, выполнять локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение spark.clickhouse.write.repartitionByPartitionЕсли true, выполнять локальную сортировку по партиции перед записью. Если не задано, это эквивалентно spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество попыток записи, которые мы будем повторять для одной неудачной пакетной записи с кодами повторных попыток.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueНужно ли перераспределить данные по ключам партиции ClickHouse для соответствия распределениям таблицы ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перераспределение данных для соответствия распределениям таблицы ClickHouse необходимо перед записью, используйте эту конфигурацию для указания числа перераспределений, значение меньше 1 означает отсутствие необходимости.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по партициям, чтобы удовлетворить необходимое распределение перед передачей записей в таблицу источника данных при записи. В противном случае Spark может применить некоторые оптимизации для ускорения запроса, но нарушает требование распределения. Обратите внимание, что эта конфигурация требует SPARK-37523 (доступно в Spark 3.4), без этого патча, она всегда ведет себя как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок повторной попытки, возвращаемые сервером ClickHouse при неудачной записи.0.1.0

Поддерживаемые типы данных

Этот раздел описывает соответствие типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые ссылки для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаемыйЯвляется примитивнымПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаКонтролируется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элемента массива также преобразуется
MapMapTypeНетКлючи ограничены StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется конкретный тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаемыйЯвляется примитивнымПримечания
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (список, кортеж или массив)ArrayНетТип элемента массива также преобразуется
MapTypeMapНетКлючи ограничены StringType
Object
Nested

Участие и поддержка

Если вы хотите внести свой вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашим мнениям! Посетите наш репозиторий GitHub, чтобы создать проблему, предложить улучшения или отправить запрос на изменение. Ваши вклады приветствуются! Пожалуйста, ознакомьтесь с рекомендациями по участию в репозитории перед началом работы. Спасибо за помощь в улучшении нашего коннектора ClickHouse Spark!