メインコンテンツまでスキップ
メインコンテンツまでスキップ

Connect Apache NiFi to ClickHouse

Community Maintained

Apache NiFi は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフローマネジメントソフトウェアです。ETLデータパイプラインの作成を可能にし、300以上のデータプロセッサが付属しています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソースおよび宛先として接続し、サンプルデータセットをロードする方法を示します。

1. 接続情報を収集する

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS, and the details are available in an example curl command.

ClickHouse Cloud HTTPS connection details

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.


以下は、ClickHouseにHTTP(S)で接続するために必要な情報です:

  • HOSTとPORT: 通常、TLSを使用する場合はポートが8443、使用しない場合は8123です。

  • DATABASE NAME: デフォルトでは、defaultという名前のデータベースがあります。接続したいデータベースの名前を使用します。

  • USERNAMEとPASSWORD: デフォルトでは、ユーザー名はdefaultです。使用ケースに適したユーザー名を使用します。

ClickHouse Cloudサービスの詳細は、ClickHouse Cloudコンソールで確認できます。 接続するサービスを選択し、Connectをクリックしてください:

ClickHouse Cloud service connect button

HTTPSを選択すると、詳細はexample curlコマンドで確認できます。

ClickHouse Cloud HTTPS connection details

セルフマネージドのClickHouseを使用している場合は、接続の詳細がClickHouse管理者によって設定されます。

2. Apache NiFiをダウンロードして実行する

  1. 新しいセットアップの場合、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh startを実行して開始します。

3. ClickHouse JDBCドライバをダウンロードする

  1. GitHubのClickHouse JDBCドライバリリースページにアクセスし、最新のJDBCリリースバージョンを探します。
  2. リリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」というキーワードを含むJARファイルを探します。例えば、clickhouse-jdbc-0.5.0-all.jarです。
  3. JARファイルをApache NiFiがアクセスできるフォルダに置き、絶対パスをメモします。

4. DBCPConnectionPoolコントローラーサービスを追加し、そのプロパティを設定する

  1. Apache NiFiでコントローラーサービスを設定するには、「ギア」ボタンをクリックしてNiFiフロー設定ページにアクセスします。

    NiFi Flow Configuration page with gear button highlighted
  2. コントローラーサービスタブを選択し、右上の+ボタンをクリックして新しいコントローラーサービスを追加します。

    Controller Services tab with add button highlighted
  3. DBCPConnectionPoolを検索し、「Add」ボタンをクリックします。

    Controller Service selection dialog with DBCPConnectionPool highlighted
  4. 新しく追加したDBCPConnectionPoolはデフォルトで無効の状態です。設定を開始するには「ギア」ボタンをクリックします。

    Controller Services list showing invalid DBCPConnectionPool with gear button highlighted
  5. 「プロパティ」セクションに次の値を入力します。

PropertyValueRemark
Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続URLのHOSTNAMEを適宜置き換えます。
Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBCドライバJARファイルへの絶対パス
Database UserdefaultClickHouseのユーザー名
PasswordpasswordClickHouseのパスワード
  1. 設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更し、簡単に参照できるようにします。

    DBCPConnectionPool configuration dialog showing properties filled in
  2. 「稲妻」ボタンをクリックしてDBCPConnectionPoolコントローラーサービスを有効化し、その後「Enable」ボタンをクリックします。

    Controller Services list with lightning button highlighted

    Enable Controller Service confirmation dialog
  3. コントローラーサービスタブをチェックし、コントローラーサービスが有効になっていることを確認します。

    Controller Services list showing enabled ClickHouse JDBC service

5. ExecuteSQLプロセッサを使用してテーブルから読み込む

  1. ​ExecuteSQLプロセッサと適切な上流および下流プロセッサを追加します。

    NiFi canvas showing ExecuteSQL processor in a workflow
  2. ​ExecuteSQLプロセッサの「プロパティ」セクションに次の値を入力します。

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBCClickHouse用に設定したコントローラーサービスを選択します。
    SQL select querySELECT * FROM system.metricsここにクエリを入力します。
  3. ​​ExecuteSQLプロセッサを開始します。

    ExecuteSQL processor configuration with properties filled in
  4. 「FlowFile」の出力キューの1つを調べて、クエリが正常に処理されたことを確認します。

    List queue dialog showing flowfiles ready for inspection
  5. 結果を表示するには、「formatted」にビューを切り替えます。

    FlowFile content viewer showing query results in formatted view

6. MergeRecordおよびPutDatabaseRecordプロセッサを使用してテーブルに書き込む

  1. 単一の挿入で複数の行を書き込むために、最初に複数のレコードを単一のレコードにマージする必要があります。これにはMergeRecordプロセッサを使用します。

  2. MergeRecordプロセッサの「プロパティ」セクションに次の値を入力します。

    PropertyValueRemark
    Record ReaderJSONTreeReader適切なレコードリーダーを選択します。
    Record WriterJSONReadSetWriter適切なレコードライターを選択します。
    Minimum Number of Records1000これを高い数値に変更し、単一のレコードを形成するためにマージされる最小行数を確保します。デフォルトは1行です。
    Maximum Number of Records10000「Minimum Number of Records」よりも高い数値に変更します。デフォルトは1,000行です。
  3. 複数のレコードが1つにマージされたことを確認するために、MergeRecordプロセッサの入力と出力を確認します。出力は複数の入力レコードの配列であることに注意してください。

    入力

    MergeRecord processor input showing single records

    出力

    MergeRecord processor output showing merged array of records
  4. PutDatabaseRecordプロセッサの「プロパティ」セクションに次の値を入力します。

    PropertyValueRemark
    Record ReaderJSONTreeReader適切なレコードリーダーを選択します。
    Database TypeGenericデフォルトのままにします。
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCClickHouseコントローラーサービスを選択します。
    Table Nametblここにテーブル名を入力します。
    Translate Field Namesfalseフィールド名がカラム名と一致するようにするために「false」に設定します。
    Maximum Batch Size1000挿入ごとの最大行数。この値は、MergeRecordプロセッサの「Minimum Number of Records」の値より低くしないでください。
  5. 各挿入が複数の行を含んでいることを確認するために、テーブル内の行数が「Minimum Number of Records」で定義された値以上に増加していることを確認します。

    Query results showing row count in the destination table
  6. おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!