跳转到主内容
跳转到主内容

将 Apify 连接到 ClickHouse

Community Maintained

Apify 是一个网页抓取与自动化平台。您可以构建、运行和扩展名为 Actors 的无服务器云程序。Actors 可用于抓取网站、爬取网络内容、处理数据或自动化工作流。每次 Actor 运行都会生成结构化输出,并将其存储在 Datasets (JSON 对象集合) 中。

将抓取或处理后的数据加载到 ClickHouse 中,用于分析、监控或数据富化管道。

关键概念

Apify 概念它是什么
Actor在 Apify 平台上运行的无服务器云程序。在 Apify Store 中可找到数千个现成的 Actor。
DatasetActor 运行的输出结果。它是类似表格的 JSON 对象集合,可通过 Apify API 以 JSON、CSV、XML 或其他格式获取。
Webhook一种事件驱动的 HTTP 调用,会在 Actor 运行成功、失败或发生其他生命周期事件时触发。使用 webhook 可自动化 Apify 到 ClickHouse 的管道。

设置指南

收集您的 ClickHouse 连接信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

参数说明
HOSTPORT通常,在使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。
DATABASE NAME默认提供一个名为 default 的数据库,请填写您要连接的目标数据库名称。
USERNAMEPASSWORD默认用户名为 default。请使用适合您使用场景的用户名。

您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中查看。 选择某个服务并点击 Connect

ClickHouse Cloud 服务 Connect 按钮

选择 HTTPS。连接信息会显示在示例 curl 命令中。

ClickHouse Cloud HTTPS 连接信息

如果您使用的是自托管 ClickHouse,则连接信息由您的 ClickHouse 管理员进行设置。

Apify 前置条件

您还需要:

安装依赖

安装 Apify JavaScript 客户端和 ClickHouse JavaScript 客户端:

npm install apify-client @clickhouse/client
注意

Apify 还提供 Python 客户端。如果您更倾向于使用 Python,请通过 pip 安装 apify-client,并为 ClickHouse 使用 clickhouse-connect

在 ClickHouse 中创建目标表

创建一个表来存放抓取到的数据。schema 取决于您使用的 Actor。以下示例针对一个商品抓取 Actor 使用 MergeTree

CREATE TABLE apify_products
(
    url        String,
    title      String,
    price      Float64,
    currency   String,
    scraped_at DateTime DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY (scraped_at, url);

拉取 Apify 数据集并加载到 ClickHouse

以下脚本会拉取 Apify Actor 一次运行的结果,并将其插入 ClickHouse:

import { ApifyClient } from 'apify-client';
import { createClient } from '@clickhouse/client';

// 初始化客户端
const apify = new ApifyClient({ token: 'YOUR_APIFY_API_TOKEN' });
const clickhouse = createClient({
    url: 'https://YOUR_CLICKHOUSE_HOST:8443',
    username: 'default',
    password: 'YOUR_CLICKHOUSE_PASSWORD',
    database: 'default',
});

// 从某个 Actor 的最近一次运行中拉取数据集条目
const run = await apify.actor('YOUR_ACTOR_ID').call();
const { items } = await apify.dataset(run.defaultDatasetId).listItems();

console.log(`Fetched ${items.length} items from Apify dataset.`);

// 插入到 ClickHouse
await clickhouse.insert({
    table: 'apify_products',
    values: items,
    format: 'JSONEachRow',
});

console.log(`Inserted ${items.length} rows into ClickHouse.`);
await clickhouse.close();
提示

对于大型数据集,请使用 List dataset items 端点的 limitoffset 参数对结果进行分页。您还可以传递 clean=true,以仅获取非空且已去重的条目。

使用 webhook 实现自动化

与其手动运行脚本,不如将管道自动化,以便每次 Actor 完成后都将数据加载到 ClickHouse:

  1. Apify Console 中,进入您的 Actor 并打开 集成 选项卡。
  2. 添加一个新的 webhook,配置如下:
    • 事件类型: ACTOR.RUN.SUCCEEDED
    • 操作: 向您的加载器端点发送 HTTP POST,或触发另一个负责将数据插入 ClickHouse 的 Actor。
  3. webhook 负载中包含 defaultDatasetId,您可以用它来拉取该次运行的结果。

有关负载详情和配置选项,请参阅 Apify webhook 文档

另一种方法是使用 Apify Schedules 按类似 cron 的计划运行 Actor,并结合 webhook 完成加载步骤。

最佳实践

从 Apify 拉取数据

使用 Apify 客户端库 (JavaScript 使用 apify-client,或 Python) ,而不要直接发起原始 HTTP 请求。它会为你处理分页、重试和身份验证。对于大型数据集,请使用 List dataset items 接口中的 limitoffset 参数对结果进行分页。

加载到 ClickHouse

向 ClickHouse 插入数据时,请使用 JSONEachRow 格式。它与 Apify 的 JSON 输出可直接对应,无需任何转换。

确保 ClickHouse 表的 schema 与 Actor 的输出字段一致。可在其 Apify Store 页面或运行后的 Dataset 选项卡中查看 Actor 的输出 schema。

性能

对于通过 JavaScript 客户端进行的高处理量插入,请参考性能优化提示。应将多行数据合并为更大的批次插入,而不是一次插入一行;如果不便在客户端侧进行批处理,请考虑使用异步插入

安全性

为简化说明,本页中的示例使用 default 用户和数据库。在生产环境中,请创建一个专用用户,并仅授予其向目标表执行插入操作所需的最小特权,同时安全地存储凭证 (例如存放在环境变量或密钥管理器中,而不是提交到源代码中) 。相关指引请参见 云访问管理