メインコンテンツまでスキップ
メインコンテンツまでスキップ

Sparkコネクタ

このコネクタは、クエリのパフォーマンスとデータ処理を改善するために、高度なパーティショニングや述語プッシュダウンなどのClickHouse特有の最適化を利用しています。コネクタはClickHouseの公式JDBCコネクタに基づいており、自身のカタログを管理します。

Spark 3.0より前は、Sparkにはビルトインのカタログ概念が欠けていたため、ユーザーは通常、Hive MetastoreやAWS Glueなどの外部カタログシステムに依存していました。これらの外部ソリューションでは、ユーザーはSparkでアクセスする前にデータソーステーブルを手動で登録する必要がありました。しかし、Spark 3.0でカタログの概念が導入されてから、Sparkはカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。

Sparkのデフォルトカタログはspark_catalogで、テーブルは{catalog name}.{database}.{table}で特定されます。この新しいカタログ機能により、単一のSparkアプリケーション内で複数のカタログを追加して操作することが可能になりました。

要件

  • Java 8 または 17
  • Scala 2.12 または 2.13
  • Apache Spark 3.3 または 3.4 または 3.5

互換性マトリックス

バージョン互換性のあるSparkバージョンClickHouse JDBCバージョン
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存しない
0.3.0Spark 3.2, 3.3依存しない
0.2.1Spark 3.2依存しない
0.1.2Spark 3.2依存しない

インストールとセットアップ

ClickHouseとSparkを統合するためには、異なるプロジェクトセットアップに応じた複数のインストールオプションがあります。ClickHouse Sparkコネクタをプロジェクトのビルドファイル(例えば、Mavenの場合はpom.xmlやSBTの場合はbuild.sbt)に直接依存関係として追加することができます。あるいは、必要なJARファイルを$SPARK_HOME/jars/フォルダに置くか、spark-submitコマンドで--jarsフラグを使用して直接渡すことができます。どちらのアプローチでも、ClickHouseコネクタがSpark環境で利用可能になります。

依存関係としてインポート

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

SNAPSHOTバージョンを使用したい場合は、以下のリポジトリを追加してください。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

ライブラリのダウンロード

バイナリJARの名前パターンは次の通りです:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

利用可能なリリース済みJARファイルは、Maven Central Repositoryで見つけることができ、すべてのデイリービルドのSNAPSHOT JARファイルは、Sonatype OSS Snapshots Repositoryで見つけることができます。

参考

clickhouse-jdbc JARを「all」クラスファイア付きで含めることが重要です。このコネクタは、clickhouse-httpおよびclickhouse-client に依存しており、これらはすべてclickhouse-jdbc:allにバンドルされています。フルJDBCパッケージを使用したくない場合は、clickhouse-client JARおよびclickhouse-httpを個別に追加することもできます。

いずれにせよ、パッケージのバージョンが互換性マトリックスに従って互換性があることを確認してください。

カタログの登録(必要)

ClickHouseテーブルにアクセスするには、次の設定で新しいSparkカタログを構成する必要があります。

プロパティデフォルト値必須
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/Aはい
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostいいえ
spark.sql.catalog.<catalog_name>.protocolhttphttpいいえ
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123いいえ
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultいいえ
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空の文字列)いいえ
spark.sql.catalog.<catalog_name>.database<database>defaultいいえ
spark.<catalog_name>.write.formatjsonarrowいいえ

これらの設定は、以下のいずれかを通じて設定できます:

  • spark-defaults.confを編集/作成する。
  • spark-submitコマンド(またはspark-shell/spark-sql CLIコマンド)に設定を渡す。
  • コンテキストを開始する際に設定を追加する。
参考

ClickHouseクラスターで作業する場合は、それぞれのインスタンスに対してユニークなカタログ名を設定する必要があります。例えば:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

