此连接器利用 ClickHouse 特定的优化功能,例如高级分区和谓词下推,以提升查询性能和数据处理能力。
该连接器基于 ClickHouse 官方 JDBC 连接器,并自行管理其 catalog。
在 Spark 3.0 之前,Spark 缺少内置的 catalog 概念,因此用户通常依赖 Hive Metastore 或 AWS Glue 等外部 catalog 系统。
在使用这些外部方案时,用户必须在 Spark 中访问这些表之前,手动注册其数据源表。
然而,自从 Spark 3.0 引入 catalog 概念后,Spark 现在可以通过注册 catalog 插件自动发现数据表。
Spark 的默认 catalog 是 spark_catalog,表通过 {catalog name}.{database}.{table} 来标识。借助新的 catalog 功能,现在可以在同一个 Spark 应用程序中添加并使用多个 catalog。
在 Catalog API 和 TableProvider API 之间进行选择
ClickHouse Spark 连接器支持两种访问模式:Catalog API 和 TableProvider API(基于格式的访问)。了解二者之间的差异有助于根据具体用例选择合适的方式。
Catalog API 与 TableProvider API 对比
| 功能 | Catalog API | TableProvider API |
|---|
| 配置方式 | 通过 Spark 配置集中管理 | 按操作通过选项配置 |
| 表发现机制 | 通过 catalog 自动发现 | 手动指定表 |
| DDL 操作 | 完整支持(CREATE、DROP、ALTER) | 支持有限(仅自动建表) |
| Spark SQL 集成 | 原生(clickhouse.database.table) | 需要指定数据格式(format) |
| 使用场景 | 长期、稳定连接并集中管理配置 | 临时、动态或一次性访问 |
环境要求
- Java 8 或 17(Spark 4.0 需要 Java 17+)
- Scala 2.12 或 2.13(Spark 4.0 仅支持 Scala 2.13)
- Apache Spark 3.3、3.4、3.5 或 4.0
兼容性矩阵
| 版本 | 兼容的 Spark 版本 | ClickHouse JDBC 版本 |
|---|
| main | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.10.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.5 |
| 0.9.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | 无依赖 |
| 0.3.0 | Spark 3.2, 3.3 | 无依赖 |
| 0.2.1 | Spark 3.2 | 无依赖 |
| 0.1.2 | Spark 3.2 | 无依赖 |
安装与设置
要将 ClickHouse 集成到 Spark 中,有多种安装选项可适配不同的项目配置。
可以在项目的构建文件中(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)直接添加 ClickHouse Spark 连接器作为依赖。
或者,也可以将所需的 JAR 文件放入 $SPARK_HOME/jars/ 目录,或在使用 spark-submit 命令时通过 --jars 参数作为 Spark 选项直接传入。
这两种方式都能确保 ClickHouse 连接器在 Spark 环境中可用。
作为依赖导入
<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
如果希望使用 SNAPSHOT 版本,请添加以下仓库。
<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
如果希望使用 SNAPSHOT 版本,请添加以下仓库:
repositries {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
libraryDependencies += "com.clickhouse" % "clickhouse-jdbc" % {{ clickhouse_jdbc_version }} classifier "all"
libraryDependencies += "com.clickhouse.spark" %% clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }} % {{ stable_version }}
在使用 Spark 的 shell 选项(Spark SQL CLI、Spark Shell CLI 和 Spark Submit 命令)时,可以通过传入所需的 JAR 包来注册依赖:
$SPARK_HOME/bin/spark-sql \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
如果希望避免将 JAR 文件复制到 Spark 客户端节点,可以改用以下方式:
--repositories https://{maven-central-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}
注意:对于仅 SQL 的使用场景,推荐在生产环境中使用 Apache Kyuubi。
下载依赖库
二进制 JAR 的命名模式为:
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
你可以在 Maven Central Repository 中找到所有已发布的 JAR 文件,
并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT 版 JAR 文件。
注册 catalog(必需)
若要访问 ClickHouse 表,必须创建并配置一个新的 Spark catalog,并使用以下配置:
| 属性 | 值 | 默认值 | 是否必需 |
|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
spark.sql.catalog.<catalog_name>.protocol | http | http | No |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
spark.sql.catalog.<catalog_name>.database | <database> | default | No |
spark.<catalog_name>.write.format | json | arrow | No |
可以通过以下任一方式设置这些配置:
- 编辑或创建
spark-defaults.conf。
- 将配置传递给
spark-submit 命令(或 spark-shell/spark-sql CLI 命令)。
- 在初始化上下文时添加配置。
信息
在使用 ClickHouse 集群时,需要为每个实例设置唯一的 catalog 名称。
例如:
spark.sql.catalog.clickhouse1 com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host 10.0.0.1
spark.sql.catalog.clickhouse1.protocol https
spark.sql.catalog.clickhouse1.http_port 8443
spark.sql.catalog.clickhouse1.user default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database default
spark.sql.catalog.clickhouse1.option.ssl true
spark.sql.catalog.clickhouse2 com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host 10.0.0.2
spark.sql.catalog.clickhouse2.protocol https
spark.sql.catalog.clickhouse2.http_port 8443
spark.sql.catalog.clickhouse2.user default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database default
spark.sql.catalog.clickhouse2.option.ssl true
这样,你就可以在 Spark SQL 中通过 clickhouse1.<ck_db>.<ck_table> 访问 clickhouse1 中的 <ck_db>.<ck_table> 表,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 中的 <ck_db>.<ck_table> 表。
使用 TableProvider API(基于格式的访问)
除了基于 catalog 的方式之外,ClickHouse Spark 连接器还支持通过 TableProvider API 采用基于格式的访问模式。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 使用格式 API 从 ClickHouse 读取数据
df = spark.read \
.format("clickhouse") \
.option("host", "your-clickhouse-host") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "your_table") \
.option("user", "default") \
.option("password", "your_password") \
.option("ssl", "true") \
.load()
df.show()
val df = spark.read
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.load()
df.show()
Dataset<Row> df = spark.read()
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.load();
df.show();
# 使用 format API 向 ClickHouse 写入数据
df.write \
.format("clickhouse") \
.option("host", "your-clickhouse-host") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "your_table") \
.option("user", "default") \
.option("password", "your_password") \
.option("ssl", "true") \
.mode("append") \
.save()
df.write
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.mode("append")
.save()
df.write()
.format("clickhouse")
.option("host", "your-clickhouse-host")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "your_table")
.option("user", "default")
.option("password", "your_password")
.option("ssl", "true")
.mode("append")
.save();
TableProvider 功能
TableProvider API 提供了一些强大的功能:
自动建表
当向一个不存在的表写入数据时,连接器会根据合适的 schema 自动创建该表。连接器提供了智能默认值:
- Engine:如果未指定,则默认为
MergeTree()。你可以通过 engine 选项指定不同的 engine(例如:ReplacingMergeTree()、SummingMergeTree() 等)。
- ORDER BY:必需 —— 创建新表时必须显式指定
order_by 选项。连接器会校验所有指定的列是否存在于 schema 中。
- Nullable Key 支持:如果 ORDER BY 中包含 Nullable 列,会自动添加
settings.allow_nullable_key=1
# 使用显式 ORDER BY(必需)自动创建表
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "new_table") \
.option("order_by", "id") \
.mode("append") \
.save()
# 使用自定义 engine 指定建表选项
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "new_table") \
.option("order_by", "id, timestamp") \
.option("engine", "ReplacingMergeTree()") \
.option("settings.allow_nullable_key", "1") \
.mode("append") \
.save()
// 使用显式 ORDER BY(必需)自动创建表
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id")
.mode("append")
.save()
// 使用显式建表选项和自定义 engine
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id, timestamp")
.option("engine", "ReplacingMergeTree()")
.option("settings.allow_nullable_key", "1")
.mode("append")
.save()
// 使用显式 ORDER BY(必需)自动创建表
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id")
.mode("append")
.save();
// 使用显式建表选项和自定义 engine
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "new_table")
.option("order_by", "id, timestamp")
.option("engine", "ReplacingMergeTree()")
.option("settings.allow_nullable_key", "1")
.mode("append")
.save();
信息
必须指定 ORDER BY:通过 TableProvider API 创建新表时,order_by 选项是必需的。你必须显式指定用于 ORDER BY 子句的列。连接器会校验所有指定列是否存在于 schema 中,如果有任意列缺失,则会抛出错误。
Engine 选择:默认的 engine 是 MergeTree(),但你可以通过 engine 选项指定任意 ClickHouse 表引擎(例如:ReplacingMergeTree()、SummingMergeTree()、AggregatingMergeTree() 等)。
TableProvider 连接选项
在使用基于格式的 API 时,可配置以下连接选项:
连接选项
| 选项 | 描述 | 默认值 | 是否必填 |
|---|
host | ClickHouse 服务器主机名 | localhost | 是 |
protocol | 连接协议(http 或 https) | http | 否 |
http_port | HTTP/HTTPS 端口 | 8123 | 否 |
database | 数据库名称 | default | 是 |
table | 表名 | N/A | 是 |
user | 用于认证的用户名 | default | 否 |
password | 用于认证的密码 | (empty string) | 否 |
ssl | 是否启用 SSL 连接 | false | 否 |
ssl_mode | SSL 模式(NONE、STRICT 等) | STRICT | 否 |
timezone | 用于日期/时间操作的时区 | server | 否 |
表创建选项
当目标表不存在且需要新建时,可使用以下选项:
| Option | Description | Default Value | Required |
|---|
order_by | 用于 ORDER BY 子句的列。多个列使用逗号分隔 | N/A | Yes |
engine | ClickHouse 表引擎(例如 MergeTree(), ReplacingMergeTree(), SummingMergeTree() 等) | MergeTree() | No |
settings.allow_nullable_key | 在 ORDER BY 中启用 Nullable 键(适用于 ClickHouse Cloud) | Auto-detected** | No |
settings.<key> | 任意 ClickHouse 表级 SETTING | N/A | No |
cluster | 分布式表的集群名称 | N/A | No |
clickhouse.column.<name>.variant_types | Variant 列对应的 ClickHouse 类型列表,使用逗号分隔(例如 String, Int64, Bool, JSON)。类型名区分大小写。逗号后的空格可选。 | N/A | No |
* 在创建新表时,order_by 选项是必需的。所有指定的列必须存在于模式(schema)中。
** 如果 ORDER BY 中包含 Nullable 列且未显式提供该设置,则会自动设置为 1。
提示
最佳实践:对于 ClickHouse Cloud,如果你的 ORDER BY 列可能为 Nullable 类型,请显式设置 settings.allow_nullable_key=1,因为 ClickHouse Cloud 需要此 SETTING。
写入模式
Spark 连接器(包括 TableProvider API 和 Catalog API)支持以下写入模式:
append:向已有表追加数据
overwrite:替换表中的全部数据(截断表)
信息
不支持分区覆盖写入:该连接器目前不支持分区级覆盖写入操作(例如在 overwrite 模式下使用 partitionBy)。该功能正在开发中。进度跟踪请参阅 GitHub issue #34。
# overwrite 模式(会先截断表)
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.mode("overwrite") \
.save()
// overwrite 模式(会先截断表)
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.mode("overwrite")
.save()
// overwrite 模式(会先截断表)
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.mode("overwrite")
.save();
配置 ClickHouse 选项
Catalog API 和 TableProvider API 都支持配置 ClickHouse 专用选项(而非连接器选项)。这些选项在创建表或执行查询时会被原样传递给 ClickHouse。
ClickHouse 选项允许你配置 ClickHouse 专用的 SETTING,例如 allow_nullable_key、index_granularity,以及其他表级或查询级的 SETTING。它们不同于连接器选项(例如 host、database、table),连接器选项用于控制连接器如何连接到 ClickHouse。
使用 TableProvider API
使用 TableProvider API 时,请采用 settings.<key> 选项格式:
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.option("order_by", "id") \
.option("settings.allow_nullable_key", "1") \
.option("settings.index_granularity", "8192") \
.mode("append") \
.save()
df.write
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.option("order_by", "id")
.option("settings.allow_nullable_key", "1")
.option("settings.index_granularity", "8192")
.mode("append")
.save()
df.write()
.format("clickhouse")
.option("host", "your-host")
.option("database", "default")
.option("table", "my_table")
.option("order_by", "id")
.option("settings.allow_nullable_key", "1")
.option("settings.index_granularity", "8192")
.mode("append")
.save();
使用 Catalog API
使用 Catalog API 时,在 Spark 配置中使用 spark.sql.catalog.<catalog_name>.option.<key> 格式:
spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192
或者在通过 Spark SQL 创建表时设置:
CREATE TABLE clickhouse.default.my_table (
id INT,
name STRING
) USING ClickHouse
TBLPROPERTIES (
engine = 'MergeTree()',
order_by = 'id',
'settings.allow_nullable_key' = '1',
'settings.index_granularity' = '8192'
)
ClickHouse Cloud 设置
连接到 ClickHouse Cloud 时,请确保启用 SSL,并设置合适的 SSL 模式。例如:
spark.sql.catalog.clickhouse.option.ssl true
spark.sql.catalog.clickhouse.option.ssl_mode NONE
读取数据
public static void main(String[] args) {
// 创建 Spark 会话
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();
Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");
df.show();
spark.stop();
}
object NativeSparkRead extends App {
val spark = SparkSession.builder
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate
val df = spark.sql("select * from clickhouse.default.example_table")
df.show()
spark.stop()
}
from pyspark.sql import SparkSession
packages = [
"com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
"com.clickhouse:clickhouse-client:0.7.0",
"com.clickhouse:clickhouse-http-client:0.7.0",
"org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder
.config("spark.jars.packages", ",".join(packages))
.getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "123456")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")
spark.conf.set("spark.clickhouse.write.format", "json")
df = spark.sql("select * from clickhouse.default.example_table")
df.show()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
SELECT * FROM jdbcTable;
写入数据
信息
不支持分区覆盖写入:Catalog API 当前不支持分区级别的覆盖写入操作(例如在 overwrite 模式下配合 partitionBy 使用)。此功能正在开发中。有关该功能的进展,请参阅 GitHub issue #34。
public static void main(String[] args) throws AnalysisException {
// Create a Spark session
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();
// Define the schema for the DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
});
List<Row> data = Arrays.asList(
RowFactory.create(1, "Alice"),
RowFactory.create(2, "Bob")
);
// Create a DataFrame
Dataset<Row> df = spark.createDataFrame(data, schema);
df.writeTo("clickhouse.default.example_table").append();
spark.stop();
}
object NativeSparkWrite extends App {
// Create a Spark session
val spark: SparkSession = SparkSession.builder
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate
// Define the schema for the DataFrame
val rows = Seq(Row(1, "John"), Row(2, "Doe"))
val schema = List(
StructField("id", DataTypes.IntegerType, nullable = false),
StructField("name", StringType, nullable = true)
)
// Create the df
val df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
df.writeTo("clickhouse.default.example_table").append()
spark.stop()
}
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Feel free to use any other packages combination satesfying the compatibility matrix provided above.
packages = [
"com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
"com.clickhouse:clickhouse-client:0.7.0",
"com.clickhouse:clickhouse-http-client:0.7.0",
"org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder
.config("spark.jars.packages", ",".join(packages))
.getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "123456")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")
spark.conf.set("spark.clickhouse.write.format", "json")
# Create DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)
# Write DataFrame to ClickHouse
df.writeTo("clickhouse.default.example_table").append()
-- resultTable is the Spark intermediate df we want to insert into clickhouse.default.example_table
INSERT INTO TABLE clickhouse.default.example_table
SELECT * FROM resultTable;
DDL 操作
可以使用 Spark SQL 在 ClickHouse 实例上执行 DDL 操作,所有更改都会立即持久化到 ClickHouse。
Spark SQL 允许像在 ClickHouse 中一样编写查询,
因此可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令——无需任何修改,例如:
注意
使用 Spark SQL 时,一次只能执行一条语句。
CREATE TABLE test_db.tbl_sql (
create_time TIMESTAMP NOT NULL,
m INT NOT NULL COMMENT 'part key',
id BIGINT NOT NULL COMMENT 'sort key',
value STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
engine = 'MergeTree()',
order_by = 'id',
settings.index_granularity = 8192
);
以上示例展示了 Spark SQL 查询,你可以在应用程序中使用任意 API(Java、Scala、PySpark 或 shell)来运行这些查询。
使用 VariantType
注意
Spark 从 4.0+ 版本开始支持 VariantType,并且需要在启用了 experimental JSON/Variant 类型的 ClickHouse 25.3+ 上使用。
该连接器支持 Spark 的 VariantType 用于处理半结构化数据。VariantType 映射到 ClickHouse 的 JSON 和 Variant 类型,从而可以高效地存储和查询模式灵活的数据。
注意
本节专门介绍 VariantType 的映射和用法。有关所有受支持数据类型的全面概览,请参阅受支持的数据类型一节。
ClickHouse 类型映射
| ClickHouse 类型 | Spark 类型 | 描述 |
|---|
JSON | VariantType | 仅存储 JSON 对象(必须以 { 开头) |
Variant(T1, T2, ...) | VariantType | 可存储多种类型,包括基本类型、数组和 JSON |
读取 VariantType 数据
从 ClickHouse 读取数据时,JSON 和 Variant 列会自动映射到 Spark 的 VariantType:
// 将 JSON 列读取为 VariantType
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// 访问 VariantType 数据
df.show()
// 将 VariantType 转换为 JSON 字符串以便查看
import org.apache.spark.sql.functions._
df.select(
col("id"),
to_json(col("data")).as("data_json")
).show()
# 将 JSON 列读取为 VariantType
df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
# 访问 VariantType 数据
df.show()
# 将 VariantType 转换为 JSON 字符串以便查看
from pyspark.sql.functions import to_json
df.select(
"id",
to_json("data").alias("data_json")
).show()
// 将 JSON 列读取为 VariantType
Dataset<Row> df = spark.sql("SELECT id, data FROM clickhouse.default.json_table");
// 访问 VariantType 数据
df.show();
// 将 VariantType 转换为 JSON 字符串以便查看
import static org.apache.spark.sql.functions.*;
df.select(
col("id"),
to_json(col("data")).as("data_json")
).show();
写入 VariantType 数据
你可以使用 JSON 或 Variant 列类型将 VariantType 数据写入 ClickHouse:
import org.apache.spark.sql.functions._
// 使用 JSON 数据创建 DataFrame
val jsonData = Seq(
(1, """{"name": "Alice", "age": 30}"""),
(2, """{"name": "Bob", "age": 25}"""),
(3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")
// 将 JSON 字符串解析为 VariantType
val variantDF = jsonData.select(
col("id"),
parse_json(col("json_string")).as("data")
)
// 使用 JSON 类型写入 ClickHouse(仅支持 JSON 对象)
variantDF.writeTo("clickhouse.default.user_data").create()
// 或者指定支持多种类型的 Variant 列
spark.sql("""
CREATE TABLE clickhouse.default.mixed_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
""")
from pyspark.sql.functions import parse_json
# 使用 JSON 数据创建 DataFrame
json_data = [
(1, '{"name": "Alice", "age": 30}'),
(2, '{"name": "Bob", "age": 25}'),
(3, '{"name": "Charlie", "city": "NYC"}')
]
df = spark.createDataFrame(json_data, ["id", "json_string"])
# 将 JSON 字符串解析为 VariantType
variant_df = df.select(
"id",
parse_json("json_string").alias("data")
)
# 使用 JSON 类型写入 ClickHouse
variant_df.writeTo("clickhouse.default.user_data").create()
// 或者指定支持多种类型的 Variant 列
spark.sql("""
CREATE TABLE clickhouse.default.mixed_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
""")
import static org.apache.spark.sql.functions.*;
// 使用 JSON 数据创建 DataFrame
List<Row> jsonData = Arrays.asList(
RowFactory.create(1, "{\"name\": \"Alice\", \"age\": 30}"),
RowFactory.create(2, "{\"name\": \"Bob\", \"age\": 25}"),
RowFactory.create(3, "{\"name\": \"Charlie\", \"city\": \"NYC\"}")
);
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("json_string", DataTypes.StringType, false)
});
Dataset<Row> jsonDF = spark.createDataFrame(jsonData, schema);
// 将 JSON 字符串解析为 VariantType
Dataset<Row> variantDF = jsonDF.select(
col("id"),
parse_json(col("json_string")).as("data")
);
// 使用 JSON 类型写入 ClickHouse(仅支持 JSON 对象)
variantDF.writeTo("clickhouse.default.user_data").create();
// 或者指定支持多种类型的 Variant 列
spark.sql("CREATE TABLE clickhouse.default.mixed_data (" +
"id INT, " +
"data VARIANT" +
") USING clickhouse " +
"TBLPROPERTIES (" +
"'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON', " +
"'engine' = 'MergeTree()', " +
"'order_by' = 'id'" +
")");
使用 Spark SQL 创建 VariantType 表
你可以使用 Spark SQL 的 DDL 语句创建 VariantType 表:
-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'engine' = 'MergeTree()',
'order_by' = 'id'
)
-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
配置 VariantType 类型
在创建包含 VariantType 列的表时,可以指定要使用的 ClickHouse 类型:
JSON 类型(默认)
如果未指定 variant_types 属性,列将默认使用 ClickHouse 的 JSON 类型,该类型只接受 JSON 对象:
CREATE TABLE clickhouse.default.json_table (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'engine' = 'MergeTree()',
'order_by' = 'id'
)
这会生成如下 ClickHouse 查询:
CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id
具有多种类型的 Variant 类型
要支持原始类型、数组和 JSON 对象,请在 variant_types 属性中指定这些类型:
CREATE TABLE clickhouse.default.flexible_data (
id INT,
data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
'engine' = 'MergeTree()',
'order_by' = 'id'
)
这会生成如下 ClickHouse 查询:
CREATE TABLE flexible_data (
id Int32,
data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id
支持的 Variant 类型
Variant() 中可以使用以下 ClickHouse 类型:
- 基本类型:
String、Int8、Int16、Int32、Int64、UInt8、UInt16、UInt32、UInt64、Float32、Float64、Bool
- 数组:
Array(T),其中 T 为任意受支持的类型,包括嵌套数组
- JSON:
JSON,用于存储 JSON 对象
默认情况下,JSON 和 Variant 列会被读取为 VariantType。可以通过以下配置将其改为按字符串读取:
// 将 JSON/Variant 读取为字符串,而不是 VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data 列将是包含 JSON 字符串的 StringType
# 将 JSON/Variant 读取为字符串,而不是 VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")
df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
# data 列将是包含 JSON 字符串的 StringType
// 将 JSON/Variant 读取为字符串,而不是 VariantType
spark.conf().set("spark.clickhouse.read.jsonAs", "string");
Dataset<Row> df = spark.sql("SELECT id, data FROM clickhouse.default.json_table");
// data 列将是包含 JSON 字符串的 StringType
VariantType 的写入支持在不同格式之间有所差异:
配置写入格式:
spark.conf.set("spark.clickhouse.write.format", "json") // Recommended for Variant types
提示
如需向 ClickHouse 的 Variant 类型写入数据,请使用 JSON 格式。Arrow 格式仅支持写入 JSON 类型。
最佳实践
- 仅包含 JSON 数据时使用 JSON 类型:如果只存储 JSON 对象,请使用默认的 JSON 类型(不设置
variant_types 属性)
- 显式指定类型:使用
Variant() 时,显式列出计划存储的所有类型
- 启用实验功能:确保 ClickHouse 已启用
allow_experimental_json_type = 1
- 写入时使用 JSON 格式:建议对 VariantType 数据使用 JSON 格式,以获得更好的兼容性
- 考虑查询模式:JSON/Variant 类型支持 ClickHouse 的 JSON 路径查询,从而实现高效过滤
- 使用列提示优化性能:在 ClickHouse 中使用 JSON 字段时,添加列提示有助于提升查询性能。目前尚不支持通过 Spark 添加列提示。有关该功能的进展,请参阅 GitHub issue #497。
完整工作流示例
import org.apache.spark.sql.functions._
// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")
// Create table with Variant column
spark.sql("""
CREATE TABLE clickhouse.default.events (
event_id BIGINT,
event_time TIMESTAMP,
event_data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'event_time'
)
""")
// Prepare data with mixed types
val events = Seq(
(1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
(2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
(3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")
// Convert to VariantType and write
val variantEvents = events.select(
col("event_id"),
to_timestamp(col("event_time")).as("event_time"),
parse_json(col("json_data")).as("event_data")
)
variantEvents.writeTo("clickhouse.default.events").append()
// Read and query
val result = spark.sql("""
SELECT event_id, event_time, event_data
FROM clickhouse.default.events
WHERE event_time >= '2024-01-01'
ORDER BY event_time
""")
result.show(false)
from pyspark.sql.functions import parse_json, to_timestamp
# Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")
# Create table with Variant column
spark.sql("""
CREATE TABLE clickhouse.default.events (
event_id BIGINT,
event_time TIMESTAMP,
event_data VARIANT
) USING clickhouse
TBLPROPERTIES (
'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
'engine' = 'MergeTree()',
'order_by' = 'event_time'
)
""")
# Prepare data with mixed types
events = [
(1, "2024-01-01 10:00:00", '{"action": "login", "user_id": 123}'),
(2, "2024-01-01 10:05:00", '{"action": "purchase", "amount": 99.99}'),
(3, "2024-01-01 10:10:00", '{"action": "logout", "duration": 600}')
]
df = spark.createDataFrame(events, ["event_id", "event_time", "json_data"])
# Convert to VariantType and write
variant_events = df.select(
"event_id",
to_timestamp("event_time").alias("event_time"),
parse_json("json_data").alias("event_data")
)
variant_events.writeTo("clickhouse.default.events").append()
# Read and query
result = spark.sql("""
SELECT event_id, event_time, event_data
FROM clickhouse.default.events
WHERE event_time >= '2024-01-01'
ORDER BY event_time
""")
result.show(truncate=False)
import static org.apache.spark.sql.functions.*;
// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1");
// Create table with Variant column
spark.sql("CREATE TABLE clickhouse.default.events (" +
"event_id BIGINT, " +
"event_time TIMESTAMP, " +
"event_data VARIANT" +
") USING clickhouse " +
"TBLPROPERTIES (" +
"'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON', " +
"'engine' = 'MergeTree()', " +
"'order_by' = 'event_time'" +
")");
// Prepare data with mixed types
List<Row> events = Arrays.asList(
RowFactory.create(1L, "2024-01-01 10:00:00", "{\"action\": \"login\", \"user_id\": 123}"),
RowFactory.create(2L, "2024-01-01 10:05:00", "{\"action\": \"purchase\", \"amount\": 99.99}"),
RowFactory.create(3L, "2024-01-01 10:10:00", "{\"action\": \"logout\", \"duration\": 600}")
);
StructType eventSchema = new StructType(new StructField[]{
DataTypes.createStructField("event_id", DataTypes.LongType, false),
DataTypes.createStructField("event_time", DataTypes.StringType, false),
DataTypes.createStructField("json_data", DataTypes.StringType, false)
});
Dataset<Row> eventsDF = spark.createDataFrame(events, eventSchema);
// Convert to VariantType and write
Dataset<Row> variantEvents = eventsDF.select(
col("event_id"),
to_timestamp(col("event_time")).as("event_time"),
parse_json(col("json_data")).as("event_data")
);
variantEvents.writeTo("clickhouse.default.events").append();
// Read and query
Dataset<Row> result = spark.sql("SELECT event_id, event_time, event_data " +
"FROM clickhouse.default.events " +
"WHERE event_time >= '2024-01-01' " +
"ORDER BY event_time");
result.show(false);
以下是该连接器中可调整的配置项。
注意
使用配置:这些是 Spark 级别的配置选项,适用于 Catalog API 和 TableProvider API。可以通过两种方式进行设置:
-
全局 Spark 配置(适用于所有操作):
spark.conf.set("spark.clickhouse.write.batchSize", "20000")
spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
-
按操作覆盖设置(仅适用于 TableProvider API——可覆盖全局设置):
df.write \
.format("clickhouse") \
.option("host", "your-host") \
.option("database", "default") \
.option("table", "my_table") \
.option("spark.clickhouse.write.batchSize", "20000") \
.option("spark.clickhouse.write.compression.codec", "lz4") \
.mode("append") \
.save()
或者在 spark-defaults.conf 中设置,或者在创建 Spark 会话时进行设置。
| 键名 | 默认值 | 说明 | 自版本起 |
|---|
| spark.clickhouse.ignoreUnsupportedTransform | true | ClickHouse 支持使用复杂表达式作为分片键或分区值,例如 cityHash64(col_1, col_2),而这些目前在 Spark 中尚不受支持。若为 true,则忽略这些不受支持的表达式并记录警告日志,否则快速失败并抛出异常。警告:当 spark.clickhouse.write.distributed.convertLocal=true 时,忽略不受支持的分片键可能会导致数据损坏。连接器会对此进行校验,并在默认情况下抛出错误。若要允许该行为,请显式设置 spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true。 | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | 用于在读取时对数据进行解压缩的编解码器。支持的编解码器:none、lz4。 | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | 读取分布式表时,改为读取其对应的本地表而非分布式表本身。若为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes。 | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | binary | 将 ClickHouse 的 FixedString 类型读取为指定的 Spark 数据类型。支持的类型:binary、string | 0.8.0 |
| spark.clickhouse.read.format | json | 读取时使用的序列化格式。支持的格式:json、binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | 启用读取时的运行时过滤器。 | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | 如果为 true,则通过虚拟列 _partition_id 而不是分区值构造输入分区过滤器。在基于分区值构造 SQL 谓词时存在已知问题。此功能需要 ClickHouse Server v21.6 或更高版本。 | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | 如果为 true,在通过执行 CREATE/REPLACE TABLE ... AS SELECT ... 创建或替换表时,将查询的 schema 中所有字段标记为 Nullable。注意,此配置依赖于 SPARK-43390(在 Spark 3.5 中可用);在未应用该补丁的情况下,其行为始终等同于 true。 | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | 写入 ClickHouse 时每个批次包含的记录数。 | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | 用于在写入数据时进行压缩的编解码器。支持的编解码器:none、lz4。 | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | 写入分布式表时,将数据写入其本地表而不是分布式表本身。若为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes。这会绕过 ClickHouse 的原生路由,需要由 Spark 负责计算分片键。当使用不受支持的分片表达式时,将 spark.clickhouse.ignoreUnsupportedTransform 设为 false,以防止静默的数据分布错误。 | 0.1.0 |
| spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding | false | 当分片键不受支持时,在 convertLocal=true 且 ignoreUnsupportedTransform=true 的情况下允许写入分布式表。此选项存在风险,可能因错误分片导致数据损坏。将其设置为 true 时,必须在写入前确保数据已正确排序/分片,因为 Spark 无法计算不受支持的分片表达式。仅在充分理解风险并已验证数据分布的前提下才可将其设置为 true。默认情况下,该组合会抛出错误,以防止静默的数据损坏。 | 0.10.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | 在向分布式表写入数据时,向集群中所有节点写入数据。 | 0.1.0 |
| spark.clickhouse.write.format | arrow | 用于写入的序列化格式。支持的格式:json、arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | 如果为 true,则在写入前按排序键在本地排序。 | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | spark.clickhouse.write.repartitionByPartition 的值 | 如果为 true,则在写入前按分区进行本地排序。若未显式设置,则与 spark.clickhouse.write.repartitionByPartition 相同。 | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | 单个批量写入因可重试错误码失败时的最大重试次数。 | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | 是否在写入前按 ClickHouse 分区键重新分区数据,以满足 ClickHouse 表的数据分布要求。 | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | 在写入前需要对数据重新分区以满足 ClickHouse 表的分布要求,使用此配置指定重新分区的分区数,取值小于 1 表示不作要求。 | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | 如果为 true,Spark 会在写入前严格地将传入记录按要求的分布策略分配到各个分区,然后再将记录传递给数据源表。否则,Spark 可能会应用某些优化来加速查询,但会破坏分布要求。注意,此配置依赖 SPARK-37523(在 Spark 3.4 中提供),若未包含该补丁,其行为始终等同于 true。 | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | 两次写入重试之间的时间间隔(秒)。 | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | 写入失败时 ClickHouse 服务器返回的可重试错误码。 | 0.1.0 |
支持的数据类型
本节说明 Spark 与 ClickHouse 之间的数据类型映射关系。下表提供了在从 ClickHouse 读取数据到 Spark,以及从 Spark 向 ClickHouse 写入数据时进行数据类型转换的快速参考。
将 ClickHouse 中的数据读取到 Spark
| ClickHouse Data Type | Spark 数据类型 | 是否支持 | 是否为基本类型 | 说明 |
|---|
Nothing | NullType | ✅ | 是 | |
Bool | BooleanType | ✅ | 是 | |
UInt8, Int16 | ShortType | ✅ | 是 | |
Int8 | ByteType | ✅ | 是 | |
UInt16,Int32 | IntegerType | ✅ | 是 | |
UInt32,Int64, UInt64 | LongType | ✅ | 是 | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | 是 | |
Float32 | FloatType | ✅ | 是 | |
Float64 | DoubleType | ✅ | 是 | |
String, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | 是 | |
FixedString | BinaryType, StringType | ✅ | 是 | 由配置 READ_FIXED_STRING_AS 控制 |
Decimal | DecimalType | ✅ | 是 | 精度和小数位最高到 Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | 是 | |
Decimal64 | DecimalType(18, scale) | ✅ | 是 | |
Decimal128 | DecimalType(38, scale) | ✅ | 是 | |
Date, Date32 | DateType | ✅ | 是 | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | 是 | |
Array | ArrayType | ✅ | 否 | 数组元素类型也会被转换 |
Map | MapType | ✅ | 否 | 键类型仅限于 StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | 是 | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | 是 | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | 否 | 使用对应的具体区间类型 |
JSON, Variant | VariantType | ✅ | 否 | 需要 Spark 4.0+ 和 ClickHouse 25.3+。可通过 spark.clickhouse.read.jsonAs=string 读取为 StringType |
Object | | ❌ | | |
Nested | | ❌ | | |
Tuple | StructType | ✅ | 否 | 同时支持具名和不具名 tuple。具名 tuple 按名称映射到 struct 字段,不具名 tuple 使用 _1、_2 等。支持嵌套 struct 和 Nullable 字段 |
Point | | ❌ | | |
Polygon | | ❌ | | |
MultiPolygon | | ❌ | | |
Ring | | ❌ | | |
IntervalQuarter | | ❌ | | |
IntervalWeek | | ❌ | | |
Decimal256 | | ❌ | | |
AggregateFunction | | ❌ | | |
SimpleAggregateFunction | | ❌ | | |
从 Spark 向 ClickHouse 插入数据
| Spark 数据类型 | ClickHouse 数据类型 | 是否支持 | 是否为原始类型 | 说明 |
|---|
BooleanType | Bool | ✅ | 是 | 自 0.9.0 版本起映射为 Bool 类型(而非 UInt8) |
ByteType | Int8 | ✅ | 是 | |
ShortType | Int16 | ✅ | 是 | |
IntegerType | Int32 | ✅ | 是 | |
LongType | Int64 | ✅ | 是 | |
FloatType | Float32 | ✅ | 是 | |
DoubleType | Float64 | ✅ | 是 | |
StringType | String | ✅ | 是 | |
VarcharType | String | ✅ | 是 | |
CharType | String | ✅ | 是 | |
DecimalType | Decimal(p, s) | ✅ | 是 | 精度和小数位数(scale)最高可达 Decimal128 |
DateType | Date | ✅ | 是 | |
TimestampType | DateTime | ✅ | 是 | |
ArrayType (list, tuple, or array) | Array | ✅ | 否 | 数组元素类型也会被转换 |
MapType | Map | ✅ | 否 | 键仅限于 StringType |
StructType | Tuple | ✅ | 否 | 转换为带字段名的命名 Tuple |
VariantType | JSON or Variant | ✅ | 否 | 需要 Spark 4.0+ 和 ClickHouse 25.3+。默认映射为 JSON 类型。使用 clickhouse.column.<name>.variant_types 属性可指定具有多种类型的 Variant。 |
Object | | ❌ | | |
Nested | | ❌ | | |
贡献与支持
如果您希望为本项目做出贡献或报告任何问题,我们非常欢迎您的参与!
请访问我们的 GitHub 仓库 来提交 issue、提出改进建议或发起 pull request。
我们欢迎所有贡献!在开始之前,请先阅读仓库中的贡献指南。
感谢您帮助改进我们的 ClickHouse Spark connector!