Apache NiFiをClickHouseに接続する
Apache NiFiは、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフロー管理ソフトウェアです。ETLデータパイプラインの作成が可能で、300以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソース及びデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。
1. 接続情報を収集する
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default
, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default
. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl
command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
ClickHouseにHTTP(S)で接続するには、次の情報が必要です:
-
HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用してください。 -
USERNAMEとPASSWORD: デフォルトでは、ユーザー名は
default
です。ご利用のケースに適したユーザー名を使用してください。
ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurl
コマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。
2. Apache NiFiをダウンロードして実行する
- 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、
./bin/nifi.sh start
を実行して開始します。
3. ClickHouse JDBCドライバをダウンロードする
- GitHubのClickHouse JDBCドライバリリースページにアクセスし、最新のJDBCリリースバージョンを探します。
- リリースバージョン内で「すべてのxxアセットを表示」をクリックし、「shaded」または「all」というキーワードを含むJARファイルを探します。例えば、
clickhouse-jdbc-0.5.0-all.jar
のようなファイルです。 - JARファイルをApache NiFiがアクセスできるフォルダに置き、絶対パスをメモします。
4. DBCPConnectionPool
コントローラサービスを追加し、そのプロパティを設定する
-
Apache NiFiでコントローラサービスを設定するには、「ギア」ボタンをクリックしてNiFiフロー設定ページに移動します。
-
コントローラスサービスタブを選択し、右上の
+
ボタンをクリックして新しいコントローラサービスを追加します。 -
DBCPConnectionPool
を検索し、「追加」ボタンをクリックします。 -
新しく追加された
DBCPConnectionPool
はデフォルトで無効な状態です。「ギア」ボタンをクリックして設定を開始します。 -
「プロパティ」セクションで、以下の値を入力します。
プロパティ | 値 | 備考 |
---|---|---|
データベース接続URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 接続URL内のHOSTNAMEを適宜置き換えてください |
データベースドライバクラス名 | com.clickhouse.jdbc.ClickHouseDriver | |
データベースドライバの場所 | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBCドライバJARファイルの絶対パス |
データベースユーザー | default | ClickHouseのユーザー名 |
パスワード | password | ClickHouseのパスワード |
-
設定セクションで、コントローラサービスの名前を「ClickHouse JDBC」に変更して簡単に参照できるようにします。
-
「雷」ボタンをクリックし、「有効にする」ボタンをクリックして
DBCPConnectionPool
コントローラサービスを有効にします。
-
コントローラスサービスタブを確認し、コントローラサービスが有効になっていることを確認します。
5. ExecuteSQL
プロセッサを使用してテーブルから読み込む
-
適切なアップストリームおよびダウンストリームプロセッサと共に
ExecuteSQL
プロセッサを追加します。 -
ExecuteSQL
プロセッサの「プロパティ」セクションで、以下の値を入力します。プロパティ 値 備考 データベース接続プーリングサービス ClickHouse JDBC ClickHouseのために設定されたコントローラサービスを選択します SQL選択クエリ SELECT * FROM system.metrics ここにクエリを入力します -
ExecuteSQL
プロセッサを起動します。 -
クエリが正常に処理されたことを確認するために、出力キュー内の
FlowFile
の1つを検査します。 -
結果を見るためにビューを「フォーマット」に切り替えます。
6. MergeRecord
とPutDatabaseRecord
プロセッサを使用してテーブルに書き込む
-
複数の行を単一の挿入で書き込むために、まず複数のレコードを単一のレコードにマージする必要があります。これは
MergeRecord
プロセッサを使用して行えます。 -
MergeRecord
プロセッサの「プロパティ」セクションで、以下の値を入力します。プロパティ 値 備考 レコードリーダー JSONTreeReader
適切なレコードリーダーを選択します レコードライター JSONReadSetWriter
適切なレコードライターを選択します 最小レコード数 1000 この値を高く変更して、単一のレコードを形成するための最小行数をマージします。デフォルトは1行です 最大レコード数 10000 「最小レコード数」よりも高い数字に変更します。デフォルトは1,000行です -
複数のレコードがひとつにマージされていることを確認するために、
MergeRecord
プロセッサの入力と出力を検査します。出力が複数の入力レコードの配列であることに注目してください。入力
出力
-
PutDatabaseRecord
プロセッサの「プロパティ」セクションで、以下の値を入力します。プロパティ 値 備考 レコードリーダー JSONTreeReader
適切なレコードリーダーを選択します データベースタイプ Generic デフォルトのままにします ステートメントタイプ INSERT データベース接続プーリングサービス ClickHouse JDBC ClickHouseコントローラサービスを選択します テーブル名 tbl ここにテーブル名を入力します フィールド名の変換 false フィールド名を挿入する際にカラム名と一致するように「false」に設定します 最大バッチサイズ 1000 挿入ごとの最大行数。この値は MergeRecord
プロセッサの「最小レコード数」よりも低く設定しないでください。 -
各挿入が複数の行を含んでいることを確認するために、テーブルの行数が
MergeRecord
で定義された「最小レコード数」に関連して少なくとも増加していることを確認します。 -
おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!