跳到主要内容
跳到主要内容

Spark 连接器

此连接器利用 ClickHouse 特有的优化,比如高级分区和谓词下推,来提高查询性能和数据处理能力。
该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理自己的目录。

在 Spark 3.0 之前,Spark 缺乏内置的目录概念,因此用户通常依赖于 Hive Metastore 或 AWS Glue 等外部目录系统。
使用这些外部解决方案,用户必须在访问 Spark 中的数据源表之前手动注册它们。
然而,由于 Spark 3.0 引入了目录概念,Spark 现在可以通过注册目录插件自动发现表。

Spark 的默认目录是 spark_catalog,表通过 {catalog name}.{database}.{table} 进行识别。
借助新的目录功能,现在可以在单个 Spark 应用程序中添加和使用多个目录。

要求

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3、3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3、3.4、3.50.6.3
0.8.1Spark 3.3、3.4、3.50.6.3
0.8.0Spark 3.3、3.4、3.50.6.3
0.7.3Spark 3.3、3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2、3.30.3.2-patch11
0.4.0Spark 3.2、3.3不依赖于
0.3.0Spark 3.2、3.3不依赖于
0.2.1Spark 3.2不依赖于
0.1.2Spark 3.2不依赖于

安装与设置

为了将 ClickHouse 与 Spark 集成,有多种安装选项以适应不同的项目设置。
您可以将 ClickHouse Spark 连接器作为依赖项直接添加到项目的构建文件中(例如在 Maven 的 pom.xml 或 SBT 的 build.sbt 中)。
另外,您可以将所需的 JAR 文件放入 $SPARK_HOME/jars/ 文件夹中,或在 spark-submit 命令中通过 --jars 标志直接传递它们。
这两种方法都确保 ClickHouse 连接器在您的 Spark 环境中可用。

作为依赖项导入

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果要使用 SNAPSHOT 版本,请添加以下仓库。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

下载库

二进制 JAR 的名称模式是:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

您可以在 Maven Central Repository 找到所有可用的已发布 JAR 文件,所有每日构建的 SNAPSHOT JAR 文件可在 Sonatype OSS Snapshots Repository 找到。

信息

务必包含带有 "all" 分类的 clickhouse-jdbc JAR,因为连接器依赖于 clickhouse-httpclickhouse-client — 这两者都包含在 clickhouse-jdbc:all 中。
或者,如果您不想使用完整的 JDBC 包,可以单独添加 clickhouse-client JARclickhouse-http

无论如何,请确保根据 兼容性矩阵 验证包版本的兼容性。

注册目录(必需)

为了访问您的 ClickHouse 表,您必须用以下配置来配置一个新的 Spark 目录:

属性默认值必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/A
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhost
spark.sql.catalog.<catalog_name>.protocolhttphttp
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123
spark.sql.catalog.<catalog_name>.user<clickhouse_username>default
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空字符串)
spark.sql.catalog.<catalog_name>.database<database>default
spark.<catalog_name>.write.formatjsonarrow

这些设置可以通过以下几种方式设置:

  • 编辑/创建 spark-defaults.conf
  • 将配置传递给您的 spark-submit 命令(或传递给您的 spark-shell / spark-sql CLI 命令)。
  • 在初始化您的上下文时添加配置。
信息

在使用 ClickHouse 集群时,您需要为每个实例设置一个唯一的目录名称。
例如:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

这样,您就可以通过 clickhouse1.<ck_db>.<ck_table> 从 Spark SQL 访问 clickhouse1 表 <ck_db>.<ck_table>,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 表 <ck_db>.<ck_table>

ClickHouse Cloud 设置

在连接到 ClickHouse Cloud 时,请确保启用 SSL 并设置适当的 SSL 模式。例如:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

读取数据

