Apache NiFi を ClickHouse に接続する
Apache NiFi は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフローマネジメントソフトウェアです。 ETL データパイプラインの作成を可能にし、300 以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFi を ClickHouse にソースおよびデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。
1. 接続情報を収集する
ClickHouseにHTTP(S)で接続するには、次の情報が必要です:
-
HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。
-
DATABASE NAME: デフォルトでは、
default
という名前のデータベースがあります。接続したいデータベースの名前を使用してください。 -
USERNAME と PASSWORD: デフォルトでは、ユーザー名は
default
です。あなたのユースケースに適したユーザー名を使用してください。
あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

HTTPSを選択すると、詳細は例のcurl
コマンドに表示されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。
2. Apache NiFi をダウンロードし、実行する
- 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、
./bin/nifi.sh start
を実行して開始します。
3. ClickHouse JDBC ドライバーをダウンロードする
- ClickHouse JDBC ドライバーリリースページ にアクセスし、最新の JDBC リリースバージョンを探します。
- リリースバージョン内で、「すべての xx アセットを表示」をクリックし、「shaded」または「all」を含む JAR ファイルを探します。例えば、
clickhouse-jdbc-0.5.0-all.jar
などです。 - JAR ファイルを Apache NiFi がアクセスできるフォルダーに配置し、絶対パスをメモしておきます。
4. DBCPConnectionPool
コントローラーサービスを追加し、そのプロパティを設定する
-
Apache NiFi でコントローラーサービスを設定するには、「ギア」ボタンをクリックして NiFi フローメンテナンスページに移動します。
-
「コントローラーサービス」タブを選択し、右上の
+
ボタンをクリックして新しいコントローラーサービスを追加します。 -
DBCPConnectionPool
を検索し、「追加」ボタンをクリックします。 -
新しく追加された
DBCPConnectionPool
はデフォルトで無効の状態になります。「ギア」ボタンをクリックして設定を開始します。 -
「プロパティ」セクションで、以下の値を入力します。
プロパティ | 値 | 備考 |
---|---|---|
データベース接続 URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 接続 URL の HOSTNAME を適宜置き換えます |
データベースドライバー クラス名 | com.clickhouse.jdbc.ClickHouseDriver | |
データベースドライバーの場所 | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC ドライバー JAR ファイルへの絶対パス |
データベースユーザー | default | ClickHouse ユーザー名 |
パスワード | password | ClickHouse パスワード |
-
設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更して簡単に参照できるようにします。
-
「稲妻」ボタンをクリックしてから「有効」ボタンをクリックし、
DBCPConnectionPool
コントローラーサービスをアクティブにします。
-
「コントローラーサービス」タブで、コントローラーサービスが有効になっていることを確認します。
5. ExecuteSQL
プロセッサを使用してテーブルから読み取る
-
ExecuteSQL
プロセッサを追加し、適切な上流および下流プロセッサを整えます。 -
ExecuteSQL
プロセッサの「プロパティ」セクションに、以下の値を入力します。プロパティ 値 備考 データベース接続プーリングサービス ClickHouse JDBC ClickHouse 用に設定されたコントローラーサービスを選択 SQL セレクトクエリ SELECT * FROM system.metrics ここにクエリを入力します -
ExecuteSQL
プロセッサを起動します。 -
クエリが正常に処理されたことを確認するには、出力キューの
FlowFile
の 1 つを検査します。 -
「フォーマット」でビューを切り替えて、出力
FlowFile
の結果を表示します。
6. MergeRecord
と PutDatabaseRecord
プロセッサを使用してテーブルに書き込む
-
単一の挿入で複数の行を書くためには、最初に複数のレコードを 1 つのレコードに統合する必要があります。これは、
MergeRecord
プロセッサを使用して行うことができます。 -
MergeRecord
プロセッサの「プロパティ」セクションに、以下の値を入力します。プロパティ 値 備考 レコードリーダー JSONTreeReader
適切なレコードリーダーを選択 レコードライター JSONReadSetWriter
適切なレコードライターを選択 最小レコード数 1000 これを高い数字に変更して、最小行数が統合されて 1 つのレコードを形成するようにします。デフォルトは 1 行です。 最大レコード数 10000 「最小レコード数」よりも大きい数字に変更します。デフォルトは 1,000 行です。 -
複数のレコードが 1 つに統合されたことを確認するために、
MergeRecord
プロセッサの入力と出力を検査します。出力は複数の入力レコードの配列であることに注意してください。入力
出力
-
PutDatabaseRecord
プロセッサの「プロパティ」セクションに、以下の値を入力します。プロパティ 値 備考 レコードリーダー JSONTreeReader
適切なレコードリーダーを選択 データベースタイプ Generic デフォルトのままにします ステートメントタイプ INSERT データベース接続プーリングサービス ClickHouse JDBC ClickHouse コントローラーサービスを選択 テーブル名 tbl ここにテーブル名を入力します フィールド名の翻訳 false フィールド名がカラム名と一致している必要があるため、「false」に設定します 最大バッチサイズ 1000 1 回の挿入あたりの最大行数。この値は MergeRecord
プロセッサの「最小レコード数」よりも少なくしてはいけません -
各挿入に複数の行が含まれていることを確認するためには、テーブル内の行数が
MergeRecord
で定義した「最小レコード数」以上で増加していることを確認します。 -
おめでとうございます! Apache NiFi を使用して ClickHouse にデータを正常にロードしました!