跳到主要内容
跳到主要内容

将 Amazon Glue 与 ClickHouse 和 Spark 集成

Amazon Glue 是 Amazon Web Services (AWS) 提供的全托管无服务器数据集成服务。它简化了数据发现、准备和转换的过程,以便进行分析、机器学习和应用开发。

安装

要将您的 Glue 代码与 ClickHouse 集成,您可以通过以下其中一种方式使用我们的官方 Spark 连接器:

  • 从 AWS Marketplace 安装 ClickHouse Glue 连接器(推荐)。
  • 手动将 Spark 连接器的 JAR 文件添加到您的 Glue 作业中。
  1. 订阅连接器

    要在您的账户中访问连接器,请从 AWS Marketplace 订阅 ClickHouse AWS Glue 连接器。

  2. 授予所需权限

    确保您的 Glue 作业的 IAM 角色具有所需的权限,如最低权限 指南 中所述。

  3. 激活连接器并创建连接

    您可以通过单击 此链接 直接激活连接器并创建连接,该链接将打开预填关键字段的 Glue 连接创建页面。为连接命名,然后按创建(此阶段无需提供 ClickHouse 连接详细信息)。

  4. 在 Glue 作业中使用

    在您的 Glue 作业中,选择 Job details 选项卡,并展开 Advanced properties 窗口。在 Connections 部分,选择您刚才创建的连接。连接器会自动将所需的 JAR 注入作业运行时。

Glue Notebook connections config
备注

Glue 连接器中使用的 JAR 是为 Spark 3.3Scala 2Python 3 构建的。配置 Glue 作业时,请确保选择这些版本。

示例

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // for ClickHouse cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

有关更多详细信息,请访问我们的 Spark 文档.