メインコンテンツまでスキップ
メインコンテンツまでスキップ

Spark Connector

このコネクタは、クエリのパフォーマンスとデータ処理を改善するために、高度なパーティショニングや述語プッシュダウンなど、ClickHouse固有の最適化を活用します。このコネクタは、ClickHouseの公式JDBCコネクタに基づいており、自身のカタログを管理します。

Spark 3.0以前は、Sparkにはビルトインのカタログ概念が欠けていたため、ユーザーは通常、Hive MetastoreやAWS Glueなどの外部カタログシステムに依存していました。これらの外部ソリューションでは、ユーザーはSparkでアクセスする前にデータソーステーブルを手動で登録する必要がありました。しかし、Spark 3.0でカタログ概念が導入されたことで、Sparkはカタログプラグインを登録することによって自動的にテーブルを検出できるようになりました。

Sparkのデフォルトカタログはspark_catalogであり、テーブルは{catalog name}.{database}.{table}で識別されます。新しいカタログ機能により、単一のSparkアプリケーション内で複数のカタログを追加して作業することが可能になりました。

要件

  • Java 8または17
  • Scala 2.12または2.13
  • Apache Spark 3.3または3.4または3.5

互換性マトリックス

バージョン互換性のあるSparkバージョンClickHouse JDBCバージョン
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存しない
0.3.0Spark 3.2, 3.3依存しない
0.2.1Spark 3.2依存しない
0.1.2Spark 3.2依存しない

インストールとセットアップ

ClickHouseをSparkと統合するためには、さまざまなプロジェクトセットアップに適した複数のインストールオプションがあります。ClickHouse Sparkコネクタをプロジェクトのビルドファイル(Mavenの場合はpom.xml、SBTの場合はbuild.sbtなど)に依存関係として直接追加できます。あるいは、必要なJARファイルを$SPARK_HOME/jars/フォルダーに置くか、spark-submitコマンドの--jarsフラグを使用して直接渡すこともできます。どちらのアプローチも、ClickHouseコネクタがSpark環境で利用可能になることを保証します。

依存関係としてインポート

SNAPSHOTバージョンを使用する場合は、以下のリポジトリを追加します。

ライブラリのダウンロード

バイナリJARの名前パターンは以下の通りです:

利用可能なすべてのリリースJARファイルは、Maven Central Repositoryで見つけることができ、すべてのデイリービルドSNAPSHOT JARファイルは、Sonatype OSS Snapshots Repositoryで見つけることができます。

参考

"all"クラシファイアを持つclickhouse-jdbc JARを含めることが必須です。コネクタはclickhouse-httpおよびclickhouse-clientに依存しており、これらは全てclickhouse-jdbc:allにバンドルされています。フルJDBCパッケージを使用したくない場合は、clickhouse-client JARclickhouse-httpを個別に追加することもできます。

いずれにしても、パッケージバージョンが、互換性マトリックスに従って互換性があることを確認してください。

カタログの登録(必須)

ClickHouseのテーブルにアクセスするためには、以下の設定を使用して新しいSparkカタログを構成する必要があります。

プロパティデフォルト値必須
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/Aはい
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostいいえ
spark.sql.catalog.<catalog_name>.protocolhttphttpいいえ
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123いいえ
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultいいえ
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空文字列)いいえ
spark.sql.catalog.<catalog_name>.database<database>defaultいいえ
spark.<catalog_name>.write.formatjsonarrowいいえ

これらの設定は次のいずれかによって設定できます:

  • spark-defaults.confを編集または作成する。
  • spark-submitコマンド(またはspark-shellspark-sql CLIコマンド)に設定を渡す。
  • コンテキストを初期化する際に設定を追加する。
参考

ClickHouseクラスタで作業する場合、各インスタンスに対して一意のカタログ名を設定する必要があります。例えば:

そのようにすることで、Spark SQLからclickhouse1テーブル<ck_db>.<ck_table>にアクセスするためにclickhouse1.<ck_db>.<ck_table>を使用でき、clickhouse2テーブル<ck_db>.<ck_table>にアクセスするためにclickhouse2.<ck_db>.<ck_table>を使用できるようになります。

ClickHouse Cloud設定

ClickHouse Cloudに接続する際は、SSLを有効にし、適切なSSLモードを設定してください。例えば:

データの読み込み

データの書き込み

DDL操作

ClickHouseインスタンス上でDDL操作を実行することができ、すべての変更がClickHouseに即座に永続化されます。Spark SQLを使用すると、ClickHouseと同じようにクエリを書くことができるため、CREATE TABLEやTRUNCATEなどのコマンドを修正なしで直接実行できます。例えば:

上記の例は、Spark SQLクエリを示しており、Java、Scala、PySpark、またはシェルのいずれかのAPIを使用してアプリケーション内で実行できます。

Configurations

以下はコネクタで調整可能な設定です。


