import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object ClickHouseGlueExample {
  def main(sysArgs: Array[String]): Unit = {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // For ClickHouse Cloud: enable SSL (ssl_mode NONE skips server certificate verification)
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate()
    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
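    // Optional sanity check (illustrative addition, not part of the original job):
    // confirm the ClickHouse catalog is reachable before doing any work.
    // sparkSession.sql("SHOW TABLES IN clickhouse.default").show()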
    import sparkSession.implicits._
    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"
    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))
    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)
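    // Quick check (illustrative addition): print the applied schema and a few
    // rows to verify the CSV parsed as expected before writing.
    df.printSchema()
    df.show(5)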
    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()
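    // Note: append() assumes the target table already exists in ClickHouse.
    // A sketch if it does not: create it through the DataFrameWriterV2 API
    // instead (engine and table options depend on your ClickHouse setup):
    // df.writeTo("clickhouse.default.cell_towers").create()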
    // Read back from ClickHouse
    val dfRead = sparkSession.sql("select * from clickhouse.default.cell_towers")
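    // Illustrative addition: show a sample of the rows read back from ClickHouse.
    dfRead.show(5)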
    Job.commit()
  }
}