メインコンテンツまでスキップ
メインコンテンツまでスキップ

Apache NiFiをClickHouseに接続する

Community Maintained

Apache NiFiは、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフロー管理ソフトウェアです。ETLデータパイプラインの作成が可能で、300以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソース及びデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。

1. 接続情報を収集する

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOSTとPORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。ご利用のケースに適したユーザー名を使用してください。

ClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックします:

HTTPSを選択すると、詳細が例のcurlコマンドで提供されます。

セルフマネージドのClickHouseを使用している場合、接続の詳細はClickHouseの管理者によって設定されます。

2. Apache NiFiをダウンロードして実行する

  1. 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh startを実行して開始します。

3. ClickHouse JDBCドライバをダウンロードする

  1. GitHubのClickHouse JDBCドライバリリースページにアクセスし、最新のJDBCリリースバージョンを探します。
  2. リリースバージョン内で「すべてのxxアセットを表示」をクリックし、「shaded」または「all」というキーワードを含むJARファイルを探します。例えば、clickhouse-jdbc-0.5.0-all.jarのようなファイルです。
  3. JARファイルをApache NiFiがアクセスできるフォルダに置き、絶対パスをメモします。

4. DBCPConnectionPoolコントローラサービスを追加し、そのプロパティを設定する

  1. Apache NiFiでコントローラサービスを設定するには、「ギア」ボタンをクリックしてNiFiフロー設定ページに移動します。

  2. コントローラスサービスタブを選択し、右上の+ボタンをクリックして新しいコントローラサービスを追加します。

  3. DBCPConnectionPoolを検索し、「追加」ボタンをクリックします。

  4. 新しく追加されたDBCPConnectionPoolはデフォルトで無効な状態です。「ギア」ボタンをクリックして設定を開始します。

  5. 「プロパティ」セクションで、以下の値を入力します。

プロパティ備考
データベース接続URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続URL内のHOSTNAMEを適宜置き換えてください
データベースドライバクラス名com.clickhouse.jdbc.ClickHouseDriver
データベースドライバの場所/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBCドライバJARファイルの絶対パス
データベースユーザーdefaultClickHouseのユーザー名
パスワードpasswordClickHouseのパスワード
  1. 設定セクションで、コントローラサービスの名前を「ClickHouse JDBC」に変更して簡単に参照できるようにします。

  2. 「雷」ボタンをクリックし、「有効にする」ボタンをクリックしてDBCPConnectionPoolコントローラサービスを有効にします。


  3. コントローラスサービスタブを確認し、コントローラサービスが有効になっていることを確認します。

5. ExecuteSQLプロセッサを使用してテーブルから読み込む

  1. 適切なアップストリームおよびダウンストリームプロセッサと共にExecuteSQLプロセッサを追加します。

  2. ExecuteSQLプロセッサの「プロパティ」セクションで、以下の値を入力します。

    プロパティ備考
    データベース接続プーリングサービスClickHouse JDBCClickHouseのために設定されたコントローラサービスを選択します
    SQL選択クエリSELECT * FROM system.metricsここにクエリを入力します
  3. ExecuteSQLプロセッサを起動します。

  4. クエリが正常に処理されたことを確認するために、出力キュー内のFlowFileの1つを検査します。

  5. 結果を見るためにビューを「フォーマット」に切り替えます。

6. MergeRecordPutDatabaseRecordプロセッサを使用してテーブルに書き込む

  1. 複数の行を単一の挿入で書き込むために、まず複数のレコードを単一のレコードにマージする必要があります。これはMergeRecordプロセッサを使用して行えます。

  2. MergeRecordプロセッサの「プロパティ」セクションで、以下の値を入力します。

    プロパティ備考
    レコードリーダーJSONTreeReader適切なレコードリーダーを選択します
    レコードライターJSONReadSetWriter適切なレコードライターを選択します
    最小レコード数1000この値を高く変更して、単一のレコードを形成するための最小行数をマージします。デフォルトは1行です
    最大レコード数10000「最小レコード数」よりも高い数字に変更します。デフォルトは1,000行です
  3. 複数のレコードがひとつにマージされていることを確認するために、MergeRecordプロセッサの入力と出力を検査します。出力が複数の入力レコードの配列であることに注目してください。

    入力

    出力

  4. PutDatabaseRecordプロセッサの「プロパティ」セクションで、以下の値を入力します。

    プロパティ備考
    レコードリーダーJSONTreeReader適切なレコードリーダーを選択します
    データベースタイプGenericデフォルトのままにします
    ステートメントタイプINSERT
    データベース接続プーリングサービスClickHouse JDBCClickHouseコントローラサービスを選択します
    テーブル名tblここにテーブル名を入力します
    フィールド名の変換falseフィールド名を挿入する際にカラム名と一致するように「false」に設定します
    最大バッチサイズ1000挿入ごとの最大行数。この値はMergeRecordプロセッサの「最小レコード数」よりも低く設定しないでください。
  5. 各挿入が複数の行を含んでいることを確認するために、テーブルの行数がMergeRecordで定義された「最小レコード数」に関連して少なくとも増加していることを確認します。

  6. おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!