跳到主要内容
跳到主要内容

ClickHouse 中的存储过程和查询参数

如果你之前使用的是传统关系型数据库,可能会在 ClickHouse 中寻找存储过程和预处理语句(prepared statements)。 本指南将解释 ClickHouse 对这些概念的处理方式,并提供推荐的替代方案。

ClickHouse 中存储过程的替代方案

ClickHouse 不支持带有控制流逻辑(IF/ELSE、循环等)的传统存储过程。 这是基于 ClickHouse 作为分析型数据库的架构而做出的刻意设计选择。 在分析型数据库中不鼓励使用循环,因为处理 O(n) 个简单查询通常比处理少量复杂查询更慢。

ClickHouse 针对以下场景进行了优化:

  • 分析型工作负载 - 对大型数据集进行复杂聚合
  • 批处理 - 高效处理大规模数据量
  • 声明式查询 - 使用 SQL 查询描述要检索哪些数据,而不是如何处理这些数据

带有过程式逻辑的存储过程与这些优化方向相悖。为此,ClickHouse 提供了与其优势相匹配的替代方案。

用户定义函数(UDF)

用户定义函数无需使用控制流即可封装可复用的逻辑。ClickHouse 支持两种类型:

基于 Lambda 的 UDF

使用 SQL 表达式和 lambda 语法创建函数:

用于示例的样本数据
-- 创建 products 表
CREATE TABLE products (
    product_id UInt32,
    product_name String,
    price Decimal(10, 2)
)
ENGINE = MergeTree()
ORDER BY product_id;

-- 插入示例数据
INSERT INTO products (product_id, product_name, price) VALUES
(1, 'Laptop', 899.99),
(2, 'Wireless Mouse', 24.99),
(3, 'USB-C Cable', 12.50),
(4, 'Monitor', 299.00),
(5, 'Keyboard', 79.99),
(6, 'Webcam', 54.95),
(7, 'Desk Lamp', 34.99),
(8, 'External Hard Drive', 119.99),
(9, 'Headphones', 149.00),
(10, 'Phone Stand', 15.99);
-- 简单的计算函数
CREATE FUNCTION calculate_tax AS (price, rate) -> price * rate;

SELECT
    product_name,
    price,
    calculate_tax(price, 0.08) AS tax
FROM products;
-- 使用 if() 实现条件逻辑
CREATE FUNCTION price_tier AS (price) ->
    if(price < 100, '经济型',
       if(price < 500, '中端', '高端'));

SELECT
    product_name,
    price,
    price_tier(price) AS tier
FROM products;
-- 字符串处理
CREATE FUNCTION format_phone AS (phone) ->
    concat('(', substring(phone, 1, 3), ') ',
           substring(phone, 4, 3), '-',
           substring(phone, 7, 4));

SELECT format_phone('5551234567');
-- 结果:(555) 123-4567

