メインコンテンツへスキップ
メインコンテンツへスキップ

Spark JDBC

ClickHouse Supported

JDBC は、Spark で最も広く使われているデータソースの 1 つです。 このセクションでは、Spark で ClickHouse official JDBC コネクタ を使用する方法について詳しく説明します。

データの読み込み

public static void main(String[] args) {
        // Spark セッションを初期化
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";

        //---------------------------------------------------------------------------------------------------
        // jdbc メソッドを使用して ClickHouse からテーブルを読み込む
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

        df1.show();

        //---------------------------------------------------------------------------------------------------
        // load メソッドを使用して ClickHouse からテーブルを読み込む
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();

        df2.show();

        // Spark セッションを停止
        spark.stop();
    }

データの書き込み

 public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC connection details
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Create a sample DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        //---------------------------------------------------------------------------------------------------
        // Write the df to ClickHouse using the jdbc method
        //---------------------------------------------------------------------------------------------------

        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "example_table", jdbcProperties);

        //---------------------------------------------------------------------------------------------------
        // Write the df to ClickHouse using the save method
        //---------------------------------------------------------------------------------------------------

        df.write()
                .format("jdbc")
                .mode("append")
                .option("url", jdbcUrl)
                .option("dbtable", "example_table")
                .option("user", "default")
                .option("password", "123456")
                .save();

        // Stop the Spark session
        spark.stop();
    }

並列性

Spark JDBC を使用する場合、Spark は単一のパーティションでデータを読み取ります。より高い並列性を実現するには、 partitionColumnlowerBoundupperBoundnumPartitions を指定する必要があります。これらの設定は、複数のワーカーで 並列に読み取る際に、テーブルをどのようにパーティション分割するかを定義します。 詳しくは、Apache Spark の公式ドキュメントの JDBC 構成 を参照してください。

JDBC の制限事項

  • Spark JDBC は、ClickHouse dialect がないため、複合型 (MAP、ARRAY、STRUCT) をサポートしていません。複合型を完全にサポートするには、ネイティブの Spark-ClickHouse コネクタを使用してください。
  • 現時点では、JDBC でデータを挿入できるのは既存のテーブルのみです (現在のところ、Spark が他のコネクタで行うように、DF 挿入時に テーブルを自動作成することはできません) 。