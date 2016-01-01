Spark Connector
This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. The connector is based on ClickHouse's official JDBC connector, and manages its own catalog.
Before Spark 3.0, Spark lacked a built-in catalog concept, so users typically relied on external catalog systems such as Hive Metastore or AWS Glue. With these external solutions, users had to register their data source tables manually before accessing them in Spark. However, since Spark 3.0 introduced the catalog concept, Spark can now automatically discover tables by registering catalog plugins.
Spark default catalog is
spark_catalog, and tables are identified by
{catalog name}.{database}.{table}. With the new
catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application.
Requirements
- Java 8 or 17
- Scala 2.12 or 2.13
- Apache Spark 3.3 or 3.4 or 3.5
Compatibility Matrix
|Version
|Compatible Spark Versions
|ClickHouse JDBC version
|main
|Spark 3.3, 3.4, 3.5
|0.6.3
|0.8.1
|Spark 3.3, 3.4, 3.5
|0.6.3
|0.8.0
|Spark 3.3, 3.4, 3.5
|0.6.3
|0.7.3
|Spark 3.3, 3.4
|0.4.6
|0.6.0
|Spark 3.3
|0.3.2-patch11
|0.5.0
|Spark 3.2, 3.3
|0.3.2-patch11
|0.4.0
|Spark 3.2, 3.3
|Not depend on
|0.3.0
|Spark 3.2, 3.3
|Not depend on
|0.2.1
|Spark 3.2
|Not depend on
|0.1.2
|Spark 3.2
|Not depend on
Installation & Setup
For integrating ClickHouse with Spark, there are multiple installation options to suit different project setups.
You can add the ClickHouse Spark connector as a dependency directly in your project’s build file (such as in
pom.xml
for Maven or
build.sbt for SBT).
Alternatively, you can put the required JAR files in your
$SPARK_HOME/jars/ folder, or pass them directly as a Spark
option using the
--jars flag in the
spark-submit command.
Both approaches ensure the ClickHouse connector is available in your Spark environment.
Import as a Dependency
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
Add the following repository if you want to use SNAPSHOT version.
Add the following repository if you want to use the SNAPSHOT version:
When working with Spark's shell options (Spark SQL CLI, Spark Shell CLI, Spark Submit command), the dependencies can be registered by passing the required jars:
If you want to avoid copying the JAR files to your Spark client node, you can use the following instead:
Note: For SQL-only use cases, Apache Kyuubi is recommended for production.
Download The Library
The name pattern of the binary JAR is:
You can find all available released JAR files in the Maven Central Repository and all daily build SNAPSHOT JAR files in the Sonatype OSS Snapshots Repository.
It's essential to include the clickhouse-jdbc JAR with the "all" classifier, as the connector relies on clickhouse-http and clickhouse-client —both of which are bundled in clickhouse-jdbc:all. Alternatively, you can add clickhouse-client JAR and clickhouse-http individually if you prefer not to use the full JDBC package.
In any case, ensure that the package versions are compatible according to the Compatibility Matrix.
Register The Catalog (required)
In order to access your ClickHouse tables, you must configure a new Spark catalog with the following configs:
|Property
|Value
|Default Value
|Required
spark.sql.catalog.<catalog_name>
com.clickhouse.spark.ClickHouseCatalog
|N/A
|Yes
spark.sql.catalog.<catalog_name>.host
<clickhouse_host>
localhost
|No
spark.sql.catalog.<catalog_name>.protocol
http
http
|No
spark.sql.catalog.<catalog_name>.http_port
<clickhouse_port>
8123
|No
spark.sql.catalog.<catalog_name>.user
<clickhouse_username>
default
|No
spark.sql.catalog.<catalog_name>.password
<clickhouse_password>
|(empty string)
|No
spark.sql.catalog.<catalog_name>.database
<database>
default
|No
spark.<catalog_name>.write.format
json
arrow
|No
These settings could be set via one of the following:
- Edit/Create
spark-defaults.conf.
- Pass the configuration to your
spark-submitcommand (or to your
spark-shell/
spark-sqlCLI commands).
- Add the configuration when initiating your context.
When working with ClickHouse cluster, you need to set a unique catalog name for each instance. For example:
That way, you would be able to access clickhouse1 table
<ck_db>.<ck_table> from Spark SQL by
clickhouse1.<ck_db>.<ck_table>, and access clickhouse2 table
<ck_db>.<ck_table> by
clickhouse2.<ck_db>.<ck_table>.
Read Data
- Java
- Scala
- Python
- Spark SQL
Write Data
- Java
- Scala
- Python
- Spark SQL
DDL Operations
You can perform DDL operations on your ClickHouse instance using Spark SQL, with all changes immediately persisted in ClickHouse. Spark SQL allows you to write queries exactly as you would in ClickHouse, so you can directly execute commands such as CREATE TABLE, TRUNCATE, and more - without modification, for instance:
The above examples demonstrate Spark SQL queries, which you can run within your application using any API—Java, Scala, PySpark, or shell.
Configurations
The following are the adjustable configurations available in the connector:
|Key
|Default
|Description
|Since
|spark.clickhouse.ignoreUnsupportedTransform
|false
|ClickHouse supports using complex expressions as sharding keys or partition values, e.g.
cityHash64(col_1, col_2), and those can not be supported by Spark now. If
true, ignore the unsupported expressions, otherwise fail fast w/ an exception. Note, when
spark.clickhouse.write.distributed.convertLocal is enabled, ignore unsupported sharding keys may corrupt the data.
|0.4.0
|spark.clickhouse.read.compression.codec
|lz4
|The codec used to decompress data for reading. Supported codecs: none, lz4.
|0.5.0
|spark.clickhouse.read.distributed.convertLocal
|true
|When reading Distributed table, read local table instead of itself. If
true, ignore
spark.clickhouse.read.distributed.useClusterNodes.
|0.1.0
|spark.clickhouse.read.fixedStringAs
|binary
|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string
|0.8.0
|spark.clickhouse.read.format
|json
|Serialize format for reading. Supported formats: json, binary
|0.6.0
|spark.clickhouse.read.runtimeFilter.enabled
|false
|Enable runtime filter for reading.
|0.8.0
|spark.clickhouse.read.splitByPartitionId
|true
|If
true, construct input partition filter by virtual column
_partition_id, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+
|0.4.0
|spark.clickhouse.useNullableQuerySchema
|false
|If
true, mark all the fields of the query schema as nullable when executing
CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as
true.
|0.8.0
|spark.clickhouse.write.batchSize
|10000
|The number of records per batch on writing to ClickHouse.
|0.1.0
|spark.clickhouse.write.compression.codec
|lz4
|The codec used to compress data for writing. Supported codecs: none, lz4.
|0.3.0
|spark.clickhouse.write.distributed.convertLocal
|false
|When writing Distributed table, write local table instead of itself. If
true, ignore
spark.clickhouse.write.distributed.useClusterNodes.
|0.1.0
|spark.clickhouse.write.distributed.useClusterNodes
|true
|Write to all nodes of cluster when writing Distributed table.
|0.1.0
|spark.clickhouse.write.format
|arrow
|Serialize format for writing. Supported formats: json, arrow
|0.4.0
|spark.clickhouse.write.localSortByKey
|true
|If
true, do local sort by sort keys before writing.
|0.3.0
|spark.clickhouse.write.localSortByPartition
|value of spark.clickhouse.write.repartitionByPartition
|If
true, do local sort by partition before writing. If not set, it equals to
spark.clickhouse.write.repartitionByPartition.
|0.3.0
|spark.clickhouse.write.maxRetry
|3
|The maximum number of write we will retry for a single batch write failed with retryable codes.
|0.1.0
|spark.clickhouse.write.repartitionByPartition
|true
|Whether to repartition data by ClickHouse partition keys to meet the distributions of ClickHouse table before writing.
|0.3.0
|spark.clickhouse.write.repartitionNum
|0
|Repartition data to meet the distributions of ClickHouse table is required before writing, use this conf to specific the repartition number, value less than 1 mean no requirement.
|0.1.0
|spark.clickhouse.write.repartitionStrictly
|false
|If
true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as
true.
|0.3.0
|spark.clickhouse.write.retryInterval
|10s
|The interval in seconds between write retry.
|0.1.0
|spark.clickhouse.write.retryableErrorCodes
|241
|The retryable error codes returned by ClickHouse server when write failing.
|0.1.0
Supported Data Types
This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.
Reading data from ClickHouse into Spark
|ClickHouse Data Type
|Spark Data Type
|Supported
|Is Primitive
|Notes
Nothing
NullType
|✅
|Yes
Bool
BooleanType
|✅
|Yes
UInt8,
Int16
ShortType
|✅
|Yes
Int8
ByteType
|✅
|Yes
UInt16,
Int32
IntegerType
|✅
|Yes
UInt32,
Int64,
UInt64
LongType
|✅
|Yes
Int128,
UInt128,
Int256,
UInt256
DecimalType(38, 0)
|✅
|Yes
Float32
FloatType
|✅
|Yes
Float64
DoubleType
|✅
|Yes
String,
JSON,
UUID,
Enum8,
Enum16,
IPv4,
IPv6
StringType
|✅
|Yes
FixedString
BinaryType,
StringType
|✅
|Yes
|Controlled by configuration
READ_FIXED_STRING_AS
Decimal
DecimalType
|✅
|Yes
|Precision and scale up to
Decimal128
Decimal32
DecimalType(9, scale)
|✅
|Yes
Decimal64
DecimalType(18, scale)
|✅
|Yes
Decimal128
DecimalType(38, scale)
|✅
|Yes
Date,
Date32
DateType
|✅
|Yes
DateTime,
DateTime32,
DateTime64
TimestampType
|✅
|Yes
Array
ArrayType
|✅
|No
|Array element type is also converted
Map
MapType
|✅
|No
|Keys are limited to
StringType
IntervalYear
YearMonthIntervalType(Year)
|✅
|Yes
IntervalMonth
YearMonthIntervalType(Month)
|✅
|Yes
IntervalDay,
IntervalHour,
IntervalMinute,
IntervalSecond
DayTimeIntervalType
|✅
|No
|Specific interval type is used
Object
|❌
Nested
|❌
Tuple
|❌
Point
|❌
Polygon
|❌
MultiPolygon
|❌
Ring
|❌
IntervalQuarter
|❌
IntervalWeek
|❌
Decimal256
|❌
AggregateFunction
|❌
SimpleAggregateFunction
|❌
Inserting data from Spark into ClickHouse
|Spark Data Type
|ClickHouse Data Type
|Supported
|Is Primitive
|Notes
BooleanType
UInt8
|✅
|Yes
ByteType
Int8
|✅
|Yes
ShortType
Int16
|✅
|Yes
IntegerType
Int32
|✅
|Yes
LongType
Int64
|✅
|Yes
FloatType
Float32
|✅
|Yes
DoubleType
Float64
|✅
|Yes
StringType
String
|✅
|Yes
VarcharType
String
|✅
|Yes
CharType
String
|✅
|Yes
DecimalType
Decimal(p, s)
|✅
|Yes
|Precision and scale up to
Decimal128
DateType
Date
|✅
|Yes
TimestampType
DateTime
|✅
|Yes
ArrayType (list, tuple, or array)
Array
|✅
|No
|Array element type is also converted
MapType
Map
|✅
|No
|Keys are limited to
StringType
Object
|❌
Nested
|❌
Contributing and Support
If you'd like to contribute to the project or report any issues, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request. Contributions are welcome! Please check the contribution guidelines in the repository before starting. Thank you for helping improve our ClickHouse Spark connector!