メインコンテンツまでスキップ
メインコンテンツまでスキップ

Spark コネクタ

このコネクタは、ClickHouse特有の最適化(高度なパーティショニングや述語プッシュダウンなど)を活用して、 クエリのパフォーマンスとデータ処理を改善します。 コネクタは、ClickHouseの公式JDBCコネクタに基づいており、 独自のカタログを管理します。

Spark 3.0以前は、Sparkには組み込みのカタログ概念がなかったため、ユーザーは通常、HiveメタストアやAWS Glueなどの外部カタログシステムに依存していました。 これらの外部ソリューションでは、ユーザーはSparkでそれらにアクセスする前にデータソーステーブルを手動で登録する必要がありました。 しかし、Spark 3.0でカタログの概念が導入されたため、Sparkはカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。

Sparkのデフォルトカタログはspark_catalogであり、テーブルは{catalog name}.{database}.{table}で識別されます。新しいカタログ機能により、単一のSparkアプリケーション内で複数のカタログを追加して操作することが可能になりました。

要件

  • Java 8または17
  • Scala 2.12または2.13
  • Apache Spark 3.3または3.4または3.5

互換性マトリックス

Version互換性のあるSparkバージョンClickHouse JDBCバージョン
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存しない
0.3.0Spark 3.2, 3.3依存しない
0.2.1Spark 3.2依存しない
0.1.2Spark 3.2依存しない

インストールとセットアップ

ClickHouseをSparkと統合するために、異なるプロジェクトセットアップに適した複数のインストールオプションがあります。 ClickHouse Sparkコネクタをプロジェクトのビルドファイル(Mavenのpom.xmlやSBTのbuild.sbtなど)に直接依存関係として追加できます。 または、必要なJARファイルを$SPARK_HOME/jars/フォルダーに置くか、spark-submitコマンドの--jarsフラグを使用してSparkオプションとして直接渡すこともできます。 どちらのアプローチでも、Spark環境でClickHouseコネクタが利用できることが保証されます。

依存関係としてインポート

SNAPSHOTバージョンを使用したい場合は、次のリポジトリを追加してください。

ライブラリのダウンロード

バイナリJARの名前パターンは次のとおりです:

利用可能なすべてのリリース済みJARファイルは、Maven Central Repository にあり、すべてのデイリービルドSNAPSHOT JARファイルは、Sonatype OSS Snapshots Repositoryにあります。

参考

clickhouse-jdbc JARを「all」分類子付きで含めることが重要です。 コネクタはclickhouse-httpclickhouse-clientの両方に依存しており、いずれもclickhouse-jdbc:allにバンドルされています。 フルJDBCパッケージを使用したくない場合は、clickhouse-client JARclickhouse-httpを個別に追加することもできます。

いずれにせよ、パッケージのバージョンが互換性マトリックスに従って互換性があることを確認してください。

カタログを登録する (必須)

ClickHouseのテーブルにアクセスするためには、次の設定を使用して新しいSparkカタログを構成する必要があります:

プロパティデフォルト値必須
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/Aはい
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostいいえ
spark.sql.catalog.<catalog_name>.protocolhttphttpいいえ
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123いいえ
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultいいえ
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空の文字列)いいえ
spark.sql.catalog.<catalog_name>.database<database>defaultいいえ
spark.<catalog_name>.write.formatjsonarrowいいえ

これらの設定は、次のいずれかで設定できます:

  • spark-defaults.confの編集または作成。
  • spark-submitコマンド(またはspark-shellspark-sql CLIコマンド)に設定を渡す。
  • コンテキストを初期化する際に設定を追加する。
参考

ClickHouse クラスターで作業する場合、各インスタンスにユニークなカタログ名を設定する必要があります。 例えば:

このようにすることで、Spark SQLからclickhouse1.<ck_db>.<ck_table>でclickhouse1テーブル<ck_db>.<ck_table>にアクセスし、clickhouse2.<ck_db>.<ck_table>でclickhouse2テーブル<ck_db>.<ck_table>にアクセスできるようになります。

データの読み込み

データの書き込み

DDL操作

Spark SQLを使用してClickHouseインスタンスでDDL操作を行うことができ、すべての変更は直ちにClickHouseに保持されます。 Spark SQLでは、ClickHouseと同様に、クエリをそのまま記述することができるため、CREATE TABLEやTRUNCATEなどのコマンドを修正なしで直接実行できます。例えば:

上記の例は、アプリケーション内で実行できるSpark SQLクエリを示しており、Java、Scala、PySpark、またはシェルを使用して実行することができます。

構成

以下はコネクタで利用可能な調整可能な構成です:


