跳到主要内容
跳到主要内容

将 Apache NiFi 连接到 ClickHouse

Community Maintained

Apache NiFi 是一种开源的工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道,并配备了超过 300 种数据处理器。本逐步教程展示了如何将 Apache NiFi 连接到 ClickHouse 作为数据源和目标,并加载一个示例数据集。

1. 收集连接详情

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

  • 主机端口:通常,当使用 TLS 时端口为 8443,当不使用 TLS 时端口为 8123。

  • 数据库名称:开箱即用时,有一个名为 default 的数据库,请使用您要连接的数据库名称。

  • 用户名密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中获得。 选择您要连接的服务并点击 连接

ClickHouse Cloud 服务连接按钮

选择 HTTPS,详细信息可在示例的 curl 命令中获得。

ClickHouse Cloud HTTPS 连接详细信息

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

2. 下载并运行 Apache NiFi

  1. 对于新的设置,从 https://nifi.apache.org/download.html 下载二进制文件,然后通过运行 ./bin/nifi.sh start 来启动

3. 下载 ClickHouse JDBC 驱动程序

  1. 访问 ClickHouse JDBC 驱动程序发布页面,查找最新的 JDBC 发布版本
  2. 在发布版本中,点击“显示所有 xx 资产”,查找包含关键词“shaded”或“all”的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
  3. 将 JAR 文件放在 Apache NiFi 可访问的文件夹中,并记录其绝对路径

4. 添加 DBCPConnectionPool 控制服务并配置其属性

  1. 要在 Apache NiFi 中配置控制服务,请点击“齿轮”按钮访问 NiFi 流配置页面

    NiFi 流配置页面,齿轮按钮突出显示
  2. 选择控制服务选项卡,通过点击右上角的 + 按钮添加新控制服务

    控制服务选项卡,添加按钮突出显示
  3. 搜索 DBCPConnectionPool 并点击“添加”按钮

    控制服务选择对话框,`DBCPConnectionPool` 突出显示
  4. 新添加的 DBCPConnectionPool 默认处于无效状态。点击“齿轮”按钮开始配置

    控制服务列表,显示无效的 `DBCPConnectionPool`,齿轮按钮突出显示
  5. 在“属性”部分,输入以下值

属性备注
数据库连接 URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true将连接 URL 中的 HOSTNAME 替换为相应的主机名
数据库驱动类名com.clickhouse.jdbc.ClickHouseDriver
数据库驱动位置/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC 驱动程序 JAR 文件的绝对路径
数据库用户defaultClickHouse 用户名
密码passwordClickHouse 密码
  1. 在设置部分,将控制服务的名称更改为“ClickHouse JDBC”,以便于引用

    DBCPConnectionPool 配置对话框,显示已填写的属性
  2. 通过点击“闪电”按钮然后点击“启用”按钮来激活 DBCPConnectionPool 控制服务

    控制服务列表,闪电按钮突出显示

    启用控制服务确认对话框
  3. 检查控制服务选项卡,确保控制服务已启用

    控制服务列表,显示启用的 ClickHouse JDBC 服务

5. 使用 ExecuteSQL 处理器从表中读取

  1. 添加一个 ​​ExecuteSQL 处理器,并添加适当的上游和下游处理器

    NiFi 画布,显示工作流中的 ExecuteSQL 处理器
  2. 在 ​​ExecuteSQL 处理器的“属性”部分,输入以下值

    属性备注
    数据库连接池服务ClickHouse JDBC选择配置好的 ClickHouse 控制服务
    SQL 选择查询SELECT * FROM system.metrics在此输入您的查询
  3. 启动 ​​ExecuteSQL 处理器

    ExecuteSQL 处理器配置,显示已填写的属性
  4. 为确认查询已成功处理,检查输出队列中的一个 FlowFile

    列表队列对话框,显示待检查的 FlowFile
  5. 切换视图为“格式化”以查看输出 FlowFile 的结果

    FlowFile 内容查看器,显示格式化查看中的查询结果

6. 使用 MergeRecordPutDatabaseRecord 处理器向表中写入

  1. 要在单次插入中写入多行,我们首先需要将多条记录合并为一条记录。这可以通过使用 MergeRecord 处理器来完成。

  2. MergeRecord 处理器的“属性”部分,输入以下值

    属性备注
    记录读取器JSONTreeReader选择合适的记录读取器
    记录写入器JSONReadSetWriter选择合适的记录写入器
    最小记录数1000更改为更高的数字,以便合并形成单条记录的最小行数。默认为 1 行
    最大记录数10000更改为高于“最小记录数”的值。默认为 1,000 行
  3. 要确认多条记录已合并为一条,检查 MergeRecord 处理器的输入和输出。请注意,输出是一个包含多个输入记录的数组

    输入

    MergeRecord 处理器输入,显示单个记录

    输出

    MergeRecord 处理器输出,显示合并的记录数组
  4. PutDatabaseRecord 处理器的“属性”部分,输入以下值

    属性备注
    记录读取器JSONTreeReader选择合适的记录读取器
    数据库类型Generic保持默认
    语句类型INSERT
    数据库连接池服务ClickHouse JDBC选择 ClickHouse 控制服务
    表名tbl在此输入您的表名
    转换字段名称false设置为“false”,以便插入的字段名称必须与列名匹配
    最大批处理大小1000每次插入的最大行数。此值不得低于 MergeRecord 处理器中“最小记录数”的值
  5. 要确认每次插入包含多行,请检查表中的行计数是否至少按 MergeRecord 中定义的“最小记录数”递增。

    查询结果,显示目标表中的行计数
  6. 恭喜 - 您已经成功通过 Apache NiFi 将数据加载到 ClickHouse 中!