メインコンテンツへスキップ
メインコンテンツへスキップ

Apache NiFiをClickHouseに接続する

Community Maintained

Apache NiFi

は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフロー管理ソフトウェアです。ETLデータパイプラインの作成が可能で、300以上のデータプロセッサが同梱されています。このステップバイステップのチュートリアルでは、Apache NiFiをソースと宛先の両方としてClickHouseに接続し、サンプルデータセットをロードする方法を説明します。

接続情報を収集する

To connect to ClickHouse with HTTP(S) you need this information:

Parameter(s)Description
HOST and PORTTypically, the port is 8443 when using TLS or 8123 when not using TLS.
DATABASE NAMEOut of the box, there is a database named default, use the name of the database that you want to connect to.
USERNAME and PASSWORDOut of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS. Connection details are displayed in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

Apache NiFi のダウンロードと実行

新規セットアップを行う場合は、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh start を実行して起動します。

ClickHouse JDBC ドライバーをダウンロードする

  1. GitHub の ClickHouse JDBC ドライバーのリリースページ にアクセスし、最新の JDBC リリースバージョンを確認します
  2. 対象のリリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」というキーワードを含む JAR ファイルを探します(例: clickhouse-jdbc-0.5.0-all.jar
  3. JAR ファイルを Apache NiFi からアクセス可能なフォルダに配置し、その絶対パスを控えておきます

DBCPConnectionPool コントローラサービスを追加し、プロパティを設定する

  1. Apache NiFi でコントローラサービスを設定するには、歯車アイコン("gear" ボタン)をクリックして NiFi Flow Configuration ページを開きます

    歯車ボタンが強調表示された NiFi Flow Configuration ページ
  2. Controller Services タブを選択し、右上の + ボタンをクリックして新しいコントローラサービスを追加します

    追加ボタンが強調表示された Controller Services タブ
  3. DBCPConnectionPool を検索し、「Add」ボタンをクリックします

    DBCPConnectionPool が強調表示された Controller Service 選択ダイアログ
  4. 追加したばかりの DBCPConnectionPool は、デフォルトでは無効 (Invalid) 状態になっています。歯車アイコン("gear" ボタン)をクリックして設定を開始します

    Invalid 状態の DBCPConnectionPool と gear ボタンが強調表示された Controller Services 一覧
  5. 「Properties」セクションで、次の値を入力します

PropertyValueRemark
Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続 URL 中の HOSTNAME を環境に合わせて置き換えます
Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC ドライバ JAR ファイルへの絶対パス
Database UserdefaultClickHouse ユーザー名
PasswordpasswordClickHouse パスワード
  1. Settings セクションで、コントローラサービスの名前を分かりやすくするために「ClickHouse JDBC」に変更します

    プロパティが入力された DBCPConnectionPool 設定ダイアログ
  2. 「lightning」ボタンをクリックし、続いて「Enable」ボタンをクリックして DBCPConnectionPool コントローラサービスを有効化します

    lightning ボタンが強調表示された Controller Services 一覧

    Controller Service を有効化する確認ダイアログ
  3. Controller Services タブを開き、コントローラサービスが有効化されていることを確認します

    有効化された ClickHouse JDBC サービスが表示されている Controller Services 一覧

ExecuteSQL プロセッサを使用してテーブルから読み取る

  1. 適切なアップストリームおよびダウンストリームのプロセッサと共に、ExecuteSQL プロセッサを追加します

    ExecuteSQL プロセッサを含むワークフローを表示している NiFi キャンバス
  2. ExecuteSQL プロセッサの「Properties」セクションで、次の値を入力します

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBCClickHouse 用に設定した Controller Service を選択します
    SQL select querySELECT * FROM system.metricsここにクエリを入力します
  3. ExecuteSQL プロセッサを開始します

    プロパティが入力された ExecuteSQL プロセッサの設定画面
  4. クエリが正常に処理されたことを確認するため、出力キュー内の FlowFile の 1 つを開いて確認します

    検査可能な FlowFile が表示されているキュー一覧ダイアログ
  5. 表示を「formatted」に切り替えて、出力された FlowFile の結果を表示します

    整形ビューでクエリ結果を表示している FlowFile コンテンツビューア

MergeRecordPutDatabaseRecordプロセッサを使用してテーブルに書き込む

  1. 単一のINSERT文で複数行を書き込むには、まず複数のレコードを1つのレコードにマージする必要があります。これはMergeRecordプロセッサを使用して実行できます

  2. MergeRecordプロセッサの「Properties」セクションで、以下の値を入力してください

    PropertyValueRemark
    Record ReaderJSONTreeReader適切なレコードリーダーを選択してください
    Record WriterJSONReadSetWriter適切なレコードライターを選択してください
    Minimum Number of Records1000最小行数をマージして1つのレコードを形成するように、この値をより大きな数値に変更してください。デフォルトは1行です
    Maximum Number of Records10000「Minimum Number of Records」よりも大きな数値に変更してください。デフォルトは1,000行です
  3. 複数のレコードが1つにマージされていることを確認するには、MergeRecordプロセッサの入力と出力を確認してください。出力は複数の入力レコードの配列であることに注意してください

    入力

    単一レコードを示すMergeRecordプロセッサの入力

    出力

    マージされたレコード配列を示すMergeRecordプロセッサの出力
  4. PutDatabaseRecordプロセッサの「Properties」セクションで、以下の値を入力してください

    PropertyValueRemark
    Record ReaderJSONTreeReader適切なレコードリーダーを選択してください
    Database TypeGenericデフォルトのままにしてください
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCClickHouseコントローラーサービスを選択してください
    Table Nametblここにテーブル名を入力してください
    Translate Field Namesfalse挿入されるフィールド名がカラム名と一致する必要があるように「false」に設定してください
    Maximum Batch Size1000INSERT文あたりの最大行数。この値はMergeRecordプロセッサの「Minimum Number of Records」の値より小さくしないでください
  5. 各INSERT文に複数行が含まれていることを確認するには、テーブルの行数がMergeRecordで定義された「Minimum Number of Records」の値以上ずつ増加していることを確認してください。

    宛先テーブルの行数を示すクエリ結果
  6. おめでとうございます - Apache NiFiを使用してClickHouseへのデータ読み込みに成功しました!