将 Apache NiFi 连接到 ClickHouse
Apache NiFi 是一种开源的工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道,并配备了超过 300 种数据处理器。本逐步教程展示了如何将 Apache NiFi 连接到 ClickHouse 作为数据源和目标,并加载一个示例数据集。
1. 收集连接详情
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
- 
主机和端口:通常,当使用 TLS 时端口为 8443,当不使用 TLS 时端口为 8123。 
- 
数据库名称:开箱即用时,有一个名为 default的数据库,请使用您要连接的数据库名称。
- 
用户名和密码:开箱即用时,用户名为 default。请使用适合您用例的用户名。
您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中获得。 选择您要连接的服务并点击 连接:

选择 HTTPS,详细信息可在示例的 curl 命令中获得。

如果您使用的是自管理的 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。
2. 下载并运行 Apache NiFi
- 对于新的设置,从 https://nifi.apache.org/download.html 下载二进制文件,然后通过运行 ./bin/nifi.sh start来启动
3. 下载 ClickHouse JDBC 驱动程序
- 访问 ClickHouse JDBC 驱动程序发布页面,查找最新的 JDBC 发布版本
- 在发布版本中,点击“显示所有 xx 资产”,查找包含关键词“shaded”或“all”的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
- 将 JAR 文件放在 Apache NiFi 可访问的文件夹中,并记录其绝对路径
4. 添加 DBCPConnectionPool 控制服务并配置其属性
- 
要在 Apache NiFi 中配置控制服务,请点击“齿轮”按钮访问 NiFi 流配置页面  
- 
选择控制服务选项卡,通过点击右上角的 +按钮添加新控制服务 
- 
搜索 DBCPConnectionPool并点击“添加”按钮 
- 
新添加的 DBCPConnectionPool默认处于无效状态。点击“齿轮”按钮开始配置 
- 
在“属性”部分,输入以下值 
| 属性 | 值 | 备注 | 
|---|---|---|
| 数据库连接 URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 将连接 URL 中的 HOSTNAME 替换为相应的主机名 | 
| 数据库驱动类名 | com.clickhouse.jdbc.ClickHouseDriver | |
| 数据库驱动位置 | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC 驱动程序 JAR 文件的绝对路径 | 
| 数据库用户 | default | ClickHouse 用户名 | 
| 密码 | password | ClickHouse 密码 | 
- 
在设置部分,将控制服务的名称更改为“ClickHouse JDBC”,以便于引用  
- 
通过点击“闪电”按钮然后点击“启用”按钮来激活 DBCPConnectionPool控制服务 
  
- 
检查控制服务选项卡,确保控制服务已启用  
5. 使用 ExecuteSQL 处理器从表中读取
- 
添加一个  ExecuteSQL处理器,并添加适当的上游和下游处理器 
- 
在  ExecuteSQL处理器的“属性”部分,输入以下值属性 值 备注 数据库连接池服务 ClickHouse JDBC 选择配置好的 ClickHouse 控制服务 SQL 选择查询 SELECT * FROM system.metrics 在此输入您的查询 
- 
启动 ExecuteSQL处理器 
- 
为确认查询已成功处理,检查输出队列中的一个 FlowFile 
- 
切换视图为“格式化”以查看输出 FlowFile的结果 
6. 使用 MergeRecord 和 PutDatabaseRecord 处理器向表中写入
- 
要在单次插入中写入多行,我们首先需要将多条记录合并为一条记录。这可以通过使用 MergeRecord处理器来完成。
- 
在 MergeRecord处理器的“属性”部分,输入以下值属性 值 备注 记录读取器 JSONTreeReader选择合适的记录读取器 记录写入器 JSONReadSetWriter选择合适的记录写入器 最小记录数 1000 更改为更高的数字,以便合并形成单条记录的最小行数。默认为 1 行 最大记录数 10000 更改为高于“最小记录数”的值。默认为 1,000 行 
- 
要确认多条记录已合并为一条,检查 MergeRecord处理器的输入和输出。请注意,输出是一个包含多个输入记录的数组输入  输出  
- 
在 PutDatabaseRecord处理器的“属性”部分,输入以下值属性 值 备注 记录读取器 JSONTreeReader选择合适的记录读取器 数据库类型 Generic 保持默认 语句类型 INSERT 数据库连接池服务 ClickHouse JDBC 选择 ClickHouse 控制服务 表名 tbl 在此输入您的表名 转换字段名称 false 设置为“false”,以便插入的字段名称必须与列名匹配 最大批处理大小 1000 每次插入的最大行数。此值不得低于 MergeRecord处理器中“最小记录数”的值
- 
要确认每次插入包含多行,请检查表中的行计数是否至少按 MergeRecord中定义的“最小记录数”递增。 
- 
恭喜 - 您已经成功通过 Apache NiFi 将数据加载到 ClickHouse 中! 
