跳到主要内容
跳到主要内容

Spark 连接器

此连接器利用 ClickHouse 特有的优化(例如高级分区和谓词下推),以提升查询性能和数据处理效率。 该连接器基于 ClickHouse 官方 JDBC 连接器,并管理其自身的 catalog。

在 Spark 3.0 之前,Spark 不具备内置的 catalog 概念,因此用户通常依赖 Hive Metastore 或 AWS Glue 等外部 catalog 系统。 使用这些外部方案时,用户必须在 Spark 中访问数据源表之前,先手动注册这些表。 然而,自从 Spark 3.0 引入 catalog 概念后,Spark 现在可以通过注册 catalog 插件自动发现表。

Spark 的默认 catalog 是 spark_catalog,表通过 {catalog name}.{database}.{table} 来标识。借助这一新的 catalog 功能,现在可以在单个 Spark 应用中添加并使用多个 catalog。

先决条件

  • Java 8 或 17(Spark 4.0 需要 Java 17 及以上版本)
  • Scala 2.12 或 2.13(Spark 4.0 仅支持 Scala 2.13)
  • Apache Spark 3.3、3.4、3.5 或 4.0

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3无需依赖
0.3.0Spark 3.2, 3.3无需依赖
0.2.1Spark 3.2无需依赖
0.1.2Spark 3.2无需依赖

安装与设置

要将 ClickHouse 与 Spark 集成,有多种安装方式,可适配不同的项目配置。 你可以在项目的构建文件中(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)直接添加 ClickHouse Spark connector 作为依赖。 或者,你也可以将所需的 JAR 文件放入 $SPARK_HOME/jars/ 目录,或在运行 spark-submit 命令时通过 --jars 参数将它们作为 Spark 选项直接传入。 这两种方式都能确保 ClickHouse connector 在你的 Spark 环境中可用。

作为依赖导入

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果你需要使用 SNAPSHOT 版本,请添加以下仓库:

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

下载库文件

二进制 JAR 的命名模式如下:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

你可以在 Maven Central Repository 中找到所有已发布的 JAR 文件, 并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT JAR 文件。

参考资料

务必包含带有 "all" 分类器的 clickhouse-jdbc JAR, 因为该连接器依赖于 clickhouse-httpclickhouse-client——这两个依赖都已打包在 clickhouse-jdbc:all 中。 或者,如果你不希望使用完整的 JDBC 包,也可以分别添加 clickhouse-client JARclickhouse-http

无论选择哪种方式,请确保这些包的版本彼此兼容,并符合 兼容性矩阵 中的要求。

注册 catalog(必需)

要访问 ClickHouse 表,需要使用以下配置创建一个新的 Spark catalog:

属性默认值是否必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

可以通过以下任一方式配置这些设置:

  • 编辑或创建 spark-defaults.conf
  • spark-submit 命令中传入配置(或在 spark-shell / spark-sql CLI 命令中传入)。
  • 在初始化上下文时添加配置。
参考资料

在使用 ClickHouse 集群时,需要为每个实例设置唯一的 catalog 名称。 例如:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

这样,你就可以在 Spark SQL 中通过 clickhouse1.<ck_db>.<ck_table> 访问 clickhouse1 的 <ck_db>.<ck_table> 表,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 的 <ck_db>.<ck_table> 表。

ClickHouse Cloud 配置

连接到 ClickHouse Cloud 时,请务必启用 SSL,并设置相应的 SSL 模式。例如:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

读取数据