限制:

  • 不支持循环或复杂的控制流
  • 不能修改数据(INSERT/UPDATE/DELETE
  • 不允许使用递归函数

完整语法请参阅 CREATE FUNCTION

可执行 UDF

对于更复杂的逻辑,可以使用可执行 UDF 函数来调用外部程序:

<!-- /etc/clickhouse-server/sentiment_analysis_function.xml -->
<functions>
    <function>
        <type>executable</type>
        <name>sentiment_score</name>
        <return_type>Float32</return_type>
        <argument>
            <type>String</type>
        </argument>
        <format>TabSeparated</format>
        <command>python3 /opt/scripts/sentiment.py</command>
    </function>
</functions>
-- 使用可执行 UDF
SELECT
    review_text,
    sentiment_score(review_text) AS score
FROM customer_reviews;

可执行 UDF 可以使用任意语言(Python、Node.js、Go 等)来实现任意逻辑。

有关详细信息,请参阅可执行 UDF

参数化视图

参数化视图类似于返回数据集的函数。 它们非常适合带有动态过滤条件的可复用查询:

示例数据
-- 创建 sales 表
CREATE TABLE sales (
  date Date,
  product_id UInt32,
  product_name String,
  category String,
  quantity UInt32,
  revenue Decimal(10, 2),
  sales_amount Decimal(10, 2)
)
ENGINE = MergeTree()
ORDER BY (date, product_id);

-- 插入示例数据
INSERT INTO sales VALUES
('2024-01-05', 12345, 'Laptop Pro', 'Electronics', 2, 1799.98, 1799.98),
('2024-01-06', 12345, 'Laptop Pro', 'Electronics', 1, 899.99, 899.99),
('2024-01-10', 12346, 'Wireless Mouse', 'Electronics', 5, 124.95, 124.95),
('2024-01-15', 12347, 'USB-C Cable', 'Accessories', 10, 125.00, 125.00),
('2024-01-20', 12345, 'Laptop Pro', 'Electronics', 3, 2699.97, 2699.97),
('2024-01-25', 12348, 'Monitor 4K', 'Electronics', 2, 598.00, 598.00),
('2024-02-01', 12345, 'Laptop Pro', 'Electronics', 1, 899.99, 899.99),
('2024-02-05', 12349, 'Keyboard Mechanical', 'Accessories', 4, 319.96, 319.96),
('2024-02-10', 12346, 'Wireless Mouse', 'Electronics', 8, 199.92, 199.92),
('2024-02-15', 12350, 'Webcam HD', 'Electronics', 3, 164.85, 164.85);
-- 创建参数化视图
CREATE VIEW sales_by_date AS
SELECT
    date,
    product_id,
    sum(quantity) AS total_quantity,  -- 总数量
    sum(revenue) AS total_revenue      -- 总收入
FROM sales
WHERE date BETWEEN {start_date:Date} AND {end_date:Date}
GROUP BY date, product_id;
-- 使用参数查询视图
SELECT *
FROM sales_by_date(start_date='2024-01-01', end_date='2024-01-31')
WHERE product_id = 12345;

常见用例

-- 更复杂的参数化视图
CREATE VIEW top_products_by_category AS
SELECT
    category,
    product_name,
    revenue,
    rank
FROM (
    SELECT
        category,
        product_name,
        revenue,
        rank() OVER (PARTITION BY category ORDER BY revenue DESC) AS rank
    FROM (
        SELECT
            category,
            product_name,
            sum(sales_amount) AS revenue
        FROM sales
        WHERE category = {category:String}
            AND date >= {min_date:Date}
        GROUP BY category, product_name
    )
)
WHERE rank <= {top_n:UInt32};

-- 使用方法
SELECT * FROM top_products_by_category(
    category='Electronics',
    min_date='2024-01-01',
    top_n=10
);

有关详细信息,请参阅参数化视图部分。

物化视图

物化视图非常适合用于预先计算那些在传统方案中通常由存储过程执行的高开销聚合操作。如果你有使用传统数据库的背景,可以把物化视图理解为一种 INSERT 触发器(trigger),它会在数据插入到源表时自动对其进行转换和聚合:

-- 源表
CREATE TABLE page_views (
    user_id UInt64,
    page String,
    timestamp DateTime,
    session_id String
)
ENGINE = MergeTree()
ORDER BY (user_id, timestamp);

-- 用于维护聚合统计数据的物化视图
CREATE MATERIALIZED VIEW daily_user_stats
ENGINE = SummingMergeTree()
ORDER BY (date, user_id)
AS SELECT
    toDate(timestamp) AS date,
    user_id,
    count() AS page_views,
    uniq(session_id) AS sessions,
    uniq(page) AS unique_pages
FROM page_views
GROUP BY date, user_id;

-- 向源表插入示例数据
INSERT INTO page_views VALUES
(101, '/home', '2024-01-15 10:00:00', 'session_a1'),
(101, '/products', '2024-01-15 10:05:00', 'session_a1'),
(101, '/checkout', '2024-01-15 10:10:00', 'session_a1'),
(102, '/home', '2024-01-15 11:00:00', 'session_b1'),
(102, '/about', '2024-01-15 11:05:00', 'session_b1'),
(101, '/home', '2024-01-16 09:00:00', 'session_a2'),
(101, '/products', '2024-01-16 09:15:00', 'session_a2'),
(103, '/home', '2024-01-16 14:00:00', 'session_c1'),
(103, '/products', '2024-01-16 14:05:00', 'session_c1'),
(103, '/products', '2024-01-16 14:10:00', 'session_c1'),
(102, '/home', '2024-01-17 10:30:00', 'session_b2'),
(102, '/contact', '2024-01-17 10:35:00', 'session_b2');

-- 查询预聚合数据
SELECT
    user_id,
    sum(page_views) AS total_views,
    sum(sessions) AS total_sessions
FROM daily_user_stats
WHERE date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY user_id;

可刷新物化视图

用于计划的批处理任务(例如每晚运行的存储过程):

-- 每天凌晨 2 点自动刷新
CREATE MATERIALIZED VIEW monthly_sales_report
REFRESH EVERY 1 DAY OFFSET 2 HOUR
AS SELECT
    toStartOfMonth(order_date) AS month,
    region,
    product_category,
    count() AS order_count,
    sum(amount) AS total_revenue,
    avg(amount) AS avg_order_value
FROM orders
WHERE order_date >= today() - INTERVAL 13 MONTH
GROUP BY month, region, product_category;

-- 查询始终获取最新数据
SELECT * FROM monthly_sales_report
WHERE month = toStartOfMonth(today());

有关更高级的用法,请参阅 级联物化视图

外部编排

对于复杂的业务逻辑、ETL 工作流或多步骤流程,可以在 ClickHouse 外部使用各类语言客户端来实现这些逻辑。

使用应用代码

下面是一个对比示例,展示了如何将 MySQL 存储过程改写为 ClickHouse 的应用代码:

DELIMITER $$

CREATE PROCEDURE process_order(
    IN p_order_id INT,
    IN p_customer_id INT,
    IN p_order_total DECIMAL(10,2),
    OUT p_status VARCHAR(50),
    OUT p_loyalty_points INT
)
BEGIN
    DECLARE v_customer_tier VARCHAR(20);
    DECLARE v_previous_orders INT;
    DECLARE v_discount DECIMAL(10,2);

    -- 启动事务
    START TRANSACTION;

    -- 获取客户信息
    SELECT tier, total_orders
    INTO v_customer_tier, v_previous_orders
    FROM customers
    WHERE customer_id = p_customer_id;

    -- 根据会员等级计算折扣
    IF v_customer_tier = 'gold' THEN
        SET v_discount = p_order_total * 0.15;
    ELSEIF v_customer_tier = 'silver' THEN
        SET v_discount = p_order_total * 0.10;
    ELSE
        SET v_discount = 0;
    END IF;

    -- 插入订单记录
    INSERT INTO orders (order_id, customer_id, order_total, discount, final_amount)
    VALUES (p_order_id, p_customer_id, p_order_total, v_discount,
            p_order_total - v_discount);

    -- 更新客户统计数据
    UPDATE customers
    SET total_orders = total_orders + 1,
        lifetime_value = lifetime_value + (p_order_total - v_discount),
        last_order_date = NOW()
    WHERE customer_id = p_customer_id;

    -- 计算积分(每美元1积分)
    SET p_loyalty_points = FLOOR(p_order_total - v_discount);

    -- 插入积分交易记录
    INSERT INTO loyalty_points (customer_id, points, transaction_date, description)
    VALUES (p_customer_id, p_loyalty_points, NOW(),
            CONCAT('Order #', p_order_id));

    -- 检查客户是否应升级会员等级
    IF v_previous_orders + 1 >= 10 AND v_customer_tier = 'bronze' THEN
        UPDATE customers SET tier = 'silver' WHERE customer_id = p_customer_id;
        SET p_status = 'ORDER_COMPLETE_TIER_UPGRADED_SILVER';
    ELSEIF v_previous_orders + 1 >= 50 AND v_customer_tier = 'silver' THEN
        UPDATE customers SET tier = 'gold' WHERE customer_id = p_customer_id;
        SET p_status = 'ORDER_COMPLETE_TIER_UPGRADED_GOLD';
    ELSE
        SET p_status = 'ORDER_COMPLETE';
    END IF;

    COMMIT;
END$$

DELIMITER ;

-- 调用存储过程
CALL process_order(12345, 5678, 250.00, @status, @points);
SELECT @status, @points;

关键区别

  1. 控制流 - MySQL 存储过程使用 IF/ELSEWHILE 等循环结构。在 ClickHouse 中,应在应用程序代码(Python、Java 等)中实现这类逻辑。
  2. 事务 - MySQL 支持 BEGIN/COMMIT/ROLLBACK 以实现 ACID 事务。ClickHouse 是为追加写入型工作负载优化的分析型数据库,而不是面向事务性更新。
  3. 更新 - MySQL 使用 UPDATE 语句。对于可变数据,ClickHouse 更倾向于使用 INSERT 配合 ReplacingMergeTreeCollapsingMergeTree
  4. 变量和状态 - MySQL 存储过程可以声明变量(DECLARE v_discount)。在 ClickHouse 中,应在应用程序代码中管理状态。
  5. 错误处理 - MySQL 支持 SIGNAL 和异常处理器。在应用程序代码中,应使用所用语言的原生错误处理机制(try/catch)。
提示

何时采用各方案:

  • OLTP 工作负载(订单、支付、用户账户)→ 使用带存储过程的 MySQL/PostgreSQL
  • 分析型工作负载(报表、聚合、时序数据)→ 使用配合应用程序编排的 ClickHouse
  • 混合架构 → 两者都用!将 OLTP 中的事务数据流式传输到 ClickHouse 用于分析

使用工作流编排工具

  • Apache Airflow - 调度和监控由 ClickHouse 查询组成的复杂 DAG
  • dbt - 使用基于 SQL 的工作流进行数据转换
  • Prefect/Dagster - 现代的、基于 Python 的编排框架
  • 自定义调度器 - Cron 作业、Kubernetes CronJob 等

使用外部编排的优势:

  • 完整的编程语言特性
  • 更完善的错误处理和重试逻辑
  • 与外部系统集成(API、其他数据库)
  • 版本控制和测试
  • 监控与告警
  • 更灵活的调度

ClickHouse 中预处理语句的替代方案

虽然 ClickHouse 不支持传统关系型数据库(RDBMS)中的“预处理语句(prepared statements)”,但它提供了具有相同作用的查询参数:通过安全的参数化查询来防止 SQL 注入。

语法

定义查询参数有两种方式:

方法 1:使用 SET

示例表和数据
-- 创建 user_events 表(ClickHouse 语法)
CREATE TABLE user_events (
    event_id UInt32,
    user_id UInt64,
    event_name String,
    event_date Date,
    event_timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (user_id, event_date);

-- 插入多用户、多事件的示例数据
INSERT INTO user_events (event_id, user_id, event_name, event_date, event_timestamp) VALUES
(1, 12345, 'page_view', '2024-01-05', '2024-01-05 10:30:00'),
(2, 12345, 'page_view', '2024-01-05', '2024-01-05 10:35:00'),
(3, 12345, 'add_to_cart', '2024-01-05', '2024-01-05 10:40:00'),
(4, 12345, 'page_view', '2024-01-10', '2024-01-10 14:20:00'),
(5, 12345, 'add_to_cart', '2024-01-10', '2024-01-10 14:25:00'),
(6, 12345, 'purchase', '2024-01-10', '2024-01-10 14:30:00'),
(7, 12345, 'page_view', '2024-01-15', '2024-01-15 09:15:00'),
(8, 12345, 'page_view', '2024-01-15', '2024-01-15 09:20:00'),
(9, 12345, 'page_view', '2024-01-20', '2024-01-20 16:45:00'),
(10, 12345, 'add_to_cart', '2024-01-20', '2024-01-20 16:50:00'),
(11, 12345, 'purchase', '2024-01-25', '2024-01-25 11:10:00'),
(12, 12345, 'page_view', '2024-01-28', '2024-01-28 13:30:00'),
(13, 67890, 'page_view', '2024-01-05', '2024-01-05 11:00:00'),
(14, 67890, 'add_to_cart', '2024-01-05', '2024-01-05 11:05:00'),
(15, 67890, 'purchase', '2024-01-05', '2024-01-05 11:10:00'),
(16, 12345, 'page_view', '2024-02-01', '2024-02-01 10:00:00'),
(17, 12345, 'add_to_cart', '2024-02-01', '2024-02-01 10:05:00');
SET param_user_id = 12345;
SET param_start_date = '2024-01-01';
SET param_end_date = '2024-01-31';

SELECT
    event_name,
    count() AS event_count
FROM user_events
WHERE user_id = {user_id: UInt64}
    AND event_date BETWEEN {start_date: Date} AND {end_date: Date}
GROUP BY event_name;

方法 2:使用 CLI 参数

clickhouse-client \
    --param_user_id=12345 \
    --param_start_date='2024-01-01' \
    --param_end_date='2024-01-31' \
    --query="SELECT count() FROM user_events
             WHERE user_id = {user_id: UInt64}
             AND event_date BETWEEN {start_date: Date} AND {end_date: Date}"

参数语法

参数使用以下语法进行引用:{parameter_name: DataType}

  • parameter_name - 参数名称(不包含 param_ 前缀)
  • DataType - 参数要转换成的 ClickHouse 数据类型

数据类型示例

示例所用的表和示例数据
-- 1. 创建用于字符串和数字测试的表
CREATE TABLE IF NOT EXISTS users (
    name String,
    age UInt8,
    salary Float64
) ENGINE = Memory;

INSERT INTO users VALUES
    ('John Doe', 25, 75000.50),
    ('Jane Smith', 30, 85000.75),
    ('Peter Jones', 20, 50000.00);

-- 2. 创建用于日期和时间戳测试的表
CREATE TABLE IF NOT EXISTS events (
    event_date Date,
    event_timestamp DateTime
) ENGINE = Memory;

INSERT INTO events VALUES
    ('2024-01-15', '2024-01-15 14:30:00'),
    ('2024-01-15', '2024-01-15 15:00:00'),
    ('2024-01-16', '2024-01-16 10:00:00');

-- 3. 创建用于数组测试的表
CREATE TABLE IF NOT EXISTS products (
    id UInt32,
    name String
) ENGINE = Memory;

INSERT INTO products VALUES (1, 'Laptop'), (2, 'Monitor'), (3, 'Mouse'), (4, 'Keyboard');

-- 4. 创建用于 Map(类似 struct)测试的表
CREATE TABLE IF NOT EXISTS accounts (
    user_id UInt32,
    status String,
    type String
) ENGINE = Memory;

INSERT INTO accounts VALUES
    (101, 'active', 'premium'),
    (102, 'inactive', 'basic'),
    (103, 'active', 'basic');

-- 5. 创建用于 Identifier 类型测试的表
CREATE TABLE IF NOT EXISTS sales_2024 (
    value UInt32
) ENGINE = Memory;

INSERT INTO sales_2024 VALUES (100), (200), (300);
SET param_name = 'John Doe';
SET param_age = 25;
SET param_salary = 75000.50;

SELECT name, age, salary FROM users
WHERE name = {name: String}
  AND age >= {age: UInt8}
  AND salary <= {salary: Float64};

关于在语言客户端中使用查询参数,请参考相应语言客户端的文档。

查询参数的限制

查询参数不是通用的文本替换机制。它们有特定的限制:

  1. 它们主要面向 SELECT 语句——对 SELECT 查询的支持最佳
  2. 它们只能作为标识符或字面量使用——不能替换任意 SQL 片段
  3. 它们对 DDL 的支持有限——在 CREATE TABLE 中受支持,但在 ALTER TABLE 中不受支持

可以正常工作的用法:

-- ✓ WHERE 子句中的值
SELECT * FROM users WHERE id = {user_id: UInt64};

-- ✓ 表名/数据库名
SELECT * FROM {db: Identifier}.{table: Identifier};

-- ✓ IN 子句中的值
SELECT * FROM products WHERE id IN {ids: Array(UInt32)};

-- ✓ CREATE TABLE 语句
CREATE TABLE {table_name: Identifier} (id UInt64, name String) ENGINE = MergeTree() ORDER BY id;

不支持的内容:

-- ✗ SELECT 中的列名(请谨慎使用 Identifier)
SELECT {column: Identifier} FROM users;  -- 支持有限

-- ✗ 任意 SQL 片段
SELECT * FROM users {where_clause: String};  -- 不支持

-- ✗ ALTER TABLE 语句
ALTER TABLE {table: Identifier} ADD COLUMN new_col String;  -- 不支持

-- ✗ 多条语句
{statements: String};  -- 不支持

安全最佳实践

对所有用户输入一律使用查询参数:

# ✓ 安全 - 使用参数
user_input = request.get('user_id')
result = client.query(
    "SELECT * FROM orders WHERE user_id = {uid: UInt64}",
    parameters={'uid': user_input}
)

# ✗ 危险 - 存在 SQL 注入风险!
user_input = request.get('user_id')
result = client.query(f"SELECT * FROM orders WHERE user_id = {user_input}")

校验输入类型:

def get_user_orders(user_id: int, start_date: str):
    # 查询前验证类型
    if not isinstance(user_id, int) or user_id <= 0:
        raise ValueError("user_id 无效")

    # 参数确保类型安全
    return client.query(
        """
        SELECT * FROM orders
        WHERE user_id = {uid: UInt64}
            AND order_date >= {start: Date}
        """,
        parameters={'uid': user_id, 'start': start_date}
    )

MySQL 协议预处理语句

ClickHouse 的 MySQL 接口 对预处理语句(COM_STMT_PREPARECOM_STMT_EXECUTECOM_STMT_CLOSE)仅提供有限支持,主要用于兼容诸如 Tableau Online 这类会将查询包装为预处理语句的工具,以便建立连接。

主要限制:

  • 不支持参数绑定 - 不能使用带绑定参数的 ? 占位符
  • 查询在 PREPARE 期间会被存储,但不会被解析
  • 实现非常精简,仅用于特定 BI 工具的兼容性场景

以下示例是不可行的用法:

-- 这种 MySQL 风格的带参数预处理语句在 ClickHouse 中无法使用
PREPARE stmt FROM 'SELECT * FROM users WHERE id = ?';
EXECUTE stmt USING @user_id;  -- 不支持参数绑定
提示

请改用 ClickHouse 的原生查询参数。 它们在所有 ClickHouse 接口中都提供完善的参数绑定支持、类型安全以及 SQL 注入防护:

-- ClickHouse 原生查询参数(推荐)
SET param_user_id = 12345;
SELECT * FROM users WHERE id = {user_id: UInt64};

有关更多详情,请参阅 MySQL 接口文档关于 MySQL 支持的博客文章

总结

ClickHouse 中用于替代存储过程的方案

传统存储过程模式ClickHouse 中的替代方案
简单计算和转换用户自定义函数(UDF)
可复用的参数化查询参数化视图
预计算聚合物化视图
定时批处理可刷新的物化视图
复杂的多步骤 ETL 流程链式物化视图或外部编排(Python、Airflow、dbt)
带有控制流的业务逻辑应用程序代码

查询参数的使用

查询参数可用于:

  • 防止 SQL 注入
  • 类型安全的参数化查询
  • 在应用中进行动态过滤
  • 可重用的查询模板