キーデフォルト説明以来
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouseは、シャーディングキーやパーティション値として複雑な式を使用することをサポートしています。例えば、cityHash64(col_1, col_2)のように、現在Sparkではサポートされていません。trueの場合、サポートされていない式を無視します。そうでない場合、例外で早期に失敗します。注意:spark.clickhouse.write.distributed.convertLocalが有効な場合、サポートされていないシャーディングキーを無視するとデータが破損する可能性があります。0.4.0
spark.clickhouse.read.compression.codeclz4読み取り用のデータを展開するために使用されるコーデック。サポートされているコーデック:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue分散テーブルを読み取るとき、テーブル自身の代わりにローカルテーブルを読み取ります。trueの場合、spark.clickhouse.read.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.read.fixedStringAsbinaryClickHouseのFixedString型を指定されたSparkデータ型として読み取ります。サポートされている型:binary、string0.8.0
spark.clickhouse.read.formatjson読み取り用のシリアライズ形式。サポートされている形式:json、binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み取り用のランタイムフィルターを有効にします。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrueの場合、仮想カラム_partition_idによって入力パーティションフィルターを構築します。パーティション値によるSQL述語の組み立てには既知の問題があります。この機能にはClickHouse Server v21.6+が必要です。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrueの場合、テーブルを作成する際にCREATE/REPLACE TABLE ... AS SELECT ...を実行する際に、クエリスキーマのすべてのフィールドをNullableとしてマークします。この設定にはSPARK-43390(Spark 3.5に利用可能)が必要で、これがないと常にtrueとして動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouseに書き込む際のバッチごとのレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込み用のデータを圧縮するために使用されるコーデック。サポートされているコーデック:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse分散テーブルを書き込むとき、テーブル自身の代わりにローカルテーブルに書き込みます。trueの場合、spark.clickhouse.write.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue分散テーブルを書き込む際、クラスタのすべてのノードに書き込みます。0.1.0
spark.clickhouse.write.formatarrow書き込み用のシリアライズ形式。サポートされている形式:json、arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrueの場合、書き込む前にソートキーでローカルソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartitionの値trueの場合、書き込む前にパーティションによるローカルソートを行います。設定されていない場合、spark.clickhouse.write.repartitionByPartitionと同じになります。0.3.0
spark.clickhouse.write.maxRetry3再試行可能なコードで失敗した単一バッチ書き込みに対して再試行する最大回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrueClickHouseテーブルの分布を満たすために書き込む前に、ClickHouseのパーティションキーによってデータを再パーティションします。0.3.0
spark.clickhouse.write.repartitionNum0ClickHouseテーブルの分布を満たすために、書き込む前にデータを再パーティションする必要があり、この設定で再パーティションの数を指定します。値が1未満の場合、要件がないことを示します。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrueの場合、Sparkは、データソーステーブルにレコードを渡す前に、必要な分布を満たすために受信レコードをパーティションに厳密に分散させます。そうでない場合、Sparkはクエリを高速化するために特定の最適化を適用し、分布要件を壊す可能性があります。この設定にはSPARK-37523(Spark 3.4に利用可能)が必要で、これがないと常にtrueとして動作します。0.3.0
spark.clickhouse.write.retryInterval10s書き込み再試行の間隔(秒)。0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込みが失敗したときにClickHouseサーバーから返される再試行可能なエラーコード。0.1.0

Supported Data Types

このセクションでは、SparkとClickHouse間のデータ型のマッピングを示します。以下の表は、ClickHouseからSparkへ読み取る際、およびSparkからClickHouseにデータを挿入する際のデータ型変換のためのクイックリファレンスを提供します。

ClickHouseからSparkへデータを読み取る

ClickHouseデータ型Sparkデータ型サポートプリミティブノート
NothingNullTypeはい
BoolBooleanTypeはい
UInt8, Int16ShortTypeはい
Int8ByteTypeはい
UInt16,Int32IntegerTypeはい
UInt32,Int64, UInt64LongTypeはい
Int128,UInt128, Int256, UInt256DecimalType(38, 0)はい
Float32FloatTypeはい
Float64DoubleTypeはい
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeはい
FixedStringBinaryType, StringTypeはい設定READ_FIXED_STRING_ASによって制御されます
DecimalDecimalTypeはい精度とスケールはDecimal128までサポート
Decimal32DecimalType(9, scale)はい
Decimal64DecimalType(18, scale)はい
Decimal128DecimalType(38, scale)はい
Date, Date32DateTypeはい
DateTime, DateTime32, DateTime64TimestampTypeはい
ArrayArrayTypeいいえ配列要素型も変換されます
MapMapTypeいいえキーはStringTypeに制限されています
IntervalYearYearMonthIntervalType(Year)はい
IntervalMonthYearMonthIntervalType(Month)はい
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeいいえ特定の間隔タイプが使用されます
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

SparkからClickHouseへデータを挿入する

Sparkデータ型ClickHouseデータ型サポートプリミティブノート
BooleanTypeUInt8はい
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はい精度とスケールはDecimal128までサポート
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (リスト、タプル、または配列)Arrayいいえ配列要素型も変換されます
MapTypeMapいいえキーはStringTypeに制限されています
Object
Nested

Contributing and Support

プロジェクトへの貢献や問題の報告をご希望の場合は、皆様のご意見をお待ちしております!
GitHubリポジトリを訪れて、問題を開いたり、改善を提案したり、プルリクエストを提出したりしてください。
貢献をお待ちしております!始める前にリポジトリの貢献ガイドラインを確認してください。
ClickHouse Sparkコネクタの改善にご協力いただきありがとうございます!