Connect Apache NiFi to ClickHouse
Apache NiFi は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフローマネジメントソフトウェアです。ETLデータパイプラインの作成を可能にし、300以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソースおよび宛先として接続し、サンプルデータセットをロードする方法を示します。
1. 接続情報を収集する
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default
, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default
. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl
command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:
-
HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用します。 -
USERNAMEとPASSWORD: デフォルトでは、ユーザー名は
default
です。使用ケースに適したユーザー名を使用します。
ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

HTTPSを選択すると、詳細はexample curl
コマンドで確認できます。

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。
2. Apache NiFiをダウンロードして実行する
- 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、
./bin/nifi.sh start
を実行して開始します。
3. ClickHouse JDBCドライバをダウンロードする
- GitHubのClickHouse JDBCドライバリリースページにアクセスし、最新のJDBCリリースバージョンを探します。
- リリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」というキーワードを含むJARファイルを探します。例えば、
clickhouse-jdbc-0.5.0-all.jar
です。 - JARファイルをApache NiFiがアクセスできるフォルダに置き、絶対パスをメモします。
4. DBCPConnectionPool
コントローラーサービスを追加し、そのプロパティを設定する
-
Apache NiFiでコントローラーサービスを設定するには、「ギア」ボタンをクリックしてNiFiフロー設定ページにアクセスします。
-
コントローラーサービスタブを選択し、右上の
+
ボタンをクリックして新しいコントローラーサービスを追加します。 -
DBCPConnectionPool
を検索し、「Add」ボタンをクリックします。 -
新しく追加した
DBCPConnectionPool
はデフォルトで無効の状態です。設定を開始するには「ギア」ボタンをクリックします。 -
「プロパティ」セクションに次の値を入力します。
Property | Value | Remark |
---|---|---|
Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 接続URLのHOSTNAMEを適宜置き換えます。 |
Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBCドライバJARファイルへの絶対パス |
Database User | default | ClickHouseのユーザー名 |
Password | password | ClickHouseのパスワード |
-
設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更し、簡単に参照できるようにします。
-
「稲妻」ボタンをクリックして
DBCPConnectionPool
コントローラーサービスを有効化し、その後「Enable」ボタンをクリックします。
-
コントローラーサービスタブをチェックし、コントローラーサービスが有効になっていることを確認します。
5. ExecuteSQL
プロセッサを使用してテーブルから読み込む
-
ExecuteSQL
プロセッサと適切な上流および下流プロセッサを追加します。 -
ExecuteSQL
プロセッサの「プロパティ」セクションに次の値を入力します。Property Value Remark Database Connection Pooling Service ClickHouse JDBC ClickHouse用に設定したコントローラーサービスを選択します。 SQL select query SELECT * FROM system.metrics ここにクエリを入力します。 -
ExecuteSQL
プロセッサを開始します。 -
「FlowFile」の出力キューの1つを調べて、クエリが正常に処理されたことを確認します。
-
結果を表示するには、「formatted」にビューを切り替えます。
6. MergeRecord
およびPutDatabaseRecord
プロセッサを使用してテーブルに書き込む
-
単一の挿入で複数の行を書き込むために、最初に複数のレコードを単一のレコードにマージする必要があります。これには
MergeRecord
プロセッサを使用します。 -
MergeRecord
プロセッサの「プロパティ」セクションに次の値を入力します。Property Value Remark Record Reader JSONTreeReader
適切なレコードリーダーを選択します。 Record Writer JSONReadSetWriter
適切なレコードライターを選択します。 Minimum Number of Records 1000 これを高い数値に変更し、単一のレコードを形成するためにマージされる最小行数を確保します。デフォルトは1行です。 Maximum Number of Records 10000 「Minimum Number of Records」よりも高い数値に変更します。デフォルトは1,000行です。 -
複数のレコードが1つにマージされたことを確認するために、
MergeRecord
プロセッサの入力と出力を確認します。出力は複数の入力レコードの配列であることに注意してください。入力
出力
-
PutDatabaseRecord
プロセッサの「プロパティ」セクションに次の値を入力します。Property Value Remark Record Reader JSONTreeReader
適切なレコードリーダーを選択します。 Database Type Generic デフォルトのままにします。 Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC ClickHouseコントローラーサービスを選択します。 Table Name tbl ここにテーブル名を入力します。 Translate Field Names false フィールド名がカラム名と一致するようにするために「false」に設定します。 Maximum Batch Size 1000 挿入ごとの最大行数。この値は、 MergeRecord
プロセッサの「Minimum Number of Records」の値より低くしないでください。 -
各挿入が複数の行を含んでいることを確認するために、テーブル内の行数が「Minimum Number of Records」で定義された値以上に増加していることを確認します。
-
おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!