メインコンテンツへスキップ
メインコンテンツへスキップ

Amazon Glue を ClickHouse および Spark と連携する

ClickHouse Supported

Amazon Glue は、Amazon Web Services (AWS) が提供する完全マネージド型のサーバーレス データ統合サービスです。分析、機械学習、アプリケーション開発に向けたデータの検出、準備、変換を簡素化できます。

インストール

Glue コードを ClickHouse と統合するには、Glue で公式の Spark コネクタを次のいずれかの方法で利用できます。

  • AWS Marketplace から ClickHouse Glue コネクタをインストールする (推奨) 。
  • Spark コネクタの jar を Glue ジョブに手動で追加する。
  1. コネクタをサブスクライブする

    アカウントからコネクタにアクセスするには、AWS Marketplace で ClickHouse AWS Glue Connector をサブスクライブしてください。

  2. 必要な権限を付与する

    Glue ジョブの IAM role に必要な権限があることを確認してください。詳細は、最小権限のガイドを参照してください。

  3. コネクタを有効化して接続を作成する

    このリンクをクリックすると、主要なフィールドが事前入力された Glue の接続作成ページが開き、そこから直接コネクタを有効化して接続を作成できます。接続に名前を付けて、作成をクリックしてください (この段階では ClickHouse の接続情報を入力する必要はありません) 。

  4. Glue ジョブで使用する

    Glue ジョブで Job details タブを選択し、Advanced properties ウィンドウを展開します。Connections セクションで、先ほど作成した接続を選択してください。コネクタは、必要な JAR をジョブのランタイムに自動的に追加します。

Glue Notebook の接続設定
注記

Glue コネクタで使用される JAR は、Spark 3.3Scala 2Python 3 向けにビルドされています。Glue ジョブを設定する際は、これらのバージョンを選択してください。

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // for ClickHouse cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

詳細については、Spark のドキュメントをご覧ください。