跳到主要内容
跳到主要内容

Spark JDBC

JDBC 是 Spark 中最常用的数据源之一。在本节中,我们将提供有关如何使用 ClickHouse 官方 JDBC 连接器 与 Spark 的详细信息。

读取数据

public static void main(String[] args) {
        // Initialize Spark session
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";

        //---------------------------------------------------------------------------------------------------
        // Load the table from ClickHouse using jdbc method
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

        df1.show();

        //---------------------------------------------------------------------------------------------------
        // Load the table from ClickHouse using load method
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();

        df2.show();

        // Stop the Spark session
        spark.stop();
    }

写入数据

public static void main(String[] args) {
       // Initialize Spark session
       SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

       // JDBC connection details
       String jdbcUrl = "jdbc:ch://localhost:8123/default";
       Properties jdbcProperties = new Properties();
       jdbcProperties.put("user", "default");
       jdbcProperties.put("password", "123456");

       // Create a sample DataFrame
       StructType schema = new StructType(new StructField[]{
               DataTypes.createStructField("id", DataTypes.IntegerType, false),
               DataTypes.createStructField("name", DataTypes.StringType, false)
       });

       List<Row> rows = new ArrayList<Row>();
       rows.add(RowFactory.create(1, "John"));
       rows.add(RowFactory.create(2, "Doe"));

       Dataset<Row> df = spark.createDataFrame(rows, schema);

       //---------------------------------------------------------------------------------------------------
       // Write the df to ClickHouse using the jdbc method
       //---------------------------------------------------------------------------------------------------

       df.write()
               .mode(SaveMode.Append)
               .jdbc(jdbcUrl, "example_table", jdbcProperties);

       //---------------------------------------------------------------------------------------------------
       // Write the df to ClickHouse using the save method
       //---------------------------------------------------------------------------------------------------

       df.write()
               .format("jdbc")
               .mode("append")
               .option("url", jdbcUrl)
               .option("dbtable", "example_table")
               .option("user", "default")
               .option("password", "123456")
               .save();

       // Stop the Spark session
       spark.stop();
   }

并行性

使用 Spark JDBC 时,Spark 使用单个分区读取数据。为了实现更高的并发性,您必须指定 partitionColumnlowerBoundupperBoundnumPartitions,这些参数描述了在从多个工作节点并行读取时如何对表进行分区。有关更多信息,请访问 Apache Spark 的官方文档,了解 JDBC 配置

JDBC 限制

  • 到目前为止,您只能通过 JDBC 将数据插入现有表中(目前没有办法在数据帧插入时自动创建表,正如 Spark 与其他连接器所做的那样)。