跳到主要内容
跳到主要内容

Spark 连接器

此连接器利用 ClickHouse 特定的优化,例如高级分区和谓词下推,以改善查询性能和数据处理。该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理其自己的目录。

在 Spark 3.0 之前,Spark 缺乏内置的目录概念,因此用户通常依赖外部目录系统,例如 Hive Metastore 或 AWS Glue。使用这些外部解决方案时,用户在访问 Spark 中的数据源表之前,必须手动注册它们。然而,自从 Spark 3.0 引入目录概念以来,Spark 现在可以通过注册目录插件自动发现表。

Spark 的默认目录是 spark_catalog,表通过 {catalog name}.{database}.{table} 来识别。使用新的目录功能,现在可以在单个 Spark 应用程序中添加和使用多个目录。

需求

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3, 3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3不依赖于
0.3.0Spark 3.2, 3.3不依赖于
0.2.1Spark 3.2不依赖于
0.1.2Spark 3.2不依赖于

安装与设置

要将 ClickHouse 与 Spark 集成,有多种安装选项以适应不同的项目设置。您可以将 ClickHouse Spark 连接器作为依赖项直接添加到项目的构建文件中(例如在 Maven 的 pom.xml 或 SBT 的 build.sbt 中)。另外,您可以将所需的 JAR 文件放在 $SPARK_HOME/jars/ 文件夹中,或在 spark-submit 命令中使用 --jars 选项直接传递它们。这两种方法都确保 ClickHouse 连接器在您的 Spark 环境中可用。

作为依赖项导入

如果您想使用 SNAPSHOT 版本,请添加以下存储库。

下载库

二进制 JAR 的名称模式为:

您可以在 Maven Central Repository 中找到所有可用的发布 JAR 文件,以及在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT JAR 文件。

:::重要 必须包含带有 "all" 分类器的 clickhouse-jdbc JAR,因为连接器依赖于 clickhouse-httpclickhouse-client — 这两者都捆绑在 clickhouse-jdbc:all 中。或者,如果您不想使用完整的 JDBC 包,可以单独添加 clickhouse-client JARclickhouse-http

无论如何,请确保包版本根据 兼容性矩阵 兼容。 :::

注册目录(必需)

为了访问您的 ClickHouse 表,您必须使用以下配置配置新的 Spark 目录:

属性默认值必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/A
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhost
spark.sql.catalog.<catalog_name>.protocolhttphttp
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123
spark.sql.catalog.<catalog_name>.user<clickhouse_username>default
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空字符串)
spark.sql.catalog.<catalog_name>.database<database>default
spark.<catalog_name>.write.formatjsonarrow

这些设置可以通过以下方式之一进行设置:

  • 编辑/创建 spark-defaults.conf
  • 将配置传递给您的 spark-submit 命令(或传递给您的 spark-shell/spark-sql CLI 命令)。
  • 在初始化上下文时添加配置。

:::重要 在处理 ClickHouse 集群时,您需要为每个实例设置唯一的目录名称。例如:

这样,您将能够通过 clickhouse1.<ck_db>.<ck_table> 从 Spark SQL 访问 clickhouse1 表 <ck_db>.<ck_table>,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 表 <ck_db>.<ck_table>

:::

ClickHouse Cloud 设置

连接到 ClickHouse Cloud 时,请确保启用 SSL 并设置适当的 SSL 模式。例如:

读取数据

写入数据

DDL 操作

您可以使用 Spark SQL 对 ClickHouse 实例执行 DDL 操作,所有更改立即在 ClickHouse 中持久化。Spark SQL 允许您准确地按照您在 ClickHouse 中的写法编写查询,因此您可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令,而无需修改,例如:

上述示例演示了 Spark SQL 查询,您可以在应用程序中使用任何 API——Java、Scala、PySpark 或 shell 运行这些查询。

配置

以下是连接器中可调节的配置:


默认值描述自版本
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse 支持使用复杂表达式作为分片键或分区值,例如 cityHash64(col_1, col_2),而这些在 Spark 中当前不受支持。如果 true,则忽略不受支持的表达式,否则快速失败并抛出异常。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不受支持的分片键可能会损坏数据。0.4.0
spark.clickhouse.read.compression.codeclz4用于读取数据时解压缩的编解码器。支持的编解码器:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue在读取分布式表时,读取本地表而不是自身。如果 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAsbinary将 ClickHouse FixedString 类型读取为指定的 Spark 数据类型。支持的类型:binary、string。0.8.0
spark.clickhouse.read.formatjson读取的序列化格式。支持的格式:json、binary。0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse启用读取的运行时过滤器。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,通过虚拟列 _partition_id 构建输入分区过滤器,而不是分区值。已知在通过分区值组装 SQL 谓词时存在问题。此特性需要 ClickHouse Server v21.6+。0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果为 true,在执行 CREATE/REPLACE TABLE ... AS SELECT ... 时将查询架构的所有字段标记为可空。注意,此配置需要 SPARK-43390(可在 Spark 3.5 中使用),没有此补丁,会始终表现为 true0.8.0
spark.clickhouse.write.batchSize10000写入 ClickHouse 时每批的记录数量。0.1.0
spark.clickhouse.write.compression.codeclz4写入数据时使用的压缩编解码器。支持的编解码器:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse写入分布式表时,写入本地表而不是自身。如果为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue写入分布式表时写入集群的所有节点。0.1.0
spark.clickhouse.write.formatarrow写入的序列化格式。支持的格式:json、arrow。0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,在写入之前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,在写入之前按分区进行本地排序。如果未设置,等于 spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3单批写入因可重试代码失败而重试的最大次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue在写入之前是否按 ClickHouse 分区键重新分区数据,以满足 ClickHouse 表的分布。0.3.0
spark.clickhouse.write.repartitionNum0在写入之前需要重新分区以满足 ClickHouse 表的分布,使用此配置来指定重新分区的数量,值小于 1 意味着不需要。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,Spark 将严格按分区分配传入记录,以满足在写入时传递给数据源表的分布要求。否则,Spark 可能会应用某些优化以加快查询速度,但会打破分配要求。注意,此配置需要 SPARK-37523(可在 Spark 3.4 中使用),没有此补丁,会始终表现为 true0.3.0
spark.clickhouse.write.retryInterval10s写入重试之间的时间间隔(以秒为单位)。0.1.0
spark.clickhouse.write.retryableErrorCodes241写入失败时 ClickHouse 服务器返回的可重试错误代码。0.1.0

支持的数据类型

本节概述了 Spark 和 ClickHouse 之间的数据类型映射。下面的表提供了从 ClickHouse 读取到 Spark 以及从 Spark 插入数据到 ClickHouse 时转换数据类型的快速参考。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型支持是否原始注释
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType由配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和标度最多为 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键仅限于 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType使用特定的间隔类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从 Spark 插入数据到 ClickHouse

Spark 数据类型ClickHouse 数据类型支持是否原始注释
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和标度最多为 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType (列表、元组或数组)Array数组元素类型也会被转换
MapTypeMap键仅限于 StringType
Object
Nested

贡献与支持

如果您想为该项目贡献或报告任何问题,我们欢迎您的意见! 访问我们的 GitHub 仓库 来打开问题、建议改进或提交拉取请求。 欢迎贡献!在开始之前,请查看仓库中的贡献指南。 感谢您帮助改善我们的 ClickHouse Spark 连接器!