- データ取り込み
- Apache Spark
- Spark JDBC
Spark JDBC
JDBCは、Sparkで最も一般的に使用されるデータソースの1つです。 このセクションでは、Sparkと共に使用するためのClickHouse公式JDBCコネクタの詳細を提供します。
データの読み取り
- Java
- Scala
- Python
- Spark SQL
public static void main(String[] args) {
// Sparkセッションの初期化
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
String jdbcURL = "jdbc:ch://localhost:8123/default";
String query = "select * from example_table where id > 2";
//---------------------------------------------------------------------------------------------------
// jdbcメソッドを使用してClickHouseからテーブルをロード
//---------------------------------------------------------------------------------------------------
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);
df1.show();
//---------------------------------------------------------------------------------------------------
// loadメソッドを使用してClickHouseからテーブルをロード
//---------------------------------------------------------------------------------------------------
Dataset<Row> df2 = spark.read()
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load();
df2.show();
// Sparkセッションを停止
spark.stop();
}
object ReadData extends App {
// Sparkセッションの初期化
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
val jdbcURL = "jdbc:ch://localhost:8123/default"
val query: String = "select * from example_table where id > 2"
//---------------------------------------------------------------------------------------------------
// jdbcメソッドを使用してClickHouseからテーブルをロード
//---------------------------------------------------------------------------------------------------
val connectionProperties = new Properties()
connectionProperties.put("user", "default")
connectionProperties.put("password", "123456")
val df1: Dataset[Row] = spark.read.
jdbc(jdbcURL, s"($query)", connectionProperties)
df1.show()
//---------------------------------------------------------------------------------------------------
// loadメソッドを使用してClickHouseからテーブルをロード
//---------------------------------------------------------------------------------------------------
val df2: Dataset[Row] = spark.read
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load()
df2.show()
// Sparkセッションを停止
spark.stop()
}
from pyspark.sql import SparkSession
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# JARファイルを使用してSparkセッションを初期化
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
df = (spark.read
.format('jdbc')
.option('driver', driver)
.option('url', url)
.option('user', user)
.option('password', password).option(
'query', query).load())
df.show()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
SELECT * FROM jdbcTable;
データの書き込み
- Java
- Scala
- Python
- Spark SQL
public static void main(String[] args) {
// Sparkセッションの初期化
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
// JDBC接続の詳細
String jdbcUrl = "jdbc:ch://localhost:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
// サンプルDataFrameの作成
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));
Dataset<Row> df = spark.createDataFrame(rows, schema);
//---------------------------------------------------------------------------------------------------
// jdbcメソッドを使用してdfをClickHouseに書き込む
//---------------------------------------------------------------------------------------------------
df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties);
//---------------------------------------------------------------------------------------------------
// saveメソッドを使用してdfをClickHouseに書き込む
//---------------------------------------------------------------------------------------------------
df.write()
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save();
// Sparkセッションを停止
spark.stop();
}
object WriteData extends App {
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
// JDBC接続の詳細
val jdbcUrl: String = "jdbc:ch://localhost:8123/default"
val jdbcProperties: Properties = new Properties
jdbcProperties.put("user", "default")
jdbcProperties.put("password", "123456")
// サンプルDataFrameの作成
val rows = Seq(Row(1, "John"), Row(2, "Doe"))
val schema = List(
StructField("id", DataTypes.IntegerType, nullable = false),
StructField("name", StringType, nullable = true)
)
val df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// jdbcメソッドを使用してdfをClickHouseに書き込む
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// saveメソッドを使用してdfをClickHouseに書き込む
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save()
// Sparkセッションを停止// Sparkセッションを停止
spark.stop()
}
from pyspark.sql import SparkSession
from pyspark.sql import Row
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# JARファイルを使用してSparkセッションを初期化
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
# DataFrameの作成
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
# DataFrameをClickHouseに書き込む
df.write \
.format("jdbc") \
.option("driver", driver) \
.option("url", url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "example_table") \
.mode("append") \
.save()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
-- resultTableはdf.createTempViewまたはSpark SQLで作成できます
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable;
並列性
Spark JDBCを使用する場合、Sparkは単一のパーティションを使用してデータを読み取ります。より高い同時実行性を達成するためには、partitionColumn
、lowerBound
、upperBound
、およびnumPartitions
を指定する必要があり、これは複数のワーカーから並列して読み取る際のテーブルのパーティショニング方法を説明します。
詳細については、Apache Sparkの公式ドキュメントにある JDBCの構成をご覧ください。
JDBCの制限
- 現在のところ、JDBCを使用して既存のテーブルにのみデータを挿入することができます(DF挿入時にテーブルを自動作成する方法はなく、Sparkが他のコネクタで行うように)。