Azure Data Factory で ClickHouse の HTTP インターフェイスを使用する
azureBlobStorage Table Function
は、Azure Blob Storage から ClickHouse へデータを高速かつ便利に取り込むための手段です。
しかし、次のような理由から、常に適しているとは限りません:
- データが Azure Blob Storage に保存されていない場合があります。たとえば、Azure SQL Database、Microsoft SQL Server、Cosmos DB に保存されている場合です。
- セキュリティポリシーにより Blob Storage への外部アクセスが完全に禁止されている場合があります。たとえば、ストレージアカウントにパブリックエンドポイントがなく、ロックダウンされている場合です。
このようなシナリオでは、Azure Data Factory と ClickHouse HTTP interface を組み合わせて使用し、Azure の各種サービスから ClickHouse へデータを送信できます。
この方法ではデータフローの向きが逆になります。ClickHouse が Azure からデータを取得するのではなく、 Azure Data Factory が ClickHouse にデータをプッシュします。このアプローチでは、 一般的に ClickHouse インスタンスがパブリックインターネットからアクセス可能である必要があります。
Azure Data Factory の Self-hosted Integration Runtime を使用すれば、 ClickHouse インスタンスをインターネットに公開せずに済みます。この構成により、 プライベートネットワーク経由でデータを送信できます。ただし、このトピックは本記事の範囲外です。 詳細については公式ガイドを参照してください: Create and configure a self-hosted integration runtime
ClickHouse を REST サービスとして利用する
Azure Data Factory は、HTTP 経由で JSON 形式のデータを外部システムへ送信することをサポートしています。この機能を利用して、ClickHouse HTTP interface を使い、ClickHouse に直接データを投入できます。 詳細については、ClickHouse HTTP インターフェイスのドキュメント を参照してください。
この例では、宛先テーブルを指定し、入力データ形式として JSON を指定し、さらにタイムスタンプをより柔軟にパースできるようにするためのオプションを含めるだけで済みます。
このクエリを HTTP リクエストの一部として送信するには、URL エンコードした文字列として ClickHouse エンドポイントの query パラメーターに渡すだけです。
Azure Data Factory は組み込みの encodeUriComponent 関数を使ってこのエンコーディングを自動的に処理できるため、手動で行う必要はありません。
これで、この URL に JSON 形式のデータを送信できるようになります。データは対象テーブルの構造に対応している必要があります。次は、col_1、col_2、col_3 の 3 つのカラムを持つテーブルを想定した、curl を使った簡単な例です。
オブジェクトの JSON 配列、あるいは JSON Lines(改行区切りの JSON オブジェクト)を送信することもできます。Azure Data Factory は JSON 配列形式を使用しており、ClickHouse の JSONEachRow 入力形式と完全に互換性があります。
ご覧のとおり、この手順では ClickHouse 側で特別な作業を行う必要はありません。HTTP インターフェイスが、REST 風のエンドポイントとして動作するために必要なものをすべてすでに提供しており、追加の設定は不要です。
ClickHouse を REST エンドポイントのように動作させられたので、次は Azure Data Factory をそれを利用するように構成します。
次の手順では、Azure Data Factory インスタンスを作成し、ClickHouse インスタンスへの Linked Service をセットアップし、 REST sink 用の Dataset を定義し、 Azure から ClickHouse にデータを送信する Copy Data アクティビティを作成します。
Azure Data Factory インスタンスの作成
このガイドでは、Microsoft Azure アカウントにアクセスでき、サブスクリプションとリソースグループがすでに構成されていることを前提とします。すでに Azure Data Factory が構成済みの場合は、この手順は省略し、既存のサービスを使用して次の手順に進んでかまいません。
-
Microsoft Azure Portal にログインし、Create a resource をクリックします。

-
左側のカテゴリ ペインで Analytics を選択し、人気のサービス一覧から Data Factory をクリックします。

-
サブスクリプションとリソースグループを選択し、新しい Data Factory インスタンスの名前を入力して、リージョンを選択し、バージョンは V2 のままにします。

-
Review + Create をクリックし、続いて Create をクリックしてデプロイを開始します。


デプロイが正常に完了したら、新しい Azure Data Factory インスタンスの使用を開始できます。
新しい REST ベースの Linked service を作成する
-
Microsoft Azure Portal にログインし、Data Factory インスタンスを開きます。

-
Data Factory の概要ページで Launch Studio をクリックします。

-
左側のメニューで Manage を選択し、Linked services に移動して、 新しい Linked service を作成するために + New をクリックします。

-
New linked service search bar で REST と入力し、REST を選択して Continue をクリックし、 REST connector インスタンスを作成します。

-
Linked service の設定ペインで新しいサービスの名前を入力し、 Base URL フィールドをクリックしてから Add dynamic content をクリックします (このリンクはフィールドを選択したときにのみ表示されます)。

-
Dynamic content ペインでは、URL をパラメーター化できます。 これにより、異なるテーブル向けのデータセットを作成する際にクエリを後から定義できるため、 Linked service を再利用可能にできます。