キーデフォルト説明以降
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouseは、cityHash64(col_1, col_2)のような複雑な式をシャーディングキーまたはパーティション値として使用することをサポートしていますが、これは現在Sparkにサポートされていません。trueの場合、サポートされていない式を無視します。それ以外の場合、例外で早期に失敗します。spark.clickhouse.write.distributed.convertLocalが有効な場合、サポートされていないシャーディングキーを無視するとデータが壊れる可能性があります。0.4.0
spark.clickhouse.read.compression.codeclz4読み込みのためのデータをデコンプレッションする際に使用されるコーデック。サポートされているコーデック:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue分散テーブルを読み取る際、自己ではなくローカルテーブルを読み取ります。trueの場合、spark.clickhouse.read.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.read.fixedStringAsbinaryClickHouseのFixedString型を指定されたSparkデータ型として読み取ります。サポートされている型:binary、string0.8.0
spark.clickhouse.read.formatjson読み込みのためのシリアライズフォーマット。サポートされているフォーマット:json、binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み込みのためのランタイムフィルタを有効にします。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrueの場合、仮想カラム_partition_idによって入力パーティションフィルタを構成します。パーティション値によってSQL述語を組み立てることには既知の問題があります。この機能にはClickHouse Server v21.6以上が必要です。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrueの場合、CREATE/REPLACE TABLE ... AS SELECT ...を実行してテーブルを作成する際にクエリスキーマのすべてのフィールドをnullableとしてマークします。この構成はSPARK-43390(Spark 3.5で利用可能)を必要とし、このパッチがない場合は常にtrueとして動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouseに対する書き込み時のバッチあたりのレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込みのためのデータを圧縮する際に使用されるコーデック。サポートされているコーデック:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse分散テーブルに書き込む際、自己ではなくローカルテーブルに書き込みます。trueの場合、spark.clickhouse.write.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue分散テーブルを書き込む際にクラスターのすべてのノードに書き込みます。0.1.0
spark.clickhouse.write.formatarrow書き込みのためのシリアライズフォーマット。サポートされているフォーマット:json、arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrueの場合、書き込む前にソートキーによるローカルソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartitionの値trueの場合、書き込む前にパーティションによるローカルソートを行います。設定されていない場合、spark.clickhouse.write.repartitionByPartitionと等しくなります。0.3.0
spark.clickhouse.write.maxRetry3再試行可能なコードで失敗した単一のバッチ書き込みに対して再試行する最大回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue書き込む前にClickHouseパーティションキーによってデータを再パーティションするかどうか。0.3.0
spark.clickhouse.write.repartitionNum0書き込む前にClickHouseテーブルの分配に合うようにデータを再パーティションすることが必要である場合、この設定で再パーティション数を指定します。1未満の値は要件なしを意味します。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrueの場合、Sparkは書き込み時にデータソーステーブルにレコードを渡す前に、必要な分配を満たすために受信レコードを厳密に各パーティションに分配します。そうでない場合、Sparkはクエリを高速化するために特定の最適化を適用しますが、分配要件を破る可能性があります。この構成はSPARK-37523(Spark 3.4で利用可能)を必要とし、このパッチがない場合は常にtrueとして動作します。0.3.0
spark.clickhouse.write.retryInterval10s書き込みの再試行間の秒数の間隔。0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込みが失敗したときにClickHouseサーバーから返される再試行可能なエラーコード。0.1.0

サポートされているデータ型

このセクションでは、SparkとClickHouse間のデータ型のマッピングを示します。以下の表は、ClickHouseからSparkへの読み込み時と、SparkからClickHouseへのデータ挿入時のデータ型変換の迅速な参照を提供します。

ClickHouseからSparkへのデータの読み込み

ClickHouseデータ型Sparkデータ型サポートプリミティブであるメモ
NothingNullTypeはい
BoolBooleanTypeはい
UInt8, Int16ShortTypeはい
Int8ByteTypeはい
UInt16,Int32IntegerTypeはい
UInt32,Int64, UInt64LongTypeはい
Int128,UInt128, Int256, UInt256DecimalType(38, 0)はい
Float32FloatTypeはい
Float64DoubleTypeはい
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeはい
FixedStringBinaryType, StringTypeはい設定 READ_FIXED_STRING_AS によって制御される
DecimalDecimalTypeはい精度とスケールはDecimal128まで
Decimal32DecimalType(9, scale)はい
Decimal64DecimalType(18, scale)はい
Decimal128DecimalType(38, scale)はい
Date, Date32DateTypeはい
DateTime, DateTime32, DateTime64TimestampTypeはい
ArrayArrayTypeいいえ配列要素型も変換される
MapMapTypeいいえキーはStringTypeに制限されている
IntervalYearYearMonthIntervalType(Year)はい
IntervalMonthYearMonthIntervalType(Month)はい
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeいいえ特定の間隔型が使用される
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

SparkからClickHouseへのデータの挿入

Sparkデータ型ClickHouseデータ型サポートプリミティブであるメモ
BooleanTypeUInt8はい
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はい精度とスケールはDecimal128まで
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (リスト、タプル、または配列)Arrayいいえ配列要素型も変換される
MapTypeMapいいえキーはStringTypeに制限されている
Object
Nested

貢献とサポート

プロジェクトに貢献したい場合や問題を報告したい場合は、あなたのご意見を歓迎します!
GitHubリポジトリを訪れて、問題を開いたり、改善を提案したり、プルリクエストを提出してください。
貢献は歓迎されます!開始する前にリポジトリの貢献ガイドラインを確認してください。
私たちのClickHouse Sparkコネクタの改善にご協力いただきありがとうございます!