Перейти к основному содержимому
Перейти к основному содержимому

Соединитель Spark

Этот соединитель использует оптимизации, специфичные для ClickHouse, такие как продвинутое разбиение и подавление предикатов, чтобы улучшить производительность запросов и обработку данных. Соединитель основан на официальном JDBC соединителе ClickHouse и управляет собственным каталогом.

До Spark 3.0 в Spark не было концепции встроенного каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать таблицы источников данных перед тем, как получить к ним доступ в Spark. Тем не менее, с введением концепции каталога в Spark 3.0, Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталогов.

Стандартный каталог Spark — это spark_catalog, а таблицы идентифицируются по формату {catalog name}.{database}.{table}. С новой функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.

Требования

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3 или 3.4 или 3.5

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от
0.3.0Spark 3.2, 3.3Не зависит от
0.2.1Spark 3.2Не зависит от
0.1.2Spark 3.2Не зависит от

Установка и настройка

Для интеграции ClickHouse с Spark существует несколько вариантов установки, подходящих для различных настроек проектов. Вы можете добавить соединитель ClickHouse Spark как зависимость напрямую в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). В качестве альтернативы, вы можете поместить необходимые JAR-файлы в папку $SPARK_HOME/jars/ или передать их напрямую в качестве параметра Spark, используя флаг --jars в команде spark-submit. Оба подхода обеспечивают доступность соединителя ClickHouse в вашей среде Spark.

Импорт как зависимость

Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.

Скачивание библиотеки

Шаблон имени бинарного JAR:

Вы можете найти все доступные выпущенные файлы JAR в Maven Central Repository и все ежедневные сборки JAR файлов SNAPSHOT в Sonatype OSS Snapshots Repository.

к сведению

Важно включить clickhouse-jdbc JAR с классификатором "all", так как соединитель зависит от clickhouse-http и clickhouse-client — оба из которых упакованы в clickhouse-jdbc:all. В качестве альтернативы, вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если вы предпочитаете не использовать полный JDBC пакет.

В любом случае, убедитесь, что версии пакетов совместимы в соответствии с Матрицей совместимости.

Регистрация каталога (обязательно)

Чтобы получить доступ к вашим таблицам ClickHouse, вам необходимо настроить новый каталог Spark со следующими параметрами:

СвойствоЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AДа
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostНет
spark.sql.catalog.<catalog_name>.protocolhttphttpНет
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123Нет
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultНет
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)Нет
spark.sql.catalog.<catalog_name>.database<database>defaultНет
spark.<catalog_name>.write.formatjsonarrowНет

Эти настройки могут быть установлены через один из следующих методов:

  • Редактировать или создать spark-defaults.conf.
  • Передать конфигурацию в вашу команду spark-submit (или в ваши команды spark-shell/spark-sql CLI).
  • Добавить конфигурацию при инициализации вашего контекста.
к сведению

При работе с кластером ClickHouse необходимо установить уникальное имя каталога для каждого экземпляра. Например:

Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table> из Spark SQL по clickhouse1.<ck_db>.<ck_table>, а к таблице clickhouse2 <ck_db>.<ck_table> по clickhouse2.<ck_db>.<ck_table>.

Чтение данных

Запись данных

Операции DDL

Вы можете выполнять операции DDL на вашем экземпляре ClickHouse с помощью Spark SQL, все изменения будут немедленно сохранены в ClickHouse. Spark SQL позволяет вам писать запросы точно так, как вы делали бы это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие - без модификаций, например:

Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете запускать в вашем приложении, используя любой API — Java, Scala, PySpark или оболочку.

Конфигурации

Следующие настройки доступны в коннекторе:


КлючЗначение по умолчаниюОписаниеС версии
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиции, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, игнорировать неподдерживаемые выражения, иначе быстро завершить с исключением. Обратите внимание, что при включенном spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может повредить данные.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для декомпрессии данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении распределенной таблицы, читайте локальную таблицу вместо нее самой. Если true, игнорируйте spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧитайте тип FixedString ClickHouse как указанный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить фильтр времени выполнения для чтения.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли true, создайте фильтр входной партиции по виртуальной колонке _partition_id, вместо значения партиции. Известны проблемы с составлением SQL-предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, отметьте все поля схемы запроса как nullable при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступно в Spark 3.5), без этого патча, она всегда будет действовать как true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей на пакет при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в распределенную таблицу, записывайте локальную таблицу вместо нее самой. Если true, игнорируйте spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗаписывать на все узлы кластера при записи в распределенную таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации для записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли true, выполните локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение spark.clickhouse.write.repartitionByPartitionЕсли true, выполните локальную сортировку по партиции перед записью. Если не задано, это равно spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество повторных попыток записи для одной пакетной записи, завершившейся с ошибками.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueНужно ли перераспределять данные по ключам партиции ClickHouse для соблюдения распределений таблицы ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перераспределение данных для соответствия распределениям таблицы ClickHouse требуется перед записью. Используйте эту конфигурацию для указания количества перераспределения, значение меньше 1 означает отсутствие требования.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по партициям для соблюдения требуемого распределения перед передачей записей в таблицу источника данных при записи. В противном случае Spark может применить определенные оптимизации для ускорения запроса, но нарушить требования распределения. Обратите внимание, что эта конфигурация требует SPARK-37523(доступно в Spark 3.4), без этого патча она всегда будет действовать как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между повторной попыткой записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Код ошибки, которые можно повторно попытаться, возвращаемые сервером ClickHouse при неудаче записи.0.1.0

Поддерживаемые типы данных

Этот раздел описывает соответствие типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые ссылки для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяЯвляется примитивнымЗаметки
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16, Int32IntegerTypeДа
UInt32, Int64, UInt64LongTypeДа
Int128, UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаКонтролируется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элемента массива также преобразуется
MapMapTypeНетКлючи ограничены типом StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется конкретный тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяЯвляется примитивнымЗаметки
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (список, кортеж или массив)ArrayНетТип элемента массива также преобразуется
MapTypeMapНетКлючи ограничены типом StringType
Object
Nested

Участие и поддержка

Если вы хотите внести свой вклад в проект или сообщить о любых проблемах, мы приветствуем ваши идеи! Посетите наш репозиторий GitHub, чтобы открыть проблему, предложить улучшения или отправить пул-запрос. Ваши вклады приветствуются! Пожалуйста, ознакомьтесь с рекомендациями по внесению изменений в репозитории перед началом. Спасибо за помощь в улучшении нашего ClickHouse Spark коннектора!