跳到主要内容
跳到主要内容

Spark 连接器

此连接器利用 ClickHouse 特有的优化功能,例如高级分区和谓词下推,以提升查询性能和数据处理效率。 该连接器基于 ClickHouse 官方 JDBC 连接器,并管理其自己的 catalog。

在 Spark 3.0 之前,Spark 缺少内置的 catalog 概念,因此用户通常依赖外部 catalog 系统,例如 Hive Metastore 或 AWS Glue。 使用这些外部方案时,用户必须先手动注册数据源表,才能在 Spark 中访问它们。 然而,自从 Spark 3.0 引入 catalog 概念之后,Spark 现在可以通过注册 catalog 插件来自动发现表。

Spark 的默认 catalog 是 spark_catalog,表通过 {catalog name}.{database}.{table} 标识。借助新的 catalog 功能,现在可以在单个 Spark 应用程序中添加并使用多个 catalog。

先决条件

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3 或 3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3无依赖
0.3.0Spark 3.2, 3.3无依赖
0.2.1Spark 3.2无依赖
0.1.2Spark 3.2无依赖

安装与设置

要将 ClickHouse 集成到 Spark 中,有多种安装方式可适配不同的项目环境。 您可以在项目的构建文件中(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)直接添加 ClickHouse Spark 连接器作为依赖。 或者,也可以将所需的 JAR 文件放到 $SPARK_HOME/jars/ 目录中,或者在使用 spark-submit 命令时,通过 --jars 参数作为 Spark 选项直接传入。 这两种方式都可以确保在您的 Spark 环境中提供 ClickHouse 连接器。

作为依赖导入

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果您想使用 SNAPSHOT 版本,请添加以下仓库。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

下载依赖库

二进制 JAR 的命名规则为:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

你可以在 Maven Central Repository 中找到所有已发布的 JAR 文件, 并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT JAR 文件。

参考资料

务必包含带有 “all” classifier 的 clickhouse-jdbc JAR, 因为该 connector 依赖 clickhouse-httpclickhouse-client——这两者都已经打包 在 clickhouse-jdbc:all 中。 或者,如果不想使用完整的 JDBC 包,也可以分别添加 clickhouse-client JARclickhouse-http

无论采用哪种方式,请确保这些包的版本与 兼容性矩阵 中列出的版本保持兼容。

注册 Catalog(必需)

要访问 ClickHouse 表,必须使用以下配置创建一个新的 Spark Catalog:

PropertyValueDefault ValueRequired
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

可以通过以下任一方式设置这些配置:

  • 编辑或创建 spark-defaults.conf
  • 将配置传递给 spark-submit 命令(或 spark-shell / spark-sql CLI 命令)。
  • 在初始化上下文时添加配置。
参考资料

在使用 ClickHouse 集群时,需要为每个实例设置唯一的 Catalog 名称。 例如:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

通过这种方式,你可以在 Spark SQL 中通过 clickhouse1.<ck_db>.<ck_table> 访问 clickhouse1 实例中的 <ck_db>.<ck_table> 表,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 实例中的 <ck_db>.<ck_table> 表。

ClickHouse Cloud 设置

在连接到 ClickHouse Cloud 时,请确保启用 SSL,并设置合适的 SSL 模式。例如:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

读取数据

public static void main(String[] args) {
        // 创建一个 Spark 会话
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

写入数据

 public static void main(String[] args) throws AnalysisException {

        // 创建 Spark 会话
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // 定义 DataFrame 的 schema
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // 创建 DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

DDL 操作

你可以使用 Spark SQL 对 ClickHouse 实例执行 DDL 操作,所有更改都会立即持久化到 ClickHouse 中。 Spark SQL 允许你像在 ClickHouse 中一样编写查询, 因此你可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令——无需修改,例如:

注意

使用 Spark SQL 时,每次只能执行一条语句。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上述示例演示了 Spark SQL 查询,你可以在应用程序中使用任意 API(如 Java、Scala、PySpark 或 shell)来运行这些查询。

配置

以下是该连接器中可调整的配置项:


默认描述自从
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse 支持将复杂表达式用作分片键或分区值,例如 cityHash64(col_1, col_2),但这些目前在 Spark 中不受支持。若为 true,则忽略这些不受支持的表达式,否则将立即失败并抛出异常。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不受支持的分片键可能会导致数据损坏。0.4.0
spark.clickhouse.read.compression.codeclz4用于在读取时解压数据的编解码器。支持的编码格式:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue在读取 Distributed 表时,改为读取对应的本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAs二进制按指定的 Spark 数据类型读取 ClickHouse 的 FixedString 类型。支持的类型:binary、string0.8.0
spark.clickhouse.read.formatJSON读取时使用的序列化格式。支持的格式:JSON、Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse为读取启用运行时过滤。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,则通过虚拟列 _partition_id 而不是分区值来构造输入分区过滤条件。通过分区值组装 SQL 谓词已知存在问题。此功能需要 ClickHouse Server v21.6 及以上版本。0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果设置为 true,在创建表时执行 CREATE/REPLACE TABLE ... AS SELECT ... 会将查询 schema 中的所有字段都标记为可为空。注意,此配置依赖于 SPARK-43390(在 Spark 3.5 中可用),如果没有该补丁,其行为始终等同于 true0.8.0
spark.clickhouse.write.batchSize10000每批写入 ClickHouse 的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于在写入数据时进行压缩的编解码器。支持的编解码器:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse在写入 Distributed 表时,改为写入本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue在写入 Distributed 表时,将数据写入集群中的所有节点。0.1.0
spark.clickhouse.write.format箭头写入时使用的序列化格式。支持的格式:JSON、Arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,在写入前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,则在写入前按分区进行本地排序。若未设置,则等同于 spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3对因可重试错误码而失败的单个批量写入操作允许的最大重试次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue在写入之前,是否根据 ClickHouse 分区键对数据重新分区,以匹配 ClickHouse 表的分区分布。0.3.0
spark.clickhouse.write.repartitionNum0如果在写入前需要根据 ClickHouse 表的分布对数据进行重新分区,可使用此配置来指定重新分区的分区数;当该值小于 1 时表示对此无要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,Spark 会在写入时将传入记录严格分布到各个分区,以满足所需的分布要求,然后再将记录传递给数据源表。否则,Spark 可能会应用某些优化以加速查询,但可能会破坏分布要求。注意,该配置依赖于补丁 SPARK-37523(在 Spark 3.4 中可用),在没有该补丁时,其行为始终等同于 true0.3.0
spark.clickhouse.write.retryInterval10s两次写入重试之间的时间间隔(以秒为单位)。0.1.0
spark.clickhouse.write.retryableErrorCodes241在写入失败时由 ClickHouse 服务器返回的可重试的错误码。0.1.0

支持的数据类型

本节说明 Spark 与 ClickHouse 之间的数据类型映射。下表提供了快速参考,便于在从 ClickHouse 读取数据到 Spark,或从 Spark 向 ClickHouse 插入数据时进行数据类型转换。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型是否支持是否为原始类型说明
NothingNullTypeYes
BoolBooleanTypeYes
UInt8, Int16ShortTypeYes
Int8ByteTypeYes
UInt16,Int32IntegerTypeYes
UInt32,Int64, UInt64LongTypeYes
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Yes
Float32FloatTypeYes
Float64DoubleTypeYes
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeYes
FixedStringBinaryType, StringTypeYes由配置项 READ_FIXED_STRING_AS 控制
DecimalDecimalTypeYes精度和小数位数最高支持到 Decimal128
Decimal32DecimalType(9, scale)Yes
Decimal64DecimalType(18, scale)Yes
Decimal128DecimalType(38, scale)Yes
Date, Date32DateTypeYes
DateTime, DateTime32, DateTime64TimestampTypeYes
ArrayArrayTypeNo数组元素类型也会被转换
MapMapTypeNo键类型仅支持 StringType
IntervalYearYearMonthIntervalType(Year)Yes
IntervalMonthYearMonthIntervalType(Month)Yes
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeNo会使用对应的具体区间类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从 Spark 向 ClickHouse 插入数据

Spark 数据类型ClickHouse 数据类型是否支持是否为基础类型说明
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和小数位数最高可达 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType(list、tuple 或 array)Array数组元素类型也会被转换
MapTypeMap键类型仅限于 StringType
Object
Nested

贡献与支持

如果您希望为该项目做出贡献或报告任何问题,我们非常欢迎您的参与! 请访问我们的 GitHub 仓库 来提交 issue、提出改进建议或发起 pull request。 我们欢迎所有贡献!在开始之前,请先查看仓库中的贡献指南。 感谢您帮助改进我们的 ClickHouse Spark 连接器!