Sparkコネクタ
このコネクタは、クエリのパフォーマンスとデータ処理を改善するために、高度なパーティショニングや述語プッシュダウンなどのClickHouse特有の最適化を利用しています。コネクタはClickHouseの公式JDBCコネクタに基づいており、自身のカタログを管理します。
Spark 3.0より前は、Sparkにはビルトインのカタログ概念が欠けていたため、ユーザーは通常、Hive MetastoreやAWS Glueなどの外部カタログシステムに依存していました。これらの外部ソリューションでは、ユーザーはSparkでアクセスする前にデータソーステーブルを手動で登録する必要がありました。しかし、Spark 3.0でカタログの概念が導入されてから、Sparkはカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。
Sparkのデフォルトカタログはspark_catalogで、テーブルは{catalog name}.{database}.{table}で特定されます。この新しいカタログ機能により、単一のSparkアプリケーション内で複数のカタログを追加して操作することが可能になりました。
要件
- Java 8 または 17
- Scala 2.12 または 2.13
- Apache Spark 3.3 または 3.4 または 3.5
互換性マトリックス
| バージョン | 互換性のあるSparkバージョン | ClickHouse JDBCバージョン |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | 依存しない |
| 0.3.0 | Spark 3.2, 3.3 | 依存しない |
| 0.2.1 | Spark 3.2 | 依存しない |
| 0.1.2 | Spark 3.2 | 依存しない |
インストールとセットアップ
ClickHouseとSparkを統合するためには、異なるプロジェクトセットアップに応じた複数のインストールオプションがあります。ClickHouse Sparkコネクタをプロジェクトのビルドファイル(例えば、Mavenの場合はpom.xmlやSBTの場合はbuild.sbt)に直接依存関係として追加することができます。あるいは、必要なJARファイルを$SPARK_HOME/jars/フォルダに置くか、spark-submitコマンドで--jarsフラグを使用して直接渡すことができます。どちらのアプローチでも、ClickHouseコネクタがSpark環境で利用可能になります。
依存関係としてインポート
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
SNAPSHOTバージョンを使用したい場合は、以下のリポジトリを追加してください。
SNAPSHOTバージョンを使用したい場合は、以下のリポジトリを追加してください:
Sparkのシェルオプション(Spark SQL CLI、Spark Shell CLI、及びSpark Submitコマンド)で作業する場合、必要なJARを渡すことで依存関係を登録できます:
JARファイルをSparkクライアントノードにコピーしないようにしたい場合は、代わりに以下を使用できます:
注意: SQL専用の使用ケースについては、Apache Kyuubiが本番環境での使用を推奨されています。
ライブラリのダウンロード
バイナリJARの名前パターンは次の通りです:
利用可能なリリース済みJARファイルは、Maven Central Repositoryで見つけることができ、すべてのデイリービルドのSNAPSHOT JARファイルは、Sonatype OSS Snapshots Repositoryで見つけることができます。
clickhouse-jdbc JARを「all」クラスファイア付きで含めることが重要です。このコネクタは、clickhouse-httpおよびclickhouse-client に依存しており、これらはすべてclickhouse-jdbc:allにバンドルされています。フルJDBCパッケージを使用したくない場合は、clickhouse-client JARおよびclickhouse-httpを個別に追加することもできます。
いずれにせよ、パッケージのバージョンが互換性マトリックスに従って互換性があることを確認してください。
カタログの登録(必要)
ClickHouseテーブルにアクセスするには、次の設定で新しいSparkカタログを構成する必要があります。
| プロパティ | 値 | デフォルト値 | 必須 |
|---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | はい |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | いいえ |
spark.sql.catalog.<catalog_name>.protocol | http | http | いいえ |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | いいえ |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | いいえ |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (空の文字列) | いいえ |
spark.sql.catalog.<catalog_name>.database | <database> | default | いいえ |
spark.<catalog_name>.write.format | json | arrow | いいえ |
これらの設定は、以下のいずれかを通じて設定できます:
spark-defaults.confを編集/作成する。spark-submitコマンド(またはspark-shell/spark-sqlCLIコマンド)に設定を渡す。- コンテキストを開始する際に設定を追加する。
ClickHouseクラスターで作業する場合は、それぞれのインスタンスに対してユニークなカタログ名を設定する必要があります。例えば:
そのことで、Spark SQLを使ってclickhouse1テーブル<ck_db>.<ck_table>にはclickhouse1.<ck_db>.<ck_table>でアクセスでき、clickhouse2テーブル<ck_db>.<ck_table>にはclickhouse2.<ck_db>.<ck_table>でアクセスできるようになります。
ClickHouse Cloud設定
ClickHouse Cloudに接続する場合は、SSLを有効にし、適切なSSLモードを設定してください。例えば:
データの読み取り
- Java
- Scala
- Python
- Spark SQL
データの書き込み
- Java
- Scala
- Python
- Spark SQL
DDL操作
Spark SQLを使用してClickHouseインスタンスでDDL操作を実行でき、すべての変更はClickHouseに即座に永続化されます。Spark SQLは、ClickHouseで行うのと同じようにクエリを書くことを可能にし、CREATE TABLE、TRUNCATEなどのコマンドを修正なしで直接実行できます。例えば:
Spark SQLを使用する場合、一度に実行できるステートメントは1つだけです。
上記の例は、アプリケーション内で任意のAPI—Java、Scala、PySpark、またはシェルを使用して実行できるSpark SQLクエリを示しています。
設定
コネクタ内で調整可能な設定は以下の通りです:
| キー | デフォルト | 説明 | 以降 |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouseは、cityHash64(col_1, col_2)のような複雑な式をシャーディングキーやパーティション値として使用することをサポートしていますが、これは現在Sparkでサポートされていません。trueの場合、サポートされていない式を無視し、そうでない場合は例外で失敗します。spark.clickhouse.write.distributed.convertLocalが有効な場合、サポートされていないシャーディングキーを無視することはデータを破損させる可能性があります。 | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | 読み取り用のデータを解凍するために使用されるコーデック。サポートされているコーデック:none、lz4。 | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | 分散テーブルを読み取る際に、自身ではなくローカルテーブルを読み取り結果に用います。trueの場合はspark.clickhouse.read.distributed.useClusterNodesを無視します。 | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | binary | ClickHouse FixedString型を指定されたSparkデータ型として読み取ります。サポートされているタイプ:binary、string | 0.8.0 |
| spark.clickhouse.read.format | json | 読み取り用のシリアライズ形式。サポートされている形式:json、binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | 読み取り用のランタイムフィルタを有効化します。 | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | trueの場合、仮想カラム_partition_idによって入力パーティションフィルタを構築します。パーティションの値でSQL述語を組み立てるにあたって既知の問題があります。この機能はClickHouse Server v21.6以上を必要とします。 | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | trueの場合、テーブルを作成する際にCREATE/REPLACE TABLE ... AS SELECT ...を実行する際にクエリスキーマのすべてのフィールドをnullableとしてマークします。この設定はSPARK-43390(Spark 3.5で利用可能)が必要であり、このパッチなしでは常にtrueとして動作します。 | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | ClickHouseに書き込みの際のバッチごとのレコード数。 | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | 書き込み用のデータを圧縮するために使用されるコーデック。サポートされているコーデック:none、lz4。 | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | 分散テーブルを書き込む際に、自身ではなくローカルテーブルを使用します。trueの場合、spark.clickhouse.write.distributed.useClusterNodesを無視します。 | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | 分散テーブルを書き込む際はクラスタのすべてのノードに書き込みます。 | 0.1.0 |
| spark.clickhouse.write.format | arrow | 書き込み用のシリアライズ形式。サポートされている形式:json、arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | trueの場合、書き込み前にソートキーによるローカルソートを行います。 | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | spark.clickhouse.write.repartitionByPartitionの値 | trueの場合、書き込み前にパーティションによるローカルソートを行います。設定されていない場合、spark.clickhouse.write.repartitionByPartitionに等しくなります。 | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | 再試行可能なコードで失敗した単一バッチ書き込みの最大再試行回数。 | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | 書き込み前にClickHouseのパーティションキーによってデータを再パーティション化するかどうかを判断します。 | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | 書き込み前にClickHouseテーブルの分布に合うようにデータを再パーティション化する必要がある場合、この設定で再パーティション番号を指定します。値が1未満の場合は必要ありません。 | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | trueの場合、Sparkは入ったレコードをパーティションに厳密に分配して、書き込み時にデータソーステーブルに渡します。そうでない場合、Sparkはクエリのスピードを上げるための特定の最適化を適用しますが、分配要件を破る可能性があります。この設定はSPARK-37523(Spark 3.4で利用可能)が必要であり、このパッチなしでは常にtrueとして動作します。 | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | 書き込み再試行の間隔(秒)。 | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | 書き込みが失敗したときにClickHouseサーバーから返される再試行可能エラーコード。 | 0.1.0 |
サポートされるデータタイプ
このセクションでは、SparkとClickHouseとの間のデータ型のマッピングを概説します。以下の表は、ClickHouseからSparkに読み取るとき、またはSparkからClickHouseにデータを挿入する際のデータ型の変換のクイックリファレンスを提供します。
ClickHouseからSparkへのデータの読み取り
| ClickHouseデータタイプ | Sparkデータタイプ | サポート | プリミティブ | ノート |
|---|---|---|---|---|
Nothing | NullType | ✅ | はい | |
Bool | BooleanType | ✅ | はい | |
UInt8, Int16 | ShortType | ✅ | はい | |
Int8 | ByteType | ✅ | はい | |
UInt16,Int32 | IntegerType | ✅ | はい | |
UInt32,Int64, UInt64 | LongType | ✅ | はい | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | はい | |
Float32 | FloatType | ✅ | はい | |
Float64 | DoubleType | ✅ | はい | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | はい | |
FixedString | BinaryType, StringType | ✅ | はい | 設定READ_FIXED_STRING_ASにより制御されます |
Decimal | DecimalType | ✅ | はい | 精度とスケールはDecimal128まで |
Decimal32 | DecimalType(9, scale) | ✅ | はい | |
Decimal64 | DecimalType(18, scale) | ✅ | はい | |
Decimal128 | DecimalType(38, scale) | ✅ | はい | |
Date, Date32 | DateType | ✅ | はい | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | はい | |
Array | ArrayType | ✅ | いいえ | 配列要素の型も変換されます |
Map | MapType | ✅ | いいえ | キーはStringTypeに制限されます |
IntervalYear | YearMonthIntervalType(Year) | ✅ | はい | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | はい | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | いいえ | 特定の間隔型が使用されます |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
SparkからClickHouseへのデータの挿入
| Sparkデータタイプ | ClickHouseデータタイプ | サポート | プリミティブ | ノート |
|---|---|---|---|---|
BooleanType | UInt8 | ✅ | はい | |
ByteType | Int8 | ✅ | はい | |
ShortType | Int16 | ✅ | はい | |
IntegerType | Int32 | ✅ | はい | |
LongType | Int64 | ✅ | はい | |
FloatType | Float32 | ✅ | はい | |
DoubleType | Float64 | ✅ | はい | |
StringType | String | ✅ | はい | |
VarcharType | String | ✅ | はい | |
CharType | String | ✅ | はい | |
DecimalType | Decimal(p, s) | ✅ | はい | 精度とスケールはDecimal128まで |
DateType | Date | ✅ | はい | |
TimestampType | DateTime | ✅ | はい | |
ArrayType (リスト、タプル、または配列) | Array | ✅ | いいえ | 配列要素の型も変換されます |
MapType | Map | ✅ | いいえ | キーはStringTypeに制限されます |
Object | ❌ | |||
Nested | ❌ |
Contributing and support
プロジェクトに貢献したり、問題を報告したりしたい場合は、あなたの意見を歓迎します! 問題を報告したり、改善を提案したり、プルリクエストを送信するには、私たちの GitHub リポジトリ を訪れてください。 貢献をお待ちしています!始める前に、リポジトリ内の貢献ガイドラインを確認してください。 私たちの ClickHouse Spark コネクタの改善にご協力いただきありがとうございます!