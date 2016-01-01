Spark JDBC

One of the most commonly used data sources supported by Spark is JDBC. This section describes how to use the official ClickHouse JDBC connector with Spark.

SparkSQL public static void main ( String [ ] args ) {



SparkSession spark = SparkSession . builder ( ) . appName ( "example" ) . master ( "local" ) . getOrCreate ( ) ;



String jdbcURL = "jdbc:ch://localhost:8123/default" ;

String query = "select * from example_table where id > 2" ;











Properties jdbcProperties = new Properties ( ) ;

jdbcProperties . put ( "user" , "default" ) ;

jdbcProperties . put ( "password" , "123456" ) ;



Dataset < Row > df1 = spark . read ( ) . jdbc ( jdbcURL , String . format ( "(%s)" , query ) , jdbcProperties ) ;



df1 . show ( ) ;









Dataset < Row > df2 = spark . read ( )

. format ( "jdbc" )

. option ( "url" , jdbcURL )

. option ( "user" , "default" )

. option ( "password" , "123456" )

. option ( "query" , query )

. load ( ) ;





df2 . show ( ) ;







spark . stop ( ) ;

}

/** Reads rows from the ClickHouse table `example_table` over JDBC in two equivalent ways and
  * prints them.
  *
  * An explicit `main` is used instead of extending `scala.App`: Spark's documentation notes that
  * subclasses of `scala.App` may not work correctly.
  */
object ReadData {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession.builder.appName("example").master("local").getOrCreate()

    try {
      val jdbcURL = "jdbc:ch://localhost:8123/default"
      val query: String = "select * from example_table where id > 2"

      // Option 1: DataFrameReader.jdbc(). The query is parenthesized so it can stand in for a
      // table name (anything valid in a FROM clause).
      val connectionProperties = new Properties()
      connectionProperties.setProperty("user", "default")
      connectionProperties.setProperty("password", "123456")

      val df1: Dataset[Row] = spark.read.jdbc(jdbcURL, s"($query)", connectionProperties)
      df1.show()

      // Option 2: generic JDBC reader; the `query` option takes the SELECT statement as-is.
      val df2: Dataset[Row] = spark.read
        .format("jdbc")
        .option("url", jdbcURL)
        .option("user", "default")
        .option("password", "123456")
        .option("query", query)
        .load()
      df2.show()
    } finally {
      // Release the session even if one of the reads fails.
      spark.stop()
    }
  }
}

from pyspark . sql import SparkSession



# Placeholder path: replace X.X.X-SNAPSHOT with the actual ClickHouse JDBC connector version.
jar_files = [
    "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]

# Put the ClickHouse JDBC jar on Spark's classpath via spark.jars.
spark = (SparkSession.builder
         .appName("example")
         .master("local")
         .config("spark.jars", ",".join(jar_files))
         .getOrCreate())

url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"

try:
    # Generic JDBC reader; the "query" option takes the SELECT statement as-is.
    df = (spark.read
          .format('jdbc')
          .option('driver', driver)
          .option('url', url)
          .option('user', user)
          .option('password', password)
          .option('query', query)
          .load())

    df.show()
finally:
    # Release the session even if the read fails, as the Java and Scala examples do.
    spark.stop()



-- Register a temporary Spark view backed by the JDBC data source, pointing at a ClickHouse table.
-- "schema.tablename", "username" and "password" are placeholders to replace.
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  driver   = "com.clickhouse.jdbc.ClickHouseDriver",
  url      = "jdbc:ch://localhost:8123/default",
  dbtable  = "schema.tablename",
  user     = "username",
  password = "password"
);

-- Querying the view reads the rows from ClickHouse over JDBC.
SELECT * FROM jdbcTable;



SparkSQL public static void main ( String [ ] args ) {



SparkSession spark = SparkSession . builder ( ) . appName ( "example" ) . master ( "local" ) . getOrCreate ( ) ;





String jdbcUrl = "jdbc:ch://localhost:8123/default" ;

Properties jdbcProperties = new Properties ( ) ;

jdbcProperties . put ( "user" , "default" ) ;

jdbcProperties . put ( "password" , "123456" ) ;





StructType schema = new StructType ( new StructField [ ] {

DataTypes . createStructField ( "id" , DataTypes . IntegerType , false ) ,

DataTypes . createStructField ( "name" , DataTypes . StringType , false )

} ) ;



List < Row > rows = new ArrayList < Row > ( ) ;

rows . add ( RowFactory . create ( 1 , "John" ) ) ;

rows . add ( RowFactory . create ( 2 , "Doe" ) ) ;





Dataset < Row > df = spark . createDataFrame ( rows , schema ) ;











df . write ( )

. mode ( SaveMode . Append )

. jdbc ( jdbcUrl , "example_table" , jdbcProperties ) ;











df . write ( )

. format ( "jdbc" )

. mode ( "append" )

. option ( "url" , jdbcUrl )

. option ( "dbtable" , "example_table" )

. option ( "user" , "default" )

. option ( "password" , "123456" )

. option ( "SaveMode" , "append" )

. save ( ) ;







spark . stop ( ) ;

}

/** Writes two rows into the ClickHouse table `example_table` over JDBC, first with
  * `DataFrameWriter.jdbc` and then with the generic `format("jdbc")` writer. Both writes append,
  * so the rows are inserted twice.
  *
  * An explicit `main` is used instead of extending `scala.App`: Spark's documentation notes that
  * subclasses of `scala.App` may not work correctly.
  */
object WriteData {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession.builder.appName("example").master("local").getOrCreate()

    try {
      val jdbcUrl: String = "jdbc:ch://localhost:8123/default"
      val jdbcProperties: Properties = new Properties
      jdbcProperties.setProperty("user", "default")
      jdbcProperties.setProperty("password", "123456")

      val rows = Seq(Row(1, "John"), Row(2, "Doe"))

      // Assumes example_table has compatible id / name columns — TODO confirm.
      val schema = List(
        StructField("id", DataTypes.IntegerType, nullable = false),
        StructField("name", StringType, nullable = true)
      )

      val df: DataFrame = spark.createDataFrame(
        spark.sparkContext.parallelize(rows),
        StructType(schema)
      )

      // Option 1: DataFrameWriter.jdbc().
      df.write
        .mode(SaveMode.Append)
        .jdbc(jdbcUrl, "example_table", jdbcProperties)

      // Option 2: generic JDBC writer. The save mode is set only through mode(): Spark forwards
      // unrecognized options to the JDBC driver as connection properties.
      df.write
        .format("jdbc")
        .mode("append")
        .option("url", jdbcUrl)
        .option("dbtable", "example_table")
        .option("user", "default")
        .option("password", "123456")
        .save()
    } finally {
      // Release the session even if one of the writes fails.
      spark.stop()
    }
  }
}

from pyspark . sql import SparkSession

from pyspark . sql import Row



# Placeholder path: replace X.X.X-SNAPSHOT with the actual ClickHouse JDBC connector version.
jar_files = [
    "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]

# Put the ClickHouse JDBC jar on Spark's classpath via spark.jars.
spark = (SparkSession.builder
         .appName("example")
         .master("local")
         .config("spark.jars", ",".join(jar_files))
         .getOrCreate())

# Two rows to insert; the DataFrame's column names come from the Row field names.
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)

url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"

try:
    # Append the rows to example_table through the generic JDBC writer.
    (df.write
       .format("jdbc")
       .option("driver", driver)
       .option("url", url)
       .option("user", user)
       .option("password", password)
       .option("dbtable", "example_table")
       .mode("append")
       .save())
finally:
    # Release the session even if the write fails, as the Java and Scala examples do.
    spark.stop()





-- Register a temporary Spark view backed by the JDBC data source, pointing at the target
-- ClickHouse table. "schema.tablename", "username" and "password" are placeholders to replace.
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  driver   = "com.clickhouse.jdbc.ClickHouseDriver",
  url      = "jdbc:ch://localhost:8123/default",
  dbtable  = "schema.tablename",
  user     = "username",
  password = "password"
);

-- Insert the rows of resultTable into the ClickHouse table through the view.
-- resultTable is assumed to be an existing table/view with a compatible schema.
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable;





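By default, Spark JDBC reads all data through a single partition. To read in parallel from multiple workers, specify `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions`, which together describe how to partition the table. `partitionColumn` must be a numeric, date, or timestamp column. `lowerBound` and `upperBound` only determine the partition stride; they do not filter rows. These options cannot be combined with the `query` option, so use `dbtable` instead. See Apache Spark's official documentation for more information on JDBC configuration.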