跳到主要内容
跳到主要内容

Spark 连接器

此连接器利用 ClickHouse 特定的优化,例如高级分区和谓词下推,以提高查询性能和数据处理能力。该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理其自己的目录。

在 Spark 3.0 之前,Spark 缺乏内建的目录概念,因此用户通常依赖于外部目录系统,例如 Hive Metastore 或 AWS Glue。使用这些外部解决方案时,用户必须在访问 Spark 中的数据源表之前手动注册它们。 然而,自 Spark 3.0 引入目录概念后,Spark 现在可以通过注册目录插件自动发现表。

Spark 的默认目录是 spark_catalog,表通过 {catalog name}.{database}.{table} 进行标识。借助新的目录功能,现在可以在单个 Spark 应用程序中添加和使用多个目录。

要求

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3 或 3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3不依赖于
0.3.0Spark 3.2, 3.3不依赖于
0.2.1Spark 3.2不依赖于
0.1.2Spark 3.2不依赖于

安装与设置

要将 ClickHouse 与 Spark 集成,有多种安装选项以适应不同的项目设置。您可以直接在项目的构建文件中将 ClickHouse Spark 连接器作为依赖项添加(例如,Maven 中的 pom.xml 或 SBT 中的 build.sbt)。或者,您可以将所需的 JAR 文件放入 $SPARK_HOME/jars/ 文件夹中,或者在 spark-submit 命令中直接使用 --jars 标志传递它们。这两种方法都确保 ClickHouse 连接器在您的 Spark 环境中可用。

作为依赖项导入

如果您想使用 SNAPSHOT 版本,请添加以下存储库。

下载库

二进制 JAR 的命名模式是:

您可以在 Maven Central Repository 找到所有可用的已发布 JAR 文件,以及在 Sonatype OSS Snapshots Repository 找到所有每日构建的 SNAPSHOT JAR 文件。

信息

务必包含具有 "all" 分类器的 clickhouse-jdbc JAR,因为连接器依赖于 clickhouse-httpclickhouse-client — 这两者都捆绑在 clickhouse-jdbc:all 中。或者,如果您不想使用完整的 JDBC 包,您可以单独添加 clickhouse-client JARclickhouse-http

无论如何,请确保根据 兼容性矩阵 确保包版本兼容。

注册目录(必需)

为了访问 ClickHouse 表,您必须使用以下配置配置新的 Spark 目录:

属性默认值必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/A
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhost
spark.sql.catalog.<catalog_name>.protocolhttphttp
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123
spark.sql.catalog.<catalog_name>.user<clickhouse_username>default
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空字符串)
spark.sql.catalog.<catalog_name>.database<database>default
spark.<catalog_name>.write.formatjsonarrow

这些设置可以通过以下方式进行设置:

  • 编辑/创建 spark-defaults.conf
  • 将配置传递给您的 spark-submit 命令(或传递给您的 spark-shell/spark-sql CLI 命令)。
  • 在初始化上下文时添加配置。
信息

在使用 ClickHouse 集群时,您需要为每个实例设置一个唯一的目录名称。 例如:

通过这种方式,您将能够通过 clickhouse1.<ck_db>.<ck_table> 从 Spark SQL 访问 clickhouse1 表 <ck_db>.<ck_table>,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 表 <ck_db>.<ck_table>

读取数据

写入数据

DDL 操作

您可以使用 Spark SQL 对 ClickHouse 实例执行 DDL 操作,所有更改立即持久化到 ClickHouse。 Spark SQL 允许您像在 ClickHouse 中一样编写查询,因此您可以直接执行 SQL 命令,例如 CREATE TABLE、TRUNCATE 等,无需修改。例如:

上述示例演示了 Spark SQL 查询,您可以在应用程序的任何 API 中运行这些查询——Java、Scala、PySpark 或 shell。

配置

以下是在连接器中可调节的配置:


默认描述自版本
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse支持使用复杂表达式作为分片键或分区值,例如cityHash64(col_1, col_2),而这在Spark中当前不支持。如果为true,则忽略不支持的表达式;否则快速失败并抛出异常。请注意,当spark.clickhouse.write.distributed.convertLocal启用时,忽略不支持的分片键可能会损坏数据。0.4.0
spark.clickhouse.read.compression.codeclz4用于读取数据时解压缩的编解码器。支持的编解码器:none, lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue在读取分布式表时,读取本地表,而不是自身。如果为true,则忽略spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAsbinary将ClickHouse FixedString类型读取为指定的Spark数据类型。支持的类型:binary, string0.8.0
spark.clickhouse.read.formatjson读取时的序列化格式。支持的格式:json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse启用读取时的运行时过滤器。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为true,通过虚拟列_partition_id构造输入分区过滤器,而不是分区值。已知存在通过分区值组装SQL谓词的问题。此功能要求ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果为true,在执行CREATE/REPLACE TABLE ... AS SELECT ...时,将查询模式的所有字段标记为nullable以创建表。请注意,此配置要求SPARK-43390(在Spark 3.5中可用),如果没有此补丁,则始终作为true执行。0.8.0
spark.clickhouse.write.batchSize10000向ClickHouse写入时每批次的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于写入数据时压缩的编解码器。支持的编解码器:none, lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse在写入分布式表时,写入本地表,而不是自身。如果为true,则忽略spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue写入分布式表时,写入集群中的所有节点。0.1.0
spark.clickhouse.write.formatarrow写入时的序列化格式。支持的格式:json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为true,在写入前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition的值如果为true,在写入前按分区进行本地排序。如果未设置,则等于spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3对于单个批次写入失败的最大重试次数,使用可重试的代码。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue是否按ClickHouse分区键重新分区数据,以满足写入前ClickHouse表的分布。0.3.0
spark.clickhouse.write.repartitionNum0在写入前必须重新分区数据以满足ClickHouse表的分布,使用此配置指定重新分区的数量,值小于1表示无要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为true,Spark将严格在分区之间分配传入记录,以满足所需的分布,然后再将记录传递给数据源表进行写入。否则,Spark可能会应用某些优化来加速查询,但破坏分布要求。请注意,此配置要求SPARK-37523(在Spark 3.4中可用),没有此补丁时始终作为true执行。0.3.0
spark.clickhouse.write.retryInterval10s写入重试之间的间隔,单位为秒。0.1.0
spark.clickhouse.write.retryableErrorCodes241写入失败时ClickHouse服务器返回的可重试错误代码。0.1.0

支持的数据类型

本节概述了Spark和ClickHouse之间的数据类型映射。以下表格提供了从ClickHouse读取到Spark以及从Spark插入到ClickHouse时转换数据类型的快速参考。

从ClickHouse读取数据到Spark

ClickHouse数据类型Spark数据类型支持是否为原始类型备注
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType由配置READ_FIXED_STRING_AS控制
DecimalDecimalType精度和刻度最高可达Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也被转换
MapMapType键限于StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType使用特定的时间间隔类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

从Spark插入数据到ClickHouse

Spark数据类型ClickHouse数据类型支持是否为原始类型备注
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和刻度最高可达Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType (list, tuple, or array)Array数组元素类型也被转换
MapTypeMap键限于StringType
Object
Nested

贡献与支持

如果您希望为项目贡献或报告任何问题,欢迎您的反馈! 请访问我们的 GitHub 仓库 来打开问题,建议改进或提交补丁请求。 欢迎贡献!请在开始之前查看仓库中的贡献指南。 感谢您为改善我们的ClickHouse Spark连接器所做的贡献!