-
フィルター入力欄の横にある "+" をクリックして新しいパラメーターを追加し、 名前を
pQuery、型を String、既定値をSELECT 1に設定します。 Save をクリックします。
-
Expression フィールドに次の内容を入力し、OK をクリックします。
your-clickhouse-url.comを実際の ClickHouse インスタンスのアドレスに置き換えてください。
-
メインフォームに戻り、Basic authentication を選択し、 ClickHouse HTTP インターフェイスへの接続に使用するユーザー名と パスワードを入力して、Test connection をクリックします。 すべて正しく構成されていれば、成功メッセージが表示されます。

-
Create をクリックしてセットアップを完了します。

これで、新しく登録した REST ベースの Linked service が一覧に表示されるようになります。
ClickHouse HTTP インターフェイス用の新しいデータセットを作成する
ClickHouse HTTP インターフェイス用のリンク サービスを構成したので、 Azure Data Factory が ClickHouse にデータを送信する際に使用する データセットを作成します。
この例では、Environmental Sensors Data の一部を挿入します。
-
任意の ClickHouse クエリコンソールを開きます。これは、 ClickHouse Cloud の Web UI、CLI クライアント、またはクエリを実行するために 使用しているその他のインターフェイスのいずれでも構いません。次にターゲットテーブルを作成します。
-
Azure Data Factory Studio で、左側のペインから Author を選択します。 Dataset 項目にカーソルを合わせて三点リーダーのアイコンをクリックし、New dataset を選択します。

-
検索バーで REST と入力し、REST を選択して Continue をクリックします。 データセット名を入力し、前の手順で作成した linked service を選択します。 OK をクリックしてデータセットを作成します。

-
左側の Factory Resources ペインの Datasets セクションに、 作成したデータセットが表示されているはずです。データセットを選択して プロパティを開きます。リンク サービスで定義した
pQueryパラメータが表示されます。 Value テキストフィールドをクリックし、続けて Add dynamic content をクリックします。
-
開いたペインに、次のクエリを貼り付けます。
危険クエリ内のすべての単一引用符
'は、2 つの単一引用符''に置き換える必要があります。 これは Azure Data Factory の式パーサーの要件です。エスケープしなかった場合、 すぐにはエラーが表示されないかもしれませんが、データセットを使用または保存しようとしたときに 後で失敗します。たとえば、'best_effort'は''best_effort''と記述しなければなりません。
-
OK をクリックして式を保存します。次に Test connection をクリックします。 すべてが正しく構成されていれば、Connection successful というメッセージが表示されます。 ページ上部の Publish all をクリックして、変更内容を保存します。

サンプルデータセットの準備
この例では、Environmental Sensors Dataset 全体ではなく、 Sensors Dataset Sample で提供されている小規模なサブセットのみを使用します。
このガイドの焦点を絞るため、Azure Data Factory でソースデータセットを作成するための 詳細な手順には踏み込みません。サンプルデータは、任意のストレージサービスにアップロードできます。 たとえば、Azure Blob Storage、Microsoft SQL Server、あるいは Azure Data Factory でサポートされている 別のファイル形式でも構いません。
データセットを Azure Blob Storage(または他に選択したストレージサービス)にアップロードします。 次に、Azure Data Factory Studio で Factory Resources ペインを開きます。 アップロードしたデータを参照する新しいデータセットを作成します。最後に「Publish all」をクリックして、 変更内容を公開します。
ClickHouse にデータを転送する Copy Activity の作成
入力および出力データセットの両方を構成したので、サンプルデータセットから
ClickHouse の sensors テーブルにデータを転送する Copy Data アクティビティを
設定します。
-
Azure Data Factory Studio を開き、Author タブ に移動します。Factory Resources ペインで Pipeline の上にマウスカーソルを置き、三点アイコンをクリックして New pipeline を選択します。

-
Activities ペインで Move and transform セクションを展開し、Copy data アクティビティをキャンバス上にドラッグします。

-
Source タブを選択し、先ほど作成したソースデータセットを選択します。

-
Sink タブに移動し、
sensorsテーブル用に作成した ClickHouse データセットを 選択します。Request method を POST に設定します。HTTP compression type が None になっていることを確認します。注意Azure Data Factory の Copy Data アクティビティでは HTTP 圧縮が正しく動作しません。 有効化すると、Azure はゼロバイトのみから成るペイロードを送信します — サービス側の バグと思われます。必ず圧縮は無効のままにしてください。
情報デフォルトのバッチサイズ 10,000 を維持するか、さらに増やすことを推奨します。詳細は Selecting an Insert Strategy / Batch inserts if synchronous を参照してください。

-
キャンバス上部の Debug をクリックしてパイプラインを実行します。しばらく待つと、 アクティビティはキューに投入されて実行されます。すべてが正しく構成されていれば、 タスクは Success ステータスで終了します。

-
完了したら Publish all をクリックし、パイプラインとデータセットの変更を保存します。