Перейти к основному содержанию
Перейти к основному содержанию

Коннектор Spark

Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое секционирование и проталкивание предикатов (predicate pushdown), чтобы повысить производительность запросов и эффективность обработки данных. Коннектор основан на официальном JDBC-коннекторе ClickHouse и управляет собственным каталогом.

До Spark 3.0 в Spark отсутствовало встроенное понятие каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать таблицы источника данных перед доступом к ним в Spark. Однако, поскольку в Spark 3.0 была введена концепция каталога, Spark теперь может автоматически обнаруживать таблицы посредством регистрации плагинов каталогов.

Каталог по умолчанию в Spark — spark_catalog, а таблицы идентифицируются как {catalog name}.{database}.{table}. С новой функциональностью каталогов теперь можно добавлять и использовать несколько каталогов в рамках одного приложения Spark.

Требования

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3, 3.4 или 3.5

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от версии JDBC-драйвера
0.3.0Spark 3.2, 3.3Не зависит от версии JDBC-драйвера
0.2.1Spark 3.2Не зависит от версии JDBC-драйвера
0.1.2Spark 3.2Не зависит от версии JDBC-драйвера

Установка и настройка

Для интеграции ClickHouse со Spark доступно несколько вариантов установки для различных конфигураций проектов. Вы можете добавить коннектор ClickHouse Spark как зависимость непосредственно в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Также вы можете поместить необходимые JAR-файлы в каталог $SPARK_HOME/jars/ или передать их напрямую как параметр Spark с помощью флага --jars в команде spark-submit. Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.

Импорт как зависимости

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Добавьте следующий репозиторий, если вы хотите использовать SNAPSHOT-версию.

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Загрузка библиотеки

Шаблон имени бинарного JAR-файла:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

Все доступные релизные JAR-файлы можно найти в Maven Central Repository, а все JAR-файлы ежедневных сборок SNAPSHOT — в Sonatype OSS Snapshots Repository.

Справочные материалы

Крайне важно включить clickhouse-jdbc JAR с классификатором «all», так как коннектор полагается на clickhouse-http и clickhouse-client, оба из которых входят в состав clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если предпочитаете не использовать полный пакет JDBC.

В любом случае убедитесь, что версии пакетов совместимы в соответствии с матрицей совместимости.

Зарегистрируйте каталог (обязательно)

Чтобы получить доступ к таблицам ClickHouse, необходимо настроить новый каталог Spark со следующими параметрами:

PropertyValueDefault ValueRequired
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

Эти параметры можно задать одним из следующих способов:

  • Отредактируйте/создайте spark-defaults.conf.
  • Передайте конфигурацию в команду spark-submit (или в команды CLI spark-shell/spark-sql).
  • Добавьте конфигурацию при инициализации контекста.
Справочные материалы

При работе с кластером ClickHouse необходимо задать уникальное имя каталога для каждого экземпляра. Например:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

Таким образом, вы сможете обращаться к таблице <ck_db>.<ck_table> в clickhouse1 из Spark SQL под именем clickhouse1.<ck_db>.<ck_table>, а к таблице <ck_db>.<ck_table> в clickhouse2 — под именем clickhouse2.<ck_db>.<ck_table>.

Настройки ClickHouse Cloud

При подключении к ClickHouse Cloud убедитесь, что включён SSL и задан соответствующий режим SSL. Например:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

Чтение данных

public static void main(String[] args) {
        // Создание сеанса Spark
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

Запись данных

 public static void main(String[] args) throws AnalysisException {

        // Создать сессию Spark
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Определить схему для DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Создать DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

Операции DDL

Вы можете выполнять операции DDL с экземпляром ClickHouse с помощью Spark SQL, при этом все изменения сразу сохраняются в ClickHouse. Spark SQL позволяет писать запросы так же, как в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие, без каких-либо изменений, например:

Примечание

При использовании Spark SQL за один раз может быть выполнена только одна инструкция.

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'ключ раздела',
  id          BIGINT    NOT NULL COMMENT 'ключ сортировки',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

Приведённые выше примеры показывают запросы Spark SQL, которые вы можете выполнять в приложении, используя любой из API-интерфейсов — Java, Scala, PySpark или shell.

Конфигурации

Ниже перечислены параметры, которые можно настроить в коннекторе:


КлючПо умолчаниюОписаниеНачиная с
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиционирования, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, неподдерживаемые выражения игнорируются, в противном случае выполнение немедленно завершается с исключением. Обратите внимание, что при включённой настройке spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для распаковки данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении таблицы Distributed использовать локальную таблицу вместо самой распределённой. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsдвоичныйСчитывать тип ClickHouse FixedString как заданный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: JSON, Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить динамический фильтр при чтении.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли установлено значение true, формировать входной фильтр партиций по виртуальному столбцу _partition_id, а не по значению партиции. Известны проблемы при построении SQL-предикатов по значению партиции. Для использования этой возможности требуется ClickHouse Server v21.6+.0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, помечать все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта настройка требует SPARK-43390 (доступно в Spark 3.5); без этого патча данная опция фактически всегда равна true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей в одном пакете при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при их записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в таблицу Distributed данные записываются в локальную таблицу, а не в саму распределённую таблицу. Если установлено значение true, параметр spark.clickhouse.write.distributed.useClusterNodes игнорируется.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueПри записи в распределённую таблицу записывать данные на все узлы кластера.0.1.0
spark.clickhouse.write.formatстрелкаФормат сериализации при записи. Поддерживаемые форматы: JSON, Arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли установлено значение true, выполнять локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение параметра spark.clickhouse.write.repartitionByPartitionЕсли имеет значение true, выполняется локальная сортировка по разделу перед записью. Если не задано, используется значение spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество повторных попыток записи для одной пакетной операции, завершившейся сбоем с кодами ошибок, допускающими повторную попытку.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueОпределяет, нужно ли переразбивать данные по ключам партиционирования ClickHouse, чтобы они соответствовали распределению данных в таблице ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перед записью требуется перераспределить данные в соответствии с распределением таблицы ClickHouse. Используйте этот параметр конфигурации для указания количества переразбиений; значение меньше 1 означает, что перераспределение не требуется.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по разделам, чтобы обеспечить требуемое распределение перед записью данных в таблицу источника. В противном случае Spark может применять определённые оптимизации для ускорения запроса, но при этом нарушить требуемое распределение. Обратите внимание, что для этой конфигурации необходим патч SPARK-37523 (доступен в Spark 3.4); без этого патча она всегда ведёт себя как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между повторными попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок, допускающих повторную попытку, возвращаемые сервером ClickHouse при сбое записи.0.1.0

Поддерживаемые типы данных

В этом разделе описано сопоставление типов данных между Spark и ClickHouse. Таблицы ниже служат быстрой справкой по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяПримитивныйПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаУправляется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб вплоть до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элементов массива также преобразуется
MapMapTypeНетКлючи ограничены типом StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется соответствующий тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяПримитивный типПримечания
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб вплоть до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (list, tuple, or array)ArrayНетТип элементов массива также преобразуется
MapTypeMapНетКлючи ограничены типом StringType
Object
Nested

Участие и поддержка

Если вы хотите поучаствовать в развитии проекта или сообщить о проблеме, мы будем рады вашей помощи! Перейдите в наш репозиторий на GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы приветствуем любые вклады в проект! Перед началом, пожалуйста, ознакомьтесь с руководством по участию (contributing) в репозитории. Спасибо, что помогаете улучшать наш коннектор ClickHouse Spark!