メインコンテンツまでスキップ
メインコンテンツまでスキップ

Apache NiFi を ClickHouse に接続する

Apache NiFi は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフローマネジメントソフトウェアです。 ETL データパイプラインの作成を可能にし、300 以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFi を ClickHouse にソースおよびデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。

1. 接続情報を収集する

ClickHouseにHTTP(S)で接続するには、次の情報が必要です:

  • HOST と PORT: 通常、ポートはTLSを使用する場合は8443、TLSを使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用してください。

  • USERNAME と PASSWORD: デフォルトでは、ユーザー名はdefaultです。あなたのユースケースに適したユーザー名を使用してください。

あなたのClickHouse Cloudサービスの詳細はClickHouse Cloudコンソールで確認できます。接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細は例のcurlコマンドに表示されます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合、接続の詳細はあなたのClickHouse管理者によって設定されています。

2. Apache NiFi をダウンロードし、実行する

  1. 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh start を実行して開始します。

3. ClickHouse JDBC ドライバーをダウンロードする

  1. ClickHouse JDBC ドライバーリリースページ にアクセスし、最新の JDBC リリースバージョンを探します。
  2. リリースバージョン内で、「すべての xx アセットを表示」をクリックし、「shaded」または「all」を含む JAR ファイルを探します。例えば、clickhouse-jdbc-0.5.0-all.jar などです。
  3. JAR ファイルを Apache NiFi がアクセスできるフォルダーに配置し、絶対パスをメモしておきます。

4. DBCPConnectionPool コントローラーサービスを追加し、そのプロパティを設定する

  1. Apache NiFi でコントローラーサービスを設定するには、「ギア」ボタンをクリックして NiFi フローメンテナンスページに移動します。

    NiFi フロー設定
  2. 「コントローラーサービス」タブを選択し、右上の + ボタンをクリックして新しいコントローラーサービスを追加します。

    コントローラーサービスの追加
  3. DBCPConnectionPool を検索し、「追加」ボタンをクリックします。

    ​`DBCPConnectionPool` を検索
  4. 新しく追加された DBCPConnectionPool はデフォルトで無効の状態になります。「ギア」ボタンをクリックして設定を開始します。

    NiFi フロー設定
  5. 「プロパティ」セクションで、以下の値を入力します。

プロパティ備考
データベース接続 URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続 URL の HOSTNAME を適宜置き換えます
データベースドライバー クラス名com.clickhouse.jdbc.ClickHouseDriver
データベースドライバーの場所/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC ドライバー JAR ファイルへの絶対パス
データベースユーザーdefaultClickHouse ユーザー名
パスワードpasswordClickHouse パスワード
  1. 設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更して簡単に参照できるようにします。

    NiFi フロー設定
  2. 「稲妻」ボタンをクリックしてから「有効」ボタンをクリックし、DBCPConnectionPool コントローラーサービスをアクティブにします。

    NiFi フロー設定
    NiFi フロー設定
  3. 「コントローラーサービス」タブで、コントローラーサービスが有効になっていることを確認します。

    NiFi フロー設定

5. ExecuteSQL プロセッサを使用してテーブルから読み取る

  1. ​ExecuteSQL プロセッサを追加し、適切な上流および下流プロセッサを整えます。

    ​`ExecuteSQL` プロセッサ
  2. ​ExecuteSQL プロセッサの「プロパティ」セクションに、以下の値を入力します。

    プロパティ備考
    データベース接続プーリングサービスClickHouse JDBCClickHouse 用に設定されたコントローラーサービスを選択
    SQL セレクトクエリSELECT * FROM system.metricsここにクエリを入力します
  3. ​​ExecuteSQL プロセッサを起動します。

    `​​ExecuteSQL` プロ��セッサ
  4. クエリが正常に処理されたことを確認するには、出力キューの FlowFile の 1 つを検査します。

    ​`​ExecuteSQL` プロセッサ
  5. 「フォーマット」でビューを切り替えて、出力 FlowFile の結果を表示します。

    `​​ExecuteSQL` プロセッサ

6. MergeRecordPutDatabaseRecord プロセッサを使用してテーブルに書き込む

  1. 単一の挿入で複数の行を書くためには、最初に複数のレコードを 1 つのレコードに統合する必要があります。これは、MergeRecord プロセッサを使用して行うことができます。

  2. MergeRecord プロセッサの「プロパティ」セクションに、以下の値を入力します。

    プロパティ備考
    レコードリーダーJSONTreeReader適切なレコードリーダーを選択
    レコードライターJSONReadSetWriter適切なレコードライターを選択
    最小レコード数1000これを高い数字に変更して、最小行数が統合されて 1 つのレコードを形成するようにします。デフォルトは 1 行です。
    最大レコード数10000「最小レコード数」よりも大きい数字に変更します。デフォルトは 1,000 行です。
  3. 複数のレコードが 1 つに統合されたことを確認するために、MergeRecord プロセッサの入力と出力を検査します。出力は複数の入力レコードの配列であることに注意してください。

    入力

    ​`​ExecuteSQL` プロセッサ

    出力

    ​`​ExecuteSQL` プロセッサ
  4. PutDatabaseRecord プロセッサの「プロパティ」セクションに、以下の値を入力します。

    プロパティ備考
    レコードリーダーJSONTreeReader適切なレコードリーダーを選択
    データベースタイプGenericデフォルトのままにします
    ステートメントタイプINSERT
    データベース接続プーリングサービスClickHouse JDBCClickHouse コントローラーサービスを選択
    テーブル名tblここにテーブル名を入力します
    フィールド名の翻訳falseフィールド名がカラム名と一致している必要があるため、「false」に設定します
    最大バッチサイズ10001 回の挿入あたりの最大行数。この値は MergeRecord プロセッサの「最小レコード数」よりも少なくしてはいけません
  5. 各挿入に複数の行が含まれていることを確認するためには、テーブル内の行数が MergeRecord で定義した「最小レコード数」以上で増加していることを確認します。

    `​​ExecuteSQL` プロセッサ
  6. おめでとうございます! Apache NiFi を使用して ClickHouse にデータを正常にロードしました!