跳转到主内容
跳转到主内容

将 ClickHouse 与 Databricks 集成

ClickHouse Supported

ClickHouse Spark 连接器可以与 Databricks 无缝配合使用。本文档介绍在 Databricks 上的特定平台配置、安装方式以及使用模式。

适用于 Databricks 的 API 选择

默认情况下,Databricks 使用 Unity Catalog,这会阻止 Spark catalog 注册。在这种情况下,必须使用 TableProvider API(基于格式的访问方式)。

但是,如果通过创建一个访问模式为 No isolation shared 的集群来禁用 Unity Catalog,则可以改用 Catalog API。Catalog API 提供集中式配置以及原生的 Spark SQL 集成。

Unity Catalog 状态推荐 API说明
启用(默认)TableProvider API(基于格式)Unity Catalog 会阻止 Spark catalog 注册
禁用(No isolation shared)Catalog API需要访问模式为 "No isolation shared" 的集群

在 Databricks 中安装

选项 1:通过 Databricks UI 上传 JAR

  1. 构建或下载运行时 JAR:

    clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar
    
  2. 将 JAR 上传到 Databricks 工作区:

    • 转到 Workspace → 导航到目标文件夹
    • 单击 Upload → 选择该 JAR 文件
    • JAR 将存储在工作区中
  3. 在集群上安装该库:

    • 转到 Compute → 选择集群
    • 单击 Libraries 选项卡
    • 单击 Install New
    • 选择 DBFSWorkspace → 导航到已上传的 JAR 文件
    • 单击 Install
Databricks Libraries 选项卡
从 Workspace 卷安装库
  1. 重启集群以加载该库

方案 2:通过 Databricks CLI 安装

# Upload JAR to DBFS
databricks fs cp clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar \
  dbfs:/FileStore/jars/

# Install on cluster
databricks libraries install \
  --cluster-id <your-cluster-id> \
  --jar dbfs:/FileStore/jars/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar

选项 3:Maven 坐标(推荐)

  1. 进入您的 Databricks 工作区:

    • 前往 Compute → 选择目标集群
    • 单击 Libraries 选项卡
    • 单击 Install New
    • 选择 Maven 选项卡
  2. 添加 Maven 坐标:

com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}
Databricks Maven 库配置
  1. 单击 Install,然后重启集群以加载该库

使用 TableProvider API

在启用 Unity Catalog(默认)时,必须使用 TableProvider API(基于格式的访问方式),因为 Unity Catalog 会阻止通过 Spark catalog 进行注册。如果您通过使用访问模式为 "No isolation shared" 的集群禁用了 Unity Catalog,则可以改用 Catalog API

读取数据

# 使用 TableProvider API 从 ClickHouse 读取数据
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "events") \
    .option("user", "default") \
    .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
    .option("ssl", "true") \
    .load()

# 表结构会被自动推断
df.display()

写入数据

# 写入 ClickHouse——如果表不存在,将自动创建
df.write \
    .format("clickhouse") \
    .option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "events_copy") \
    .option("user", "default") \
    .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
    .option("ssl", "true") \
    .option("order_by", "id") \  # 必需:创建新表时需要指定 ORDER BY
    .option("settings.allow_nullable_key", "1") \  # 如果 ORDER BY 包含 Nullable 列,在 ClickHouse Cloud 中是必需的
    .mode("append") \
    .save()
注意

此示例假定已在 Databricks 中预先配置好 secret scope(机密作用域)。有关配置步骤,请参阅 Databricks 的 Secret 管理文档

Databricks 特有注意事项

机密管理

使用 Databricks 的 secret scopes 安全存储 ClickHouse 凭证:

# Access secrets
password = dbutils.secrets.get(scope="clickhouse", key="password")

有关配置的说明,请参阅 Databricks 的 Secret 管理文档

ClickHouse Cloud 连接

从 Databricks 连接到 ClickHouse Cloud 时:

  1. 使用 HTTPS 协议protocol: https, http_port: 8443
  2. 启用 SSLssl: true

示例

完整工作流示例

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 使用 ClickHouse 连接器初始化 Spark
spark = SparkSession.builder \
    .config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.9.0") \
    .getOrCreate()

# 从 ClickHouse 读取数据
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-host.clickhouse.cloud") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "source_table") \
    .option("user", "default") \
    .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
    .option("ssl", "true") \
    .load()

# 转换数据
transformed_df = df.filter(col("status") == "active")

# 将数据写入 ClickHouse
transformed_df.write \
    .format("clickhouse") \
    .option("host", "your-host.clickhouse.cloud") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "target_table") \
    .option("user", "default") \
    .option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
    .option("ssl", "true") \
    .option("order_by", "id") \
    .mode("append") \
    .save()