跳到主要内容
跳到主要内容

连接 Apache NiFi 到 ClickHouse

Apache NiFi 是一个开源工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道并带有超过 300 种数据处理器。本分步教程展示了如何将 Apache NiFi 连接到 ClickHouse,作为源和目的地,并加载示例数据集。

1. 收集连接详情

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • HOST 和 PORT:通常,当使用 TLS 时,端口为 8443;当不使用 TLS 时,端口为 8123。

  • 数据库名称:开箱即用时,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名和密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务详细信息可在 ClickHouse Cloud 控制台中找到。 选择您要连接的服务并点击 Connect

ClickHouse Cloud 服务连接按钮

选择 HTTPS,详细信息会在示例 curl 命令中提供。

ClickHouse Cloud HTTPS 连接详细信息

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

2. 下载并运行 Apache NiFi

  1. 对于新设置,从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行 ./bin/nifi.sh start 开始。

3. 下载 ClickHouse JDBC 驱动程序

  1. 访问 ClickHouse JDBC 驱动程序发布页面,并查找最新的 JDBC 版本。
  2. 在发布版本中,点击“显示所有 xx 个资产”,并寻找包含关键字“shaded”或“all”的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
  3. 将 JAR 文件放置在 Apache NiFi 可访问的文件夹中,并记下绝对路径。

4. 添加 DBCPConnectionPool 控制服务并配置其属性

  1. 要在 Apache NiFi 中配置控制服务,请点击“齿轮”按钮访问 NiFi 流配置页面。

    NiFi 流配置
  2. 选择控制服务选项卡,点击右上角的 + 按钮添加新的控制服务。

    添加控制服务
  3. 搜索 DBCPConnectionPool 并点击“添加”按钮。

    搜索 `DBCPConnectionPool`
  4. 新添加的 DBCPConnectionPool 默认处于无效状态。点击“齿轮”按钮开始配置。

    NiFi 流配置
  5. 在“属性”部分,输入以下值:

属性备注
数据库连接 URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true相应地替换连接 URL 中的 HOSTNAME
数据库驱动类名com.clickhouse.jdbc.ClickHouseDriver
数据库驱动位置/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC 驱动程序 JAR 文件的绝对路径
数据库用户defaultClickHouse 用户名
密码passwordClickHouse 密码
  1. 在设置部分,将控制服务的名称更改为“ClickHouse JDBC”,以便于引用。

    NiFi 流配置
  2. 点击“闪电”按钮并再点击“启用”按钮来激活 DBCPConnectionPool 控制服务。

    NiFi 流配置
    NiFi 流配置
  3. 检查控制服务选项卡,确保控制服务已启用。

    NiFi 流配置

5. 使用 ExecuteSQL 处理器从表中读取数据

  1. 添加一个 ExecuteSQL 处理器,以及适当的上游和下游处理器。

    `ExecuteSQL` 处理器
  2. ExecuteSQL 处理器的“属性”部分,输入以下值:

    属性备注
    数据库连接池服务ClickHouse JDBC选择为 ClickHouse 配置的控制服务
    SQL 查询SELECT * FROM system.metrics在此输入你的查询
  3. 启动 ExecuteSQL 处理器。

    `ExecuteSQL` 处理器
  4. 要确认查询已经成功处理,请检查输出队列中的一个 FlowFile

    `ExecuteSQL` 处理器
  5. 切换为“格式化”视图,以查看输出 FlowFile 的结果。

    `ExecuteSQL` 处理器

6. 使用 MergeRecordPutDatabaseRecord 处理器向表中写入数据

  1. 为了在单个插入中写入多行,我们首先需要将多个记录合并为一个记录。这可以使用 MergeRecord 处理器完成。

  2. MergeRecord 处理器的“属性”部分,输入以下值:

    属性备注
    记录读取器JSONTreeReader选择适当的记录读取器
    记录写入器JSONReadSetWriter选择适当的记录写入器
    最小记录数1000将其更改为更高的数字,以便最小行数合并为一条记录。默认值为 1 行
    最大记录数10000将其更改为大于“最小记录数”的更高数字。默认值为 1,000 行
  3. 要确认多个记录已合并为一个记录,请检查 MergeRecord 处理器的输入和输出。请注意,输出是多个输入记录的数组。

    输入

    `ExecuteSQL` 处理器

    输出

    `ExecuteSQL` 处理器
  4. PutDatabaseRecord 处理器的“属性”部分,输入以下值:

    属性备注
    记录读取器JSONTreeReader选择适当的记录读取器
    数据库类型Generic保持默认值
    语句类型INSERT
    数据库连接池服务ClickHouse JDBC选择 ClickHouse 控制服务
    表名tbl在此输入你的表名
    翻译字段名false设置为“false”,以便插入的字段名必须与列名匹配
    最大批大小1000每次插入的最大行数。该值应不低于 MergeRecord 处理器中“最小记录数”的值。
  5. 为了确认每个插入包含多行,请检查表中的行数是否至少增加了 MergeRecord 中定义的“最小记录数”。

    `ExecuteSQL` 处理器
  6. 恭喜你 - 你已经成功使用 Apache NiFi 将数据加载到 ClickHouse 中!