Подключение Apache NiFi к ClickHouse
Apache NiFi — это программное обеспечение для управления рабочими процессами с открытым исходным кодом, предназначенное для автоматизации потоков данных между программными системами. Оно позволяет создавать ETL конвейеры данных и поставляется с более чем 300 процессорами данных. Этот поэтапный учебник показывает, как подключить Apache NiFi к ClickHouse как источник и приемник, а также загрузить образец набора данных.
1. Соберите ваши данные для подключения
Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:
-
ХОСТ и ПОРТ: обычно порт 8443 при использовании TLS или 8123 при его отсутствии.
-
НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию есть база данных с именем
default
, используйте имя базы данных, к которой вы хотите подключиться. -
ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя
default
. Используйте имя пользователя, подходящее для вашего случая использования.
Данные для вашей службы ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите службу, к которой вы будете подключаться, и нажмите Подключиться:

Выберите HTTPS, и детали будут доступны в примере команды curl
.

Если вы используете self-managed ClickHouse, детали подключения задаются вашим администратором ClickHouse.
2. Скачайте и запустите Apache NiFi
- Для новой настройки, скачайте бинарный файл с https://nifi.apache.org/download.html и начните, запустив
./bin/nifi.sh start
3. Скачайте драйвер JDBC для ClickHouse
- Посетите страницу релиза драйвера JDBC для ClickHouse на GitHub и найдите последнюю версию релиза JDBC.
- В версии релиза нажмите "Показать все xx объектов" и найдите JAR файл, содержащий ключевое слово "shaded" или "all", например,
clickhouse-jdbc-0.5.0-all.jar
. - Поместите JAR файл в папку, доступную для Apache NiFi, и запомните абсолютный путь.
4. Добавьте сервис контроллера DBCPConnectionPool
и настройте его свойства
-
Чтобы настроить сервис контроллера в Apache NiFi, посетите страницу конфигурации потока NiFi, нажав на кнопку "шестеренка".
-
Выберите вкладку Сервисы контроллеров и добавьте новый сервис контроллера, нажав на кнопку
+
в верхнем правом углу. -
Найдите
DBCPConnectionPool
и нажмите кнопку "Добавить". -
Новый
DBCPConnectionPool
будет иметь статус "Неверно" по умолчанию. Нажмите на кнопку "шестеренка", чтобы начать настройку. -
В разделе "Свойства" введите следующие значения.
Свойство | Значение | Примечание |
---|---|---|
URL соединения с базой данных | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | Замените HOSTNAME в URL соединения соответственно |
Имя класса драйвера базы данных | com.clickhouse.jdbc.ClickHouseDriver | |
Расположение драйвера базы данных | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | Абсолютный путь к JAR файлу драйвера JDBC для ClickHouse |
Пользователь базы данных | default | Имя пользователя ClickHouse |
Пароль | password | Пароль ClickHouse |
-
В разделе "Настройки" измените имя сервиса контроллера на "ClickHouse JDBC" для удобства ссылок.
-
Активируйте сервис контроллера
DBCPConnectionPool
, нажав кнопку "молния", а затем кнопку "Включить".
-
Проверьте вкладку Сервисы контроллеров и убедитесь, что сервис контроллера включен.
5. Чтение из таблицы с помощью процессора ExecuteSQL
-
Добавьте процессор
ExecuteSQL
, вместе с соответствующими предшествующими и последующими процессорами. -
В разделе "Свойства" процессора
ExecuteSQL
введите следующие значения.Свойство Значение Примечание Сервис пула соединений с базой данных ClickHouse JDBC Выберите сервис контроллера, настроенный для ClickHouse SQL запрос выборки SELECT * FROM system.metrics Введите здесь ваш запрос -
Запустите процессор
ExecuteSQL
. -
Чтобы подтвердить, что запрос обработан успешно, проверьте один из
FlowFile
в выходной очереди. -
Переключите просмотр на "форматированный", чтобы увидеть результат выходного
FlowFile
.
6. Запись в таблицу с помощью процессоров MergeRecord
и PutDatabaseRecord
-
Чтобы записать несколько строк в одной вставке, сначала необходимо объединить несколько записей в одну. Это можно сделать с помощью процессора
MergeRecord
. -
В разделе "Свойства" процессора
MergeRecord
введите следующие значения.Свойство Значение Примечание Читатель записей JSONTreeReader
Выберите подходящий читатель записей Писатель записей JSONReadSetWriter
Выберите подходящий писатель записей Минимальное количество записей 1000 Измените это значение на большее, чтобы минимальное количество строк объединялось в одну запись. По умолчанию 1 строка Максимальное количество записей 10000 Измените это значение на большее, чем "Минимальное количество записей". По умолчанию 1,000 строк -
Чтобы подтвердить, что несколько записей объединены в одну, проверьте ввод и вывод процессора
MergeRecord
. Обратите внимание, что вывод — это массив из нескольких входных записей.Вход
Выход
-
В разделе "Свойства" процессора
PutDatabaseRecord
введите следующие значения.Свойство Значение Примечание Читатель записей JSONTreeReader
Выберите подходящий читатель записей Тип базы данных Generic Оставьте по умолчанию Тип оператора INSERT Сервис пула соединений с базой данных ClickHouse JDBC Выберите сервис контроллера ClickHouse Имя таблицы tbl Введите сюда имя вашей таблицы Преобразовать имена полей false Установите в "false", чтобы имена полей, которые вставляются, соответствовали именам колонок Максимальный размер пакета 1000 Максимальное количество строк на вставку. Это значение не должно быть ниже значения "Минимальное количество записей" в процессоре MergeRecord
. -
Чтобы подтвердить, что каждая вставка содержит несколько строк, проверьте, что количество строк в таблице увеличивается как минимум на значение "Минимальное количество записей", определенное в
MergeRecord
. -
Поздравляем - вы успешно загрузили свои данные в ClickHouse с помощью Apache NiFi!