连接 Apache NiFi 到 ClickHouse
Apache NiFi 是一个开源工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道并带有超过 300 种数据处理器。本分步教程展示了如何将 Apache NiFi 连接到 ClickHouse,作为源和目的地,并加载示例数据集。
1. 收集连接详情
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
-
HOST 和 PORT:通常,当使用 TLS 时,端口为 8443;当不使用 TLS 时,端口为 8123。
-
数据库名称:开箱即用时,有一个名为
default
的数据库,请使用您要连接的数据库名称。 -
用户名和密码:开箱即用时,用户名为
default
。请使用适合您用例的用户名。
您的 ClickHouse Cloud 服务详细信息可在 ClickHouse Cloud 控制台中找到。 选择您要连接的服务并点击 Connect:

选择 HTTPS,详细信息会在示例 curl
命令中提供。

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。
2. 下载并运行 Apache NiFi
- 对于新设置,从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行
./bin/nifi.sh start
开始。
3. 下载 ClickHouse JDBC 驱动程序
- 访问 ClickHouse JDBC 驱动程序发布页面,并查找最新的 JDBC 版本。
- 在发布版本中,点击“显示所有 xx 个资产”,并寻找包含关键字“shaded”或“all”的 JAR 文件,例如
clickhouse-jdbc-0.5.0-all.jar
。 - 将 JAR 文件放置在 Apache NiFi 可访问的文件夹中,并记下绝对路径。
4. 添加 DBCPConnectionPool
控制服务并配置其属性
-
要在 Apache NiFi 中配置控制服务,请点击“齿轮”按钮访问 NiFi 流配置页面。
-
选择控制服务选项卡,点击右上角的
+
按钮添加新的控制服务。 -
搜索
DBCPConnectionPool
并点击“添加”按钮。 -
新添加的
DBCPConnectionPool
默认处于无效状态。点击“齿轮”按钮开始配置。 -
在“属性”部分,输入以下值:
属性 | 值 | 备注 |
---|---|---|
数据库连接 URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 相应地替换连接 URL 中的 HOSTNAME |
数据库驱动类名 | com.clickhouse.jdbc.ClickHouseDriver | |
数据库驱动位置 | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC 驱动程序 JAR 文件的绝对路径 |
数据库用户 | default | ClickHouse 用户名 |
密码 | password | ClickHouse 密码 |
-
在设置部分,将控制服务的名称更改为“ClickHouse JDBC”,以便于引用。
-
点击“闪电”按钮并再点击“启用”按钮来激活
DBCPConnectionPool
控制服务。
-
检查控制服务选项卡,确保控制服务已启用。
5. 使用 ExecuteSQL
处理器从表中读取数据
-
添加一个
ExecuteSQL
处理器,以及适当的上游和下游处理器。 -
在
ExecuteSQL
处理器的“属性”部分,输入以下值:属性 值 备注 数据库连接池服务 ClickHouse JDBC 选择为 ClickHouse 配置的控制服务 SQL 查询 SELECT * FROM system.metrics 在此输入你的查询 -
启动
ExecuteSQL
处理器。 -
要确认查询已经成功处理,请检查输出队列中的一个
FlowFile
。 -
切换为“格式化”视图,以查看输出
FlowFile
的结果。
6. 使用 MergeRecord
和 PutDatabaseRecord
处理器向表中写入数据
-
为了在单个插入中写入多行,我们首先需要将多个记录合并为一个记录。这可以使用
MergeRecord
处理器完成。 -
在
MergeRecord
处理器的“属性”部分,输入以下值:属性 值 备注 记录读取器 JSONTreeReader
选择适当的记录读取器 记录写入器 JSONReadSetWriter
选择适当的记录写入器 最小记录数 1000 将其更改为更高的数字,以便最小行数合并为一条记录。默认值为 1 行 最大记录数 10000 将其更改为大于“最小记录数”的更高数字。默认值为 1,000 行 -
要确认多个记录已合并为一个记录,请检查
MergeRecord
处理器的输入和输出。请注意,输出是多个输入记录的数组。输入
输出
-
在
PutDatabaseRecord
处理器的“属性”部分,输入以下值:属性 值 备注 记录读取器 JSONTreeReader
选择适当的记录读取器 数据库类型 Generic 保持默认值 语句类型 INSERT 数据库连接池服务 ClickHouse JDBC 选择 ClickHouse 控制服务 表名 tbl 在此输入你的表名 翻译字段名 false 设置为“false”,以便插入的字段名必须与列名匹配 最大批大小 1000 每次插入的最大行数。该值应不低于 MergeRecord
处理器中“最小记录数”的值。 -
为了确认每个插入包含多行,请检查表中的行数是否至少增加了
MergeRecord
中定义的“最小记录数”。 -
恭喜你 - 你已经成功使用 Apache NiFi 将数据加载到 ClickHouse 中!