public static void main(String[] args) {
        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

写入数据

public static void main(String[] args) throws AnalysisException {

    // Create a Spark session
    SparkSession spark = SparkSession.builder()
            .appName("example")
            .master("local[*]")
            .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
            .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
            .config("spark.sql.catalog.clickhouse.protocol", "http")
            .config("spark.sql.catalog.clickhouse.http_port", "8123")
            .config("spark.sql.catalog.clickhouse.user", "default")
            .config("spark.sql.catalog.clickhouse.password", "123456")
            .config("spark.sql.catalog.clickhouse.database", "default")
            .config("spark.clickhouse.write.format", "json")
            .getOrCreate();

    // Define the schema for the DataFrame
    StructType schema = new StructType(new StructField[]{
            DataTypes.createStructField("id", DataTypes.IntegerType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
    });

    List<Row> data = Arrays.asList(
            RowFactory.create(1, "Alice"),
            RowFactory.create(2, "Bob")
    );

    // Create a DataFrame
    Dataset<Row> df = spark.createDataFrame(data, schema);

    df.writeTo("clickhouse.default.example_table").append();

    spark.stop();
}

DDL 操作

可以使用 Spark SQL 对 ClickHouse 实例执行 DDL 操作,所有更改都会立即持久化到 ClickHouse 中。 Spark SQL 允许你以与在 ClickHouse 中相同的方式编写查询, 因此可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令——无需修改,例如:

注意

在使用 Spark SQL 时,每次只能执行一条语句。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上述示例展示了 Spark SQL 查询,您可以在应用程序中通过任意 API(Java、Scala、PySpark 或 shell)运行这些查询。

配置

以下是连接器中可配置的参数:


默认描述自从
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse 支持将复杂表达式用作分片键或分区值,例如 cityHash64(col_1, col_2),但这些目前在 Spark 中不受支持。若为 true,则忽略这些不受支持的表达式,否则将立即失败并抛出异常。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不受支持的分片键可能会导致数据损坏。0.4.0
spark.clickhouse.read.compression.codeclz4用于在读取时解压数据的编解码器。支持的编码格式:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue在读取 Distributed 表时,改为读取对应的本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAs二进制按指定的 Spark 数据类型读取 ClickHouse 的 FixedString 类型。支持的类型:binary、string0.8.0
spark.clickhouse.read.formatJSON读取时使用的序列化格式。支持的格式:JSON、Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse为读取启用运行时过滤。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,则通过虚拟列 _partition_id 而不是分区值来构造输入分区过滤条件。通过分区值组装 SQL 谓词已知存在问题。此功能需要 ClickHouse Server v21.6 及以上版本。0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果设置为 true,在创建表时执行 CREATE/REPLACE TABLE ... AS SELECT ... 会将查询 schema 中的所有字段都标记为可为空。注意,此配置依赖于 SPARK-43390(在 Spark 3.5 中可用),如果没有该补丁,其行为始终等同于 true0.8.0
spark.clickhouse.write.batchSize10000每批写入 ClickHouse 的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于在写入数据时进行压缩的编解码器。支持的编解码器:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse在写入 Distributed 表时,改为写入本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue在写入 Distributed 表时,将数据写入集群中的所有节点。0.1.0
spark.clickhouse.write.format箭头写入时使用的序列化格式。支持的格式:JSON、Arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,在写入前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,则在写入前按分区进行本地排序。若未设置,则等同于 spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3对因可重试错误码而失败的单个批量写入操作允许的最大重试次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue在写入之前,是否根据 ClickHouse 分区键对数据重新分区,以匹配 ClickHouse 表的分区分布。0.3.0
spark.clickhouse.write.repartitionNum0如果在写入前需要根据 ClickHouse 表的分布对数据进行重新分区,可使用此配置来指定重新分区的分区数;当该值小于 1 时表示对此无要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,Spark 会在写入时将传入记录严格分布到各个分区,以满足所需的分布要求,然后再将记录传递给数据源表。否则,Spark 可能会应用某些优化以加速查询,但可能会破坏分布要求。注意,该配置依赖于补丁 SPARK-37523(在 Spark 3.4 中可用),在没有该补丁时,其行为始终等同于 true0.3.0
spark.clickhouse.write.retryInterval10s两次写入重试之间的时间间隔(以秒为单位)。0.1.0
spark.clickhouse.write.retryableErrorCodes241在写入失败时由 ClickHouse 服务器返回的可重试的错误码。0.1.0

支持的数据类型

本节概述了 Spark 与 ClickHouse 之间的数据类型映射。下表为从 ClickHouse 读取数据到 Spark,以及将 Spark 中的数据插入 ClickHouse 时的数据类型转换提供快速参考。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型是否支持是否为原始类型说明
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType由配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和小数位数最高支持到 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键类型仅支持 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType会映射为对应的具体区间类型
Object
Nested
TupleStructType同时支持具名和无名 tuple。具名 tuple 按名称映射到 struct 字段,无名 tuple 使用 _1_2 等字段名。支持嵌套 struct 和 Nullable 字段
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从 Spark 向 ClickHouse 插入数据

Spark 数据类型ClickHouse 数据类型是否支持是否为基本类型说明
BooleanTypeBool自版本 0.9.0 起映射为 Bool 类型(而非 UInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和小数位数最高支持到 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType (list, tuple, or array)Array数组元素类型也会被转换
MapTypeMap键类型仅支持 StringType
StructTypeTuple转换为带字段名的 Tuple
VariantTypeVariantType
Object
Nested

贡献与支持

如果您希望为该项目做出贡献或报告任何问题,我们非常欢迎您的反馈! 请访问我们的 GitHub 仓库 来提交 issue、提出改进建议或发起 pull request。 欢迎一切形式的贡献!在开始之前,请先查看仓库中的贡献指南。 感谢您帮助改进我们的 ClickHouse Spark 连接器!