public static void main(String[] args) {
        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

写入数据

public static void main(String[] args) throws AnalysisException {

       // Create a Spark session
       SparkSession spark = SparkSession.builder()
               .appName("example")
               .master("local[*]")
               .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
               .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
               .config("spark.sql.catalog.clickhouse.protocol", "http")
               .config("spark.sql.catalog.clickhouse.http_port", "8123")
               .config("spark.sql.catalog.clickhouse.user", "default")
               .config("spark.sql.catalog.clickhouse.password", "123456")
               .config("spark.sql.catalog.clickhouse.database", "default")
               .config("spark.clickhouse.write.format", "json")
               .getOrCreate();

       // Define the schema for the DataFrame
       StructType schema = new StructType(new StructField[]{
               DataTypes.createStructField("id", DataTypes.IntegerType, false),
               DataTypes.createStructField("name", DataTypes.StringType, false),
       });

       List<Row> data = Arrays.asList(
               RowFactory.create(1, "Alice"),
               RowFactory.create(2, "Bob")
       );

       // Create a DataFrame
       Dataset<Row> df = spark.createDataFrame(data, schema);

       df.writeTo("clickhouse.default.example_table").append();

       spark.stop();
   }

DDL 操作

您可以使用 Spark SQL 在 ClickHouse 实例上执行 DDL 操作,所有更改将立即在 ClickHouse 中持久化。
Spark SQL 允许您以与 ClickHouse 完全相同的方式编写查询,因此您可以直接执行如 CREATE TABLE、TRUNCATE 等命令,而无需修改,例如:

备注

在使用 Spark SQL 时,仅可以一次执行一个语句。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上述示例展示了 Spark SQL 查询,您可以在您的应用程序中使用任何 API——Java、Scala、PySpark 或 shell 运行。

配置

以下是连接器中可调节的配置:


默认描述版本
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse 支持使用复杂表达式作为分片键或分区值,例如 cityHash64(col_1, col_2),Spark 当前不支持。如果为 true,则忽略不受支持的表达式,否则在异常情况下快速失败。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不支持的分片键可能会损坏数据。0.4.0
spark.clickhouse.read.compression.codeclz4用于读取数据时解压缩的编解码器。支持的编解码器:none,lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue读取分布式表时,读取本地表而不是自身。如果为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAsbinary将 ClickHouse FixedString 类型读取为指定的 Spark 数据类型。支持的类型:binary,string0.8.0
spark.clickhouse.read.formatjson用于读取的序列化格式。支持的格式:json,binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse启用读取时的运行时过滤器。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,则通过虚拟列 _partition_id 构建输入分区过滤器,而不是分区值。通过分区值组装 SQL 谓词存在已知问题。此功能需要 ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果为 true,则在执行 CREATE/REPLACE TABLE ... AS SELECT ... 创建表时,将查询模式的所有字段标记为可为空。注意,此配置需要 SPARK-43390(在 Spark 3.5 中可用),不带此补丁,它始终被视为 true0.8.0
spark.clickhouse.write.batchSize10000写入 ClickHouse 时每批次的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于写入时压缩数据的编解码器。支持的编解码器:none,lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse写入分布式表时,写入本地表而不是自身。如果为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue写入分布式表时向集群的所有节点写入。0.1.0
spark.clickhouse.write.formatarrow用于写入的序列化格式。支持的格式:json,arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,则在写入之前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,则在写入之前按分区进行本地排序。如果未设置,则等于 spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3对于单批次写入因可重试代码失败而进行的最大重试次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue是否按 ClickHouse 分区键对数据进行重新分区,以符合 ClickHouse 表的分布要求。0.3.0
spark.clickhouse.write.repartitionNum0在写入之前,必须重新分区以符合 ClickHouse 表的分布要求,使用此配置指定重新分区数量,值小于 1 表示没有要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,则 Spark 将严格地将传入记录分配到分区,以满足在将记录传递给数据源表时的分布要求。否则,Spark 可能会应用某些优化以加快查询速度,但破坏分布要求。注意,此配置需要 SPARK-37523(在 Spark 3.4 中可用),不带此补丁,它始终被视为 true0.3.0
spark.clickhouse.write.retryInterval10s写入重试之间的秒数间隔。0.1.0
spark.clickhouse.write.retryableErrorCodes241ClickHouse 服务器在写入失败时返回的可重试错误代码。0.1.0

支持的数据类型

本节概述了 Spark 和 ClickHouse 之间数据类型的映射。
下表为从 ClickHouse 读取到 Spark 以及从 Spark 插入到 ClickHouse 时转换数据类型提供了快捷参考。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型受支持是否原始类型备注
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType受配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和小数位最高可达 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键仅限于 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType使用特定的区间类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从 Spark 插入数据到 ClickHouse

Spark 数据类型ClickHouse 数据类型受支持是否原始类型备注
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和小数位最高可达 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType(列表、元组或数组)Array数组元素类型也会被转换
MapTypeMap键仅限于 StringType
Object
Nested

贡献与支持

如果您想为该项目做出贡献或报告任何问题,我们欢迎您的意见! 请访问我们的 GitHub 仓库 开启一个问题,建议改进或提交拉取请求。 欢迎贡献!在开始之前,请查看仓库中的贡献指南。 感谢您帮助改善我们的 ClickHouse Spark 连接器!