そのことで、Spark SQLを使ってclickhouse1テーブル<ck_db>.<ck_table>にはclickhouse1.<ck_db>.<ck_table>でアクセスでき、clickhouse2テーブル<ck_db>.<ck_table>にはclickhouse2.<ck_db>.<ck_table>でアクセスできるようになります。

ClickHouse Cloud設定

ClickHouse Cloudに接続する場合は、SSLを有効にし、適切なSSLモードを設定してください。例えば:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

データの読み取り

public static void main(String[] args) {
        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

データの書き込み

public static void main(String[] args) throws AnalysisException {

       // Create a Spark session
       SparkSession spark = SparkSession.builder()
               .appName("example")
               .master("local[*]")
               .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
               .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
               .config("spark.sql.catalog.clickhouse.protocol", "http")
               .config("spark.sql.catalog.clickhouse.http_port", "8123")
               .config("spark.sql.catalog.clickhouse.user", "default")
               .config("spark.sql.catalog.clickhouse.password", "123456")
               .config("spark.sql.catalog.clickhouse.database", "default")
               .config("spark.clickhouse.write.format", "json")
               .getOrCreate();

       // Define the schema for the DataFrame
       StructType schema = new StructType(new StructField[]{
               DataTypes.createStructField("id", DataTypes.IntegerType, false),
               DataTypes.createStructField("name", DataTypes.StringType, false),
       });

       List<Row> data = Arrays.asList(
               RowFactory.create(1, "Alice"),
               RowFactory.create(2, "Bob")
       );

       // Create a DataFrame
       Dataset<Row> df = spark.createDataFrame(data, schema);

       df.writeTo("clickhouse.default.example_table").append();

       spark.stop();
   }

DDL操作

Spark SQLを使用してClickHouseインスタンスでDDL操作を実行でき、すべての変更はClickHouseに即座に永続化されます。Spark SQLは、ClickHouseで行うのと同じようにクエリを書くことを可能にし、CREATE TABLE、TRUNCATEなどのコマンドを修正なしで直接実行できます。例えば:

注記

Spark SQLを使用する場合、一度に実行できるステートメントは1つだけです。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上記の例は、アプリケーション内で任意のAPI—Java、Scala、PySpark、またはシェルを使用して実行できるSpark SQLクエリを示しています。

設定

コネクタ内で調整可能な設定は以下の通りです:


キーデフォルト説明以降
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouseは、cityHash64(col_1, col_2)のような複雑な式をシャーディングキーやパーティション値として使用することをサポートしていますが、これは現在Sparkでサポートされていません。trueの場合、サポートされていない式を無視し、そうでない場合は例外で失敗します。spark.clickhouse.write.distributed.convertLocalが有効な場合、サポートされていないシャーディングキーを無視することはデータを破損させる可能性があります。0.4.0
spark.clickhouse.read.compression.codeclz4読み取り用のデータを解凍するために使用されるコーデック。サポートされているコーデック:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue分散テーブルを読み取る際に、自身ではなくローカルテーブルを読み取り結果に用います。trueの場合はspark.clickhouse.read.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.read.fixedStringAsbinaryClickHouse FixedString型を指定されたSparkデータ型として読み取ります。サポートされているタイプ:binary、string0.8.0
spark.clickhouse.read.formatjson読み取り用のシリアライズ形式。サポートされている形式:json、binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み取り用のランタイムフィルタを有効化します。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrueの場合、仮想カラム_partition_idによって入力パーティションフィルタを構築します。パーティションの値でSQL述語を組み立てるにあたって既知の問題があります。この機能はClickHouse Server v21.6以上を必要とします。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrueの場合、テーブルを作成する際にCREATE/REPLACE TABLE ... AS SELECT ...を実行する際にクエリスキーマのすべてのフィールドをnullableとしてマークします。この設定はSPARK-43390(Spark 3.5で利用可能)が必要であり、このパッチなしでは常にtrueとして動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouseに書き込みの際のバッチごとのレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込み用のデータを圧縮するために使用されるコーデック。サポートされているコーデック:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse分散テーブルを書き込む際に、自身ではなくローカルテーブルを使用します。trueの場合、spark.clickhouse.write.distributed.useClusterNodesを無視します。0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue分散テーブルを書き込む際はクラスタのすべてのノードに書き込みます。0.1.0
spark.clickhouse.write.formatarrow書き込み用のシリアライズ形式。サポートされている形式:json、arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrueの場合、書き込み前にソートキーによるローカルソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartitionの値trueの場合、書き込み前にパーティションによるローカルソートを行います。設定されていない場合、spark.clickhouse.write.repartitionByPartitionに等しくなります。0.3.0
spark.clickhouse.write.maxRetry3再試行可能なコードで失敗した単一バッチ書き込みの最大再試行回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue書き込み前にClickHouseのパーティションキーによってデータを再パーティション化するかどうかを判断します。0.3.0
spark.clickhouse.write.repartitionNum0書き込み前にClickHouseテーブルの分布に合うようにデータを再パーティション化する必要がある場合、この設定で再パーティション番号を指定します。値が1未満の場合は必要ありません。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrueの場合、Sparkは入ったレコードをパーティションに厳密に分配して、書き込み時にデータソーステーブルに渡します。そうでない場合、Sparkはクエリのスピードを上げるための特定の最適化を適用しますが、分配要件を破る可能性があります。この設定はSPARK-37523(Spark 3.4で利用可能)が必要であり、このパッチなしでは常にtrueとして動作します。0.3.0
spark.clickhouse.write.retryInterval10s書き込み再試行の間隔(秒)。0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込みが失敗したときにClickHouseサーバーから返される再試行可能エラーコード。0.1.0

サポートされるデータタイプ

このセクションでは、SparkとClickHouseとの間のデータ型のマッピングを概説します。以下の表は、ClickHouseからSparkに読み取るとき、またはSparkからClickHouseにデータを挿入する際のデータ型の変換のクイックリファレンスを提供します。

ClickHouseからSparkへのデータの読み取り

ClickHouseデータタイプSparkデータタイプサポートプリミティブノート
NothingNullTypeはい
BoolBooleanTypeはい
UInt8, Int16ShortTypeはい
Int8ByteTypeはい
UInt16,Int32IntegerTypeはい
UInt32,Int64, UInt64LongTypeはい
Int128,UInt128, Int256, UInt256DecimalType(38, 0)はい
Float32FloatTypeはい
Float64DoubleTypeはい
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeはい
FixedStringBinaryType, StringTypeはい設定READ_FIXED_STRING_ASにより制御されます
DecimalDecimalTypeはい精度とスケールはDecimal128まで
Decimal32DecimalType(9, scale)はい
Decimal64DecimalType(18, scale)はい
Decimal128DecimalType(38, scale)はい
Date, Date32DateTypeはい
DateTime, DateTime32, DateTime64TimestampTypeはい
ArrayArrayTypeいいえ配列要素の型も変換されます
MapMapTypeいいえキーはStringTypeに制限されます
IntervalYearYearMonthIntervalType(Year)はい
IntervalMonthYearMonthIntervalType(Month)はい
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeいいえ特定の間隔型が使用されます
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

SparkからClickHouseへのデータの挿入

SparkデータタイプClickHouseデータタイプサポートプリミティブノート
BooleanTypeUInt8はい
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はい精度とスケールはDecimal128まで
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (リスト、タプル、または配列)Arrayいいえ配列要素の型も変換されます
MapTypeMapいいえキーはStringTypeに制限されます
Object
Nested

Contributing and support

プロジェクトに貢献したり、問題を報告したりしたい場合は、あなたの意見を歓迎します! 問題を報告したり、改善を提案したり、プルリクエストを送信するには、私たちの GitHub リポジトリ を訪れてください。 貢献をお待ちしています!始める前に、リポジトリ内の貢献ガイドラインを確認してください。 私たちの ClickHouse Spark コネクタの改善にご協力いただきありがとうございます!