Spark 连接器
此连接器利用 ClickHouse 特有的优化功能,例如高级分区和谓词下推,以提升查询性能和数据处理效率。 该连接器基于 ClickHouse 官方 JDBC 连接器,并管理其自己的 catalog。
在 Spark 3.0 之前,Spark 缺少内置的 catalog 概念,因此用户通常依赖外部 catalog 系统,例如 Hive Metastore 或 AWS Glue。 使用这些外部方案时,用户必须先手动注册数据源表,才能在 Spark 中访问它们。 然而,自从 Spark 3.0 引入 catalog 概念之后,Spark 现在可以通过注册 catalog 插件来自动发现表。
Spark 的默认 catalog 是 spark_catalog,表通过 {catalog name}.{database}.{table} 标识。借助新的 catalog 功能,现在可以在单个 Spark 应用程序中添加并使用多个 catalog。
先决条件
- Java 8 或 17
- Scala 2.12 或 2.13
- Apache Spark 3.3 或 3.4 或 3.5
兼容性矩阵
| 版本 | 兼容的 Spark 版本 | ClickHouse JDBC 版本 |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | 无依赖 |
| 0.3.0 | Spark 3.2, 3.3 | 无依赖 |
| 0.2.1 | Spark 3.2 | 无依赖 |
| 0.1.2 | Spark 3.2 | 无依赖 |
安装与设置
要将 ClickHouse 集成到 Spark 中,有多种安装方式可适配不同的项目环境。
您可以在项目的构建文件中(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)直接添加 ClickHouse Spark 连接器作为依赖。
或者,也可以将所需的 JAR 文件放到 $SPARK_HOME/jars/ 目录中,或者在使用 spark-submit 命令时,通过 --jars 参数作为 Spark 选项直接传入。
这两种方式都可以确保在您的 Spark 环境中提供 ClickHouse 连接器。
作为依赖导入
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
如果您想使用 SNAPSHOT 版本,请添加以下仓库。
如果您想使用 SNAPSHOT 版本,请添加以下仓库:
在使用 Spark 的 Shell 选项(Spark SQL CLI、Spark Shell CLI 和 Spark Submit 命令)时,可以通过传入所需的 JAR 来注册依赖:
如果您希望避免将 JAR 文件拷贝到 Spark 客户端节点上,可以改用以下方式:
注意:对于仅 SQL 的使用场景,推荐在生产环境中使用 Apache Kyuubi。
下载依赖库
二进制 JAR 的命名规则为:
你可以在 Maven Central Repository 中找到所有已发布的 JAR 文件, 并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT JAR 文件。
务必包含带有 “all” classifier 的 clickhouse-jdbc JAR, 因为该 connector 依赖 clickhouse-http 和 clickhouse-client——这两者都已经打包 在 clickhouse-jdbc:all 中。 或者,如果不想使用完整的 JDBC 包,也可以分别添加 clickhouse-client JAR 和 clickhouse-http。
无论采用哪种方式,请确保这些包的版本与 兼容性矩阵 中列出的版本保持兼容。
注册 Catalog(必需)
要访问 ClickHouse 表,必须使用以下配置创建一个新的 Spark Catalog:
| Property | Value | Default Value | Required |
|---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
spark.sql.catalog.<catalog_name>.protocol | http | http | No |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
spark.sql.catalog.<catalog_name>.database | <database> | default | No |
spark.<catalog_name>.write.format | json | arrow | No |
可以通过以下任一方式设置这些配置:
- 编辑或创建
spark-defaults.conf。 - 将配置传递给
spark-submit命令(或spark-shell/spark-sqlCLI 命令)。 - 在初始化上下文时添加配置。
在使用 ClickHouse 集群时,需要为每个实例设置唯一的 Catalog 名称。 例如:
通过这种方式,你可以在 Spark SQL 中通过 clickhouse1.<ck_db>.<ck_table> 访问 clickhouse1 实例中的 <ck_db>.<ck_table> 表,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 实例中的 <ck_db>.<ck_table> 表。
ClickHouse Cloud 设置
在连接到 ClickHouse Cloud 时,请确保启用 SSL,并设置合适的 SSL 模式。例如:
读取数据
- Java
- Scala
- Python
- Spark SQL
写入数据
- Java
- Scala
- Python
你也可以自由使用任何其他符合上述兼容性矩阵要求的包组合。
packages = [ "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0", "com.clickhouse:clickhouse-client:0.7.0", "com.clickhouse:clickhouse-http-client:0.7.0", "org.apache.httpcomponents.client5:httpclient5:5.2.1"
]
spark = (SparkSession.builder .config("spark.jars.packages", ",".join(packages)) .getOrCreate())
spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1") spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http") spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123") spark.conf.set("spark.sql.catalog.clickhouse.user", "default") spark.conf.set("spark.sql.catalog.clickhouse.password", "123456") spark.conf.set("spark.sql.catalog.clickhouse.database", "default") spark.conf.set("spark.clickhouse.write.format", "json")
创建 DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")] df = spark.createDataFrame(data)
将 DataFrame 写入 ClickHouse
df.writeTo("clickhouse.default.example_table").append()
DDL 操作
你可以使用 Spark SQL 对 ClickHouse 实例执行 DDL 操作,所有更改都会立即持久化到 ClickHouse 中。 Spark SQL 允许你像在 ClickHouse 中一样编写查询, 因此你可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令——无需修改,例如:
使用 Spark SQL 时,每次只能执行一条语句。
上述示例演示了 Spark SQL 查询,你可以在应用程序中使用任意 API(如 Java、Scala、PySpark 或 shell)来运行这些查询。
配置
以下是该连接器中可调整的配置项:
| 键 | 默认 | 描述 | 自从 |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse 支持将复杂表达式用作分片键或分区值,例如 cityHash64(col_1, col_2),但这些目前在 Spark 中不受支持。若为 true,则忽略这些不受支持的表达式,否则将立即失败并抛出异常。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不受支持的分片键可能会导致数据损坏。 | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | 用于在读取时解压数据的编解码器。支持的编码格式:none、lz4。 | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | 在读取 Distributed 表时,改为读取对应的本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes。 | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | 二进制 | 按指定的 Spark 数据类型读取 ClickHouse 的 FixedString 类型。支持的类型:binary、string | 0.8.0 |
| spark.clickhouse.read.format | JSON | 读取时使用的序列化格式。支持的格式:JSON、Binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | 为读取启用运行时过滤。 | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | 如果为 true,则通过虚拟列 _partition_id 而不是分区值来构造输入分区过滤条件。通过分区值组装 SQL 谓词已知存在问题。此功能需要 ClickHouse Server v21.6 及以上版本。 | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | 如果设置为 true,在创建表时执行 CREATE/REPLACE TABLE ... AS SELECT ... 会将查询 schema 中的所有字段都标记为可为空。注意,此配置依赖于 SPARK-43390(在 Spark 3.5 中可用),如果没有该补丁,其行为始终等同于 true。 | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | 每批写入 ClickHouse 的记录数。 | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | 用于在写入数据时进行压缩的编解码器。支持的编解码器:none、lz4。 | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | 在写入 Distributed 表时,改为写入本地表而不是 Distributed 表本身。若为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes。 | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | 在写入 Distributed 表时,将数据写入集群中的所有节点。 | 0.1.0 |
| spark.clickhouse.write.format | 箭头 | 写入时使用的序列化格式。支持的格式:JSON、Arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | 如果为 true,在写入前按排序键进行本地排序。 | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | spark.clickhouse.write.repartitionByPartition 的值 | 如果为 true,则在写入前按分区进行本地排序。若未设置,则等同于 spark.clickhouse.write.repartitionByPartition。 | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | 对因可重试错误码而失败的单个批量写入操作允许的最大重试次数。 | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | 在写入之前,是否根据 ClickHouse 分区键对数据重新分区,以匹配 ClickHouse 表的分区分布。 | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | 如果在写入前需要根据 ClickHouse 表的分布对数据进行重新分区,可使用此配置来指定重新分区的分区数;当该值小于 1 时表示对此无要求。 | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | 如果为 true,Spark 会在写入时将传入记录严格分布到各个分区,以满足所需的分布要求,然后再将记录传递给数据源表。否则,Spark 可能会应用某些优化以加速查询,但可能会破坏分布要求。注意,该配置依赖于补丁 SPARK-37523(在 Spark 3.4 中可用),在没有该补丁时,其行为始终等同于 true。 | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | 两次写入重试之间的时间间隔(以秒为单位)。 | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | 在写入失败时由 ClickHouse 服务器返回的可重试的错误码。 | 0.1.0 |
支持的数据类型
本节说明 Spark 与 ClickHouse 之间的数据类型映射。下表提供了快速参考,便于在从 ClickHouse 读取数据到 Spark,或从 Spark 向 ClickHouse 插入数据时进行数据类型转换。
从 ClickHouse 读取数据到 Spark
| ClickHouse 数据类型 | Spark 数据类型 | 是否支持 | 是否为原始类型 | 说明 |
|---|---|---|---|---|
Nothing | NullType | ✅ | Yes | |
Bool | BooleanType | ✅ | Yes | |
UInt8, Int16 | ShortType | ✅ | Yes | |
Int8 | ByteType | ✅ | Yes | |
UInt16,Int32 | IntegerType | ✅ | Yes | |
UInt32,Int64, UInt64 | LongType | ✅ | Yes | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Yes | |
Float32 | FloatType | ✅ | Yes | |
Float64 | DoubleType | ✅ | Yes | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Yes | |
FixedString | BinaryType, StringType | ✅ | Yes | 由配置项 READ_FIXED_STRING_AS 控制 |
Decimal | DecimalType | ✅ | Yes | 精度和小数位数最高支持到 Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Yes | |
Decimal64 | DecimalType(18, scale) | ✅ | Yes | |
Decimal128 | DecimalType(38, scale) | ✅ | Yes | |
Date, Date32 | DateType | ✅ | Yes | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Yes | |
Array | ArrayType | ✅ | No | 数组元素类型也会被转换 |
Map | MapType | ✅ | No | 键类型仅支持 StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Yes | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Yes | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | No | 会使用对应的具体区间类型 |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
从 Spark 向 ClickHouse 插入数据
| Spark 数据类型 | ClickHouse 数据类型 | 是否支持 | 是否为基础类型 | 说明 |
|---|---|---|---|---|
BooleanType | UInt8 | ✅ | 是 | |
ByteType | Int8 | ✅ | 是 | |
ShortType | Int16 | ✅ | 是 | |
IntegerType | Int32 | ✅ | 是 | |
LongType | Int64 | ✅ | 是 | |
FloatType | Float32 | ✅ | 是 | |
DoubleType | Float64 | ✅ | 是 | |
StringType | String | ✅ | 是 | |
VarcharType | String | ✅ | 是 | |
CharType | String | ✅ | 是 | |
DecimalType | Decimal(p, s) | ✅ | 是 | 精度和小数位数最高可达 Decimal128 |
DateType | Date | ✅ | 是 | |
TimestampType | DateTime | ✅ | 是 | |
ArrayType(list、tuple 或 array) | Array | ✅ | 否 | 数组元素类型也会被转换 |
MapType | Map | ✅ | 否 | 键类型仅限于 StringType |
Object | ❌ | |||
Nested | ❌ |
贡献与支持
如果您希望为该项目做出贡献或报告任何问题,我们非常欢迎您的参与! 请访问我们的 GitHub 仓库 来提交 issue、提出改进建议或发起 pull request。 我们欢迎所有贡献!在开始之前,请先查看仓库中的贡献指南。 感谢您帮助改进我们的 ClickHouse Spark 连接器!