跳转到主内容
跳转到主内容

Spark 连接器

ClickHouse Supported

此连接器利用 ClickHouse 特定的优化功能,例如高级分区和谓词下推,以提升查询性能和数据处理能力。 该连接器基于 ClickHouse 官方 JDBC 连接器,并自行管理其 catalog。

在 Spark 3.0 之前,Spark 缺少内置的 catalog 概念,因此用户通常依赖 Hive Metastore 或 AWS Glue 等外部 catalog 系统。 在使用这些外部方案时,用户必须在 Spark 中访问这些表之前,手动注册其数据源表。 然而,自从 Spark 3.0 引入 catalog 概念后,Spark 现在可以通过注册 catalog 插件自动发现数据表。

Spark 的默认 catalog 是 spark_catalog,表通过 {catalog name}.{database}.{table} 来标识。借助新的 catalog 功能,现在可以在同一个 Spark 应用程序中添加并使用多个 catalog。

在 Catalog API 和 TableProvider API 之间进行选择

ClickHouse Spark 连接器支持两种访问模式:Catalog APITableProvider API(基于格式的访问)。了解二者之间的差异有助于根据具体用例选择合适的方式。

Catalog API 与 TableProvider API 对比

功能Catalog APITableProvider API
配置方式通过 Spark 配置集中管理按操作通过选项配置
表发现机制通过 catalog 自动发现手动指定表
DDL 操作完整支持(CREATE、DROP、ALTER)支持有限(仅自动建表)
Spark SQL 集成原生(clickhouse.database.table需要指定数据格式(format)
使用场景长期、稳定连接并集中管理配置临时、动态或一次性访问

环境要求

  • Java 8 或 17(Spark 4.0 需要 Java 17+)
  • Scala 2.12 或 2.13(Spark 4.0 仅支持 Scala 2.13)
  • Apache Spark 3.3、3.4、3.5 或 4.0

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.10.0Spark 3.3, 3.4, 3.5, 4.00.9.5
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3无依赖
0.3.0Spark 3.2, 3.3无依赖
0.2.1Spark 3.2无依赖
0.1.2Spark 3.2无依赖

安装与设置

要将 ClickHouse 集成到 Spark 中,有多种安装选项可适配不同的项目配置。 可以在项目的构建文件中(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)直接添加 ClickHouse Spark 连接器作为依赖。 或者,也可以将所需的 JAR 文件放入 $SPARK_HOME/jars/ 目录,或在使用 spark-submit 命令时通过 --jars 参数作为 Spark 选项直接传入。 这两种方式都能确保 ClickHouse 连接器在 Spark 环境中可用。

作为依赖导入

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果希望使用 SNAPSHOT 版本,请添加以下仓库。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

下载依赖库

二进制 JAR 的命名模式为:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

你可以在 Maven Central Repository 中找到所有已发布的 JAR 文件, 并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT 版 JAR 文件。

信息

务必包含带有 all classifier 的 clickhouse-jdbc JAR, 因为该连接器依赖 clickhouse-httpclickhouse-client —— 这两者都已打包在 clickhouse-jdbc:all 中。 或者,如果你不想使用完整的 JDBC 包,也可以分别添加 clickhouse-client JARclickhouse-http

无论采用哪种方式,请确保各个包的版本根据 Compatibility Matrix 之间是兼容的。

注册 catalog(必需)

若要访问 ClickHouse 表,必须创建并配置一个新的 Spark catalog,并使用以下配置:

属性默认值是否必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

可以通过以下任一方式设置这些配置:

  • 编辑或创建 spark-defaults.conf
  • 将配置传递给 spark-submit 命令(或 spark-shell/spark-sql CLI 命令)。
  • 在初始化上下文时添加配置。
信息

在使用 ClickHouse 集群时,需要为每个实例设置唯一的 catalog 名称。 例如:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

这样,你就可以在 Spark SQL 中通过 clickhouse1.<ck_db>.<ck_table> 访问 clickhouse1 中的 <ck_db>.<ck_table> 表,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 中的 <ck_db>.<ck_table> 表。

使用 TableProvider API(基于格式的访问)

除了基于 catalog 的方式之外,ClickHouse Spark 连接器还支持通过 TableProvider API 采用基于格式的访问模式

基于格式的读取示例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 使用格式 API 从 ClickHouse 读取数据
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .load()

df.show()

基于格式的写入示例

# 使用 format API 向 ClickHouse 写入数据
df.write \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .mode("append") \
    .save()

TableProvider 功能

TableProvider API 提供了一些强大的功能:

自动建表

当向一个不存在的表写入数据时,连接器会根据合适的 schema 自动创建该表。连接器提供了智能默认值:

  • Engine:如果未指定,则默认为 MergeTree()。你可以通过 engine 选项指定不同的 engine(例如:ReplacingMergeTree()SummingMergeTree() 等)。
  • ORDER BY必需 —— 创建新表时必须显式指定 order_by 选项。连接器会校验所有指定的列是否存在于 schema 中。
  • Nullable Key 支持:如果 ORDER BY 中包含 Nullable 列,会自动添加 settings.allow_nullable_key=1
# 使用显式 ORDER BY(必需)自动创建表
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id") \
    .mode("append") \
    .save()

# 使用自定义 engine 指定建表选项
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id, timestamp") \
    .option("engine", "ReplacingMergeTree()") \
    .option("settings.allow_nullable_key", "1") \
    .mode("append") \
    .save()
信息

必须指定 ORDER BY:通过 TableProvider API 创建新表时,order_by 选项是必需的。你必须显式指定用于 ORDER BY 子句的列。连接器会校验所有指定列是否存在于 schema 中,如果有任意列缺失,则会抛出错误。

Engine 选择:默认的 engine 是 MergeTree(),但你可以通过 engine 选项指定任意 ClickHouse 表引擎(例如:ReplacingMergeTree()SummingMergeTree()AggregatingMergeTree() 等)。

TableProvider 连接选项

在使用基于格式的 API 时,可配置以下连接选项:

连接选项

选项描述默认值是否必填
hostClickHouse 服务器主机名localhost
protocol连接协议(httphttpshttp
http_portHTTP/HTTPS 端口8123
database数据库名称default
table表名N/A
user用于认证的用户名default
password用于认证的密码(empty string)
ssl是否启用 SSL 连接false
ssl_modeSSL 模式(NONESTRICT 等)STRICT
timezone用于日期/时间操作的时区server

表创建选项

当目标表不存在且需要新建时,可使用以下选项:

OptionDescriptionDefault ValueRequired
order_by用于 ORDER BY 子句的列。多个列使用逗号分隔N/AYes
engineClickHouse 表引擎(例如 MergeTree(), ReplacingMergeTree(), SummingMergeTree() 等)MergeTree()No
settings.allow_nullable_key在 ORDER BY 中启用 Nullable 键(适用于 ClickHouse Cloud)Auto-detected**No
settings.<key>任意 ClickHouse 表级 SETTINGN/ANo
cluster分布式表的集群名称N/ANo
clickhouse.column.<name>.variant_typesVariant 列对应的 ClickHouse 类型列表,使用逗号分隔(例如 String, Int64, Bool, JSON)。类型名区分大小写。逗号后的空格可选。N/ANo

* 在创建新表时,order_by 选项是必需的。所有指定的列必须存在于模式(schema)中。
** 如果 ORDER BY 中包含 Nullable 列且未显式提供该设置,则会自动设置为 1

提示

最佳实践:对于 ClickHouse Cloud,如果你的 ORDER BY 列可能为 Nullable 类型,请显式设置 settings.allow_nullable_key=1,因为 ClickHouse Cloud 需要此 SETTING。

写入模式

Spark 连接器(包括 TableProvider API 和 Catalog API)支持以下写入模式:

  • append:向已有表追加数据
  • overwrite:替换表中的全部数据(截断表)
信息

不支持分区覆盖写入:该连接器目前不支持分区级覆盖写入操作(例如在 overwrite 模式下使用 partitionBy)。该功能正在开发中。进度跟踪请参阅 GitHub issue #34

# overwrite 模式(会先截断表)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .mode("overwrite") \
    .save()

配置 ClickHouse 选项

Catalog API 和 TableProvider API 都支持配置 ClickHouse 专用选项(而非连接器选项)。这些选项在创建表或执行查询时会被原样传递给 ClickHouse。

ClickHouse 选项允许你配置 ClickHouse 专用的 SETTING,例如 allow_nullable_keyindex_granularity,以及其他表级或查询级的 SETTING。它们不同于连接器选项(例如 hostdatabasetable),连接器选项用于控制连接器如何连接到 ClickHouse。

使用 TableProvider API

使用 TableProvider API 时,请采用 settings.&lt;key&gt; 选项格式:

df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("order_by", "id") \
    .option("settings.allow_nullable_key", "1") \
    .option("settings.index_granularity", "8192") \
    .mode("append") \
    .save()

使用 Catalog API

使用 Catalog API 时,在 Spark 配置中使用 spark.sql.catalog.&lt;catalog_name&gt;.option.&lt;key&gt; 格式:

spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192

或者在通过 Spark SQL 创建表时设置:

CREATE TABLE clickhouse.default.my_table (
  id INT,
  name STRING
) USING ClickHouse
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  'settings.allow_nullable_key' = '1',
  'settings.index_granularity' = '8192'
)

ClickHouse Cloud 设置

连接到 ClickHouse Cloud 时,请确保启用 SSL,并设置合适的 SSL 模式。例如:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

读取数据

public static void main(String[] args) {
        // 创建 Spark 会话
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

写入数据

信息

不支持分区覆盖写入:Catalog API 当前不支持分区级别的覆盖写入操作(例如在 overwrite 模式下配合 partitionBy 使用)。此功能正在开发中。有关该功能的进展,请参阅 GitHub issue #34

 public static void main(String[] args) throws AnalysisException {

        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Define the schema for the DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Create a DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

DDL 操作

可以使用 Spark SQL 在 ClickHouse 实例上执行 DDL 操作,所有更改都会立即持久化到 ClickHouse。 Spark SQL 允许像在 ClickHouse 中一样编写查询, 因此可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令——无需任何修改,例如:

注意

使用 Spark SQL 时,一次只能执行一条语句。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

以上示例展示了 Spark SQL 查询,你可以在应用程序中使用任意 API(Java、Scala、PySpark 或 shell)来运行这些查询。

使用 VariantType

注意

Spark 从 4.0+ 版本开始支持 VariantType,并且需要在启用了 experimental JSON/Variant 类型的 ClickHouse 25.3+ 上使用。

该连接器支持 Spark 的 VariantType 用于处理半结构化数据。VariantType 映射到 ClickHouse 的 JSONVariant 类型,从而可以高效地存储和查询模式灵活的数据。

注意

本节专门介绍 VariantType 的映射和用法。有关所有受支持数据类型的全面概览,请参阅受支持的数据类型一节。

ClickHouse 类型映射

ClickHouse 类型Spark 类型描述
JSONVariantType仅存储 JSON 对象(必须以 { 开头)
Variant(T1, T2, ...)VariantType可存储多种类型,包括基本类型、数组和 JSON

读取 VariantType 数据

从 ClickHouse 读取数据时,JSONVariant 列会自动映射到 Spark 的 VariantType

// 将 JSON 列读取为 VariantType
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")

// 访问 VariantType 数据
df.show()

// 将 VariantType 转换为 JSON 字符串以便查看
import org.apache.spark.sql.functions._
df.select(
  col("id"),
  to_json(col("data")).as("data_json")
).show()

写入 VariantType 数据

你可以使用 JSON 或 Variant 列类型将 VariantType 数据写入 ClickHouse:

import org.apache.spark.sql.functions._

// 使用 JSON 数据创建 DataFrame
val jsonData = Seq(
  (1, """{"name": "Alice", "age": 30}"""),
  (2, """{"name": "Bob", "age": 25}"""),
  (3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")

// 将 JSON 字符串解析为 VariantType
val variantDF = jsonData.select(
  col("id"),
  parse_json(col("json_string")).as("data")
)

// 使用 JSON 类型写入 ClickHouse(仅支持 JSON 对象)
variantDF.writeTo("clickhouse.default.user_data").create()

// 或者指定支持多种类型的 Variant 列
spark.sql("""
  CREATE TABLE clickhouse.default.mixed_data (
    id INT,
    data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'id'
  )
""")

使用 Spark SQL 创建 VariantType 表

你可以使用 Spark SQL 的 DDL 语句创建 VariantType 表:

-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)
-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

配置 VariantType 类型

在创建包含 VariantType 列的表时,可以指定要使用的 ClickHouse 类型:

JSON 类型(默认)

如果未指定 variant_types 属性,列将默认使用 ClickHouse 的 JSON 类型,该类型只接受 JSON 对象:

CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

这会生成如下 ClickHouse 查询:

CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id

具有多种类型的 Variant 类型

要支持原始类型、数组和 JSON 对象,请在 variant_types 属性中指定这些类型:

CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

这会生成如下 ClickHouse 查询:

CREATE TABLE flexible_data (
  id Int32, 
  data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id

支持的 Variant 类型

Variant() 中可以使用以下 ClickHouse 类型:

  • 基本类型StringInt8Int16Int32Int64UInt8UInt16UInt32UInt64Float32Float64Bool
  • 数组Array(T),其中 T 为任意受支持的类型,包括嵌套数组
  • JSONJSON,用于存储 JSON 对象

读取格式配置

默认情况下,JSON 和 Variant 列会被读取为 VariantType。可以通过以下配置将其改为按字符串读取:

// 将 JSON/Variant 读取为字符串,而不是 VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")

val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data 列将是包含 JSON 字符串的 StringType

写入格式支持

VariantType 的写入支持在不同格式之间有所差异:

格式支持情况说明
JSON✅ 完全支持同时支持 JSONVariant 类型。推荐用于 VariantType 数据
Arrow⚠️ 部分支持支持写入 ClickHouse 的 JSON 类型。不支持 ClickHouse 的 Variant 类型。完整支持尚待 https://github.com/ClickHouse/ClickHouse/issues/92752 问题解决后提供

配置写入格式:

spark.conf.set("spark.clickhouse.write.format", "json")  // Recommended for Variant types
提示

如需向 ClickHouse 的 Variant 类型写入数据,请使用 JSON 格式。Arrow 格式仅支持写入 JSON 类型。

最佳实践

  1. 仅包含 JSON 数据时使用 JSON 类型:如果只存储 JSON 对象,请使用默认的 JSON 类型(不设置 variant_types 属性)
  2. 显式指定类型:使用 Variant() 时,显式列出计划存储的所有类型
  3. 启用实验功能:确保 ClickHouse 已启用 allow_experimental_json_type = 1
  4. 写入时使用 JSON 格式:建议对 VariantType 数据使用 JSON 格式,以获得更好的兼容性
  5. 考虑查询模式:JSON/Variant 类型支持 ClickHouse 的 JSON 路径查询,从而实现高效过滤
  6. 使用列提示优化性能:在 ClickHouse 中使用 JSON 字段时,添加列提示有助于提升查询性能。目前尚不支持通过 Spark 添加列提示。有关该功能的进展,请参阅 GitHub issue #497

完整工作流示例

import org.apache.spark.sql.functions._

// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")

// Create table with Variant column
spark.sql("""
  CREATE TABLE clickhouse.default.events (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'event_time'
  )
""")

// Prepare data with mixed types
val events = Seq(
  (1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
  (2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
  (3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")

// Convert to VariantType and write
val variantEvents = events.select(
  col("event_id"),
  to_timestamp(col("event_time")).as("event_time"),
  parse_json(col("json_data")).as("event_data")
)

variantEvents.writeTo("clickhouse.default.events").append()

// Read and query
val result = spark.sql("""
  SELECT event_id, event_time, event_data
  FROM clickhouse.default.events
  WHERE event_time >= '2024-01-01'
  ORDER BY event_time
""")

result.show(false)

配置

以下是该连接器中可调整的配置项。

注意

使用配置:这些是 Spark 级别的配置选项,适用于 Catalog API 和 TableProvider API。可以通过两种方式进行设置:

  1. 全局 Spark 配置(适用于所有操作):

    spark.conf.set("spark.clickhouse.write.batchSize", "20000")
    spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
    
  2. 按操作覆盖设置(仅适用于 TableProvider API——可覆盖全局设置):

    df.write \
        .format("clickhouse") \
        .option("host", "your-host") \
        .option("database", "default") \
        .option("table", "my_table") \
        .option("spark.clickhouse.write.batchSize", "20000") \
        .option("spark.clickhouse.write.compression.codec", "lz4") \
        .mode("append") \
        .save()
    

或者在 spark-defaults.conf 中设置,或者在创建 Spark 会话时进行设置。


键名默认值说明自版本起
spark.clickhouse.ignoreUnsupportedTransformtrueClickHouse 支持使用复杂表达式作为分片键或分区值,例如 cityHash64(col_1, col_2),而这些目前在 Spark 中尚不受支持。若为 true,则忽略这些不受支持的表达式并记录警告日志,否则快速失败并抛出异常。警告:当 spark.clickhouse.write.distributed.convertLocal=true 时,忽略不受支持的分片键可能会导致数据损坏。连接器会对此进行校验,并在默认情况下抛出错误。若要允许该行为,请显式设置 spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true0.4.0
spark.clickhouse.read.compression.codeclz4用于在读取时对数据进行解压缩的编解码器。支持的编解码器:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue读取分布式表时,改为读取其对应的本地表而非分布式表本身。若为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAsbinary将 ClickHouse 的 FixedString 类型读取为指定的 Spark 数据类型。支持的类型:binary、string0.8.0
spark.clickhouse.read.formatjson读取时使用的序列化格式。支持的格式:json、binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse启用读取时的运行时过滤器。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,则通过虚拟列 _partition_id 而不是分区值构造输入分区过滤器。在基于分区值构造 SQL 谓词时存在已知问题。此功能需要 ClickHouse Server v21.6 或更高版本。0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果为 true,在通过执行 CREATE/REPLACE TABLE ... AS SELECT ... 创建或替换表时,将查询的 schema 中所有字段标记为 Nullable。注意,此配置依赖于 SPARK-43390(在 Spark 3.5 中可用);在未应用该补丁的情况下,其行为始终等同于 true0.8.0
spark.clickhouse.write.batchSize10000写入 ClickHouse 时每个批次包含的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于在写入数据时进行压缩的编解码器。支持的编解码器:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse写入分布式表时,将数据写入其本地表而不是分布式表本身。若为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes。这会绕过 ClickHouse 的原生路由,需要由 Spark 负责计算分片键。当使用不受支持的分片表达式时,将 spark.clickhouse.ignoreUnsupportedTransform 设为 false,以防止静默的数据分布错误。0.1.0
spark.clickhouse.write.distributed.convertLocal.allowUnsupportedShardingfalse当分片键不受支持时,在 convertLocal=trueignoreUnsupportedTransform=true 的情况下允许写入分布式表。此选项存在风险,可能因错误分片导致数据损坏。将其设置为 true 时,必须在写入前确保数据已正确排序/分片,因为 Spark 无法计算不受支持的分片表达式。仅在充分理解风险并已验证数据分布的前提下才可将其设置为 true。默认情况下,该组合会抛出错误,以防止静默的数据损坏。0.10.0
spark.clickhouse.write.distributed.useClusterNodestrue在向分布式表写入数据时,向集群中所有节点写入数据。0.1.0
spark.clickhouse.write.formatarrow用于写入的序列化格式。支持的格式:json、arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,则在写入前按排序键在本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,则在写入前按分区进行本地排序。若未显式设置,则与 spark.clickhouse.write.repartitionByPartition 相同。0.3.0
spark.clickhouse.write.maxRetry3单个批量写入因可重试错误码失败时的最大重试次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue是否在写入前按 ClickHouse 分区键重新分区数据,以满足 ClickHouse 表的数据分布要求。0.3.0
spark.clickhouse.write.repartitionNum0在写入前需要对数据重新分区以满足 ClickHouse 表的分布要求,使用此配置指定重新分区的分区数,取值小于 1 表示不作要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,Spark 会在写入前严格地将传入记录按要求的分布策略分配到各个分区,然后再将记录传递给数据源表。否则,Spark 可能会应用某些优化来加速查询,但会破坏分布要求。注意,此配置依赖 SPARK-37523(在 Spark 3.4 中提供),若未包含该补丁,其行为始终等同于 true0.3.0
spark.clickhouse.write.retryInterval10s两次写入重试之间的时间间隔(秒)。0.1.0
spark.clickhouse.write.retryableErrorCodes241写入失败时 ClickHouse 服务器返回的可重试错误码。0.1.0

支持的数据类型

本节说明 Spark 与 ClickHouse 之间的数据类型映射关系。下表提供了在从 ClickHouse 读取数据到 Spark,以及从 Spark 向 ClickHouse 写入数据时进行数据类型转换的快速参考。

将 ClickHouse 中的数据读取到 Spark

ClickHouse Data TypeSpark 数据类型是否支持是否为基本类型说明
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType由配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和小数位最高到 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键类型仅限于 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType使用对应的具体区间类型
JSON, VariantVariantType需要 Spark 4.0+ 和 ClickHouse 25.3+。可通过 spark.clickhouse.read.jsonAs=string 读取为 StringType
Object
Nested
TupleStructType同时支持具名和不具名 tuple。具名 tuple 按名称映射到 struct 字段,不具名 tuple 使用 _1_2 等。支持嵌套 struct 和 Nullable 字段
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从 Spark 向 ClickHouse 插入数据

Spark 数据类型ClickHouse 数据类型是否支持是否为原始类型说明
BooleanTypeBool自 0.9.0 版本起映射为 Bool 类型(而非 UInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和小数位数(scale)最高可达 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType (list, tuple, or array)Array数组元素类型也会被转换
MapTypeMap键仅限于 StringType
StructTypeTuple转换为带字段名的命名 Tuple
VariantTypeJSON or Variant需要 Spark 4.0+ 和 ClickHouse 25.3+。默认映射为 JSON 类型。使用 clickhouse.column.&lt;name&gt;.variant_types 属性可指定具有多种类型的 Variant
Object
Nested

贡献与支持

如果您希望为本项目做出贡献或报告任何问题,我们非常欢迎您的参与! 请访问我们的 GitHub 仓库 来提交 issue、提出改进建议或发起 pull request。 我们欢迎所有贡献!在开始之前,请先阅读仓库中的贡献指南。 感谢您帮助改进我们的 ClickHouse Spark connector!