メインコンテンツへスキップ
メインコンテンツへスキップ

EMQX と ClickHouse の統合

EMQX への接続

EMQX は、オープンソースの MQTT ブローカーであり、高性能なリアルタイムメッセージ処理エンジンを備え、大規模に IoT デバイス向けのイベントストリーミングを実現します。最もスケーラブルな MQTT ブローカーとして、EMQX はあらゆるデバイスを、あらゆるスケールで接続するのに役立ちます。IoT データをどこへでも移動させ、処理できます。

EMQX Cloud は、EMQ によってホストされる、IoT 分野向けの MQTT メッセージングミドルウェア製品です。世界初の完全マネージドな MQTT 5.0 クラウドメッセージングサービスとして、EMQX Cloud は運用・保守 (O&M) を一括して提供し、MQTT メッセージングサービス向けに専用の分離環境を提供します。あらゆるものがインターネットにつながる時代において、EMQX Cloud は IoT 分野向けの業界アプリケーションをすばやく構築し、IoT データを容易に収集・伝送・計算・永続化することを可能にします。

クラウドプロバイダーによって提供されるインフラストラクチャを利用することで、EMQX Cloud は世界中の数十の国と地域にサービスを提供し、5G およびあらゆるもののインターネット向けアプリケーションに対して、低コストで安全かつ信頼性の高いクラウドサービスを提供します。

クラウドインフラストラクチャコンポーネントを示す EMQX Cloud アーキテクチャ図

前提条件

  • 非常に軽量な publish/subscribe 型のメッセージング・トランスポート・プロトコルとして設計された MQTT プロトコル に精通している。
  • 大規模な IoT デバイス向けイベントストリーミングを実現するリアルタイムメッセージ処理エンジンとして、EMQX または EMQX Cloud を利用している。
  • デバイスデータを永続化するための ClickHouse Cloud インスタンスを用意している。
  • EMQX Cloud のデプロイに接続して MQTT データをパブリッシュするための MQTT クライアントテストツールとして、MQTT X を使用している。または、MQTT ブローカーへの接続が可能な他の方法を使用してもよい。

ClickHouse Cloud サービスを取得する

このセットアップでは、ClickHouse インスタンスを N. Virginia (us-east-1) の AWS 上にデプロイし、同じリージョンに EMQX Cloud インスタンスもデプロイしました。

AWS リージョン選択を表示している ClickHouse Cloud サービスのデプロイインターフェース

セットアップの過程では、接続設定にも注意を払う必要があります。本チュートリアルでは「Anywhere」を選択していますが、特定のロケーションを指定する場合は、EMQX Cloud デプロイメントから取得した NAT gateway の IP アドレスを許可リストに追加する必要があります。

IP アクセス設定を表示している ClickHouse Cloud の接続設定画面

続いて、今後使用するためにユーザー名とパスワードを保存しておきます。

ユーザー名とパスワードを表示している ClickHouse Cloud の認証情報画面

その後、稼働中の ClickHouse インスタンスが利用可能になります。「Connect」をクリックして、ClickHouse Cloud のインスタンス接続アドレスを取得します。

接続オプション付きの ClickHouse Cloud 稼働中インスタンスのダッシュボード

「Connect to SQL Console」をクリックして、EMQX Cloud との連携用のデータベースとテーブルを作成します。

ClickHouse Cloud の SQL Console インターフェース

以下の SQL 文を参照するか、実際の状況に応じて SQL を調整してください。

CREATE TABLE emqx.temp_hum
(
   client_id String,
   timestamp DateTime,
   topic String,
   temp Float32,
   hum Float32
)
ENGINE = MergeTree()
PRIMARY KEY (client_id, timestamp)
ClickHouse Cloud におけるデータベースおよびテーブル作成用 SQL クエリの実行画面

EMQX Cloud 上に MQTT サービスを作成する

EMQX Cloud 上に専用の MQTT ブローカーを作成するのは、数回クリックするだけで済みます。

アカウントを作成する

EMQX Cloud は、すべてのアカウントに対して Standard デプロイメントと Professional デプロイメントの両方に 14 日間の無料トライアルを提供しています。

EMQX Cloud sign up ページにアクセスし、EMQX Cloud を初めて使用する場合は「start free」をクリックしてアカウントを登録します。

登録フォームが表示された EMQX Cloud サインアップページ

MQTT クラスターを作成する

ログインしたら、アカウントメニューの "Cloud console" をクリックすると、新しいデプロイメントを作成するための緑色のボタンが表示されます。

デプロイメントオプションを表示する EMQX Cloud デプロイメント作成ステップ 1

このチュートリアルでは Professional デプロイメントを使用します。データ統合機能を提供しているのは Pro バージョンのみであり、これにより 1 行のコードも書かずに MQTT データを直接 ClickHouse に送信できます。

Pro バージョンを選択し、N.Virginial リージョンを選択して Create Now をクリックします。数分で、フルマネージドの MQTT ブローカーが利用できるようになります。

リージョン選択を表示する EMQX Cloud デプロイメント作成ステップ 2

次にパネルをクリックしてクラスタービューに移動します。このダッシュボードでは、MQTT ブローカーの概要を確認できます。

ブローカーのメトリクスを表示する EMQX Cloud 概要ダッシュボード

クライアント認証情報を追加する

EMQX Cloud はデフォルトで匿名接続を許可しないため、MQTT クライアントツールを使用してこのブローカーにデータを送信できるように、クライアント認証情報を追加する必要があります。

左側のメニューで「Authentication & ACL」をクリックし、サブメニューで「Authentication」をクリックします。右側の「Add」ボタンをクリックし、後で MQTT 接続に使用するユーザー名とパスワードを設定します。ここではユーザー名とパスワードとして emqxxxxxxx を使用します。

認証情報を追加するための EMQX Cloud 認証設定インターフェース

「Confirm」をクリックすると、フルマネージドの MQTT ブローカーの準備が完了します。

NAT ゲートウェイを有効化する

ClickHouse との連携を設定し始める前に、まず NAT ゲートウェイを有効にする必要があります。デフォルトでは、MQTT ブローカーはプライベート VPC 内にデプロイされており、インターネット経由でサードパーティシステムにデータを送信することはできません。

Overview ページに戻り、ページの一番下までスクロールすると NAT ゲートウェイのウィジェットが表示されます。「Subscribe」ボタンをクリックし、指示に従います。NAT Gateway は付加価値サービスですが、14 日間の無料トライアルも提供されています。

EMQX Cloud NAT ゲートウェイ設定パネル

作成が完了すると、ウィジェット内にパブリック IP アドレスが表示されます。ClickHouse Cloud のセットアップ時に「Connect from a specific location」を選択した場合は、この IP アドレスをホワイトリストに追加する必要がある点に注意してください。

EMQX Cloud と ClickHouse Cloud の統合

EMQX Cloud Data Integrations 機能は、EMQX のメッセージフローおよびデバイスイベントを処理し応答するためのルールを構成するために使用されます。Data Integrations は、明確かつ柔軟な「設定可能な」アーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化し、ユーザビリティを向上させ、業務システムと EMQX Cloud 間の結合度を低減します。また、EMQX Cloud 固有機能のカスタマイズに対して優れた基盤インフラストラクチャも提供します。

EMQX Cloud Data Integration のオプションと利用可能なコネクタを表示している画面

EMQX Cloud は、代表的なデータシステム向けに 30 を超えるネイティブ連携機能を提供しています。ClickHouse もその 1 つです。

EMQX Cloud ClickHouse Data Integration コネクタの詳細

ClickHouse リソースの作成

左側メニューの「Data Integrations」をクリックし、「View All Resources」をクリックします。Data Persistence セクションで ClickHouse を見つけるか、ClickHouse を検索します。

ClickHouse カードをクリックして新しいリソースを作成します。

  • Note: このリソースに関するメモを追加します。
  • Server address: これは ClickHouse Cloud サービスのアドレスです。ポートを忘れないようにしてください。
  • Database name: 上記の手順で作成した emqx
  • User: ClickHouse Cloud サービスに接続するためのユーザー名。
  • Key: 接続用のパスワード。
接続情報を入力する EMQX Cloud ClickHouse Resource 設定フォーム

新しいルールの作成

リソース作成時にポップアップが表示され、「New」をクリックするとルール作成ページに移動します。

EMQX は強力な rule engine を提供しており、サードパーティシステムに送信する前に、生の MQTT メッセージを変換および拡張できます。

このチュートリアルで使用するルールは次のとおりです。

SELECT
   clientid AS client_id,
   (timestamp div 1000) AS timestamp,
   topic AS topic,
   payload.temp AS temp,
   payload.hum AS hum
FROM
"temp_hum/emqx"

temp_hum/emqx トピックからメッセージを読み取り、JSON オブジェクトに client_id、topic、timestamp の情報を付与して補完します。

つまり、トピックに送信する生の JSON は次のようになります。

{"temp": 28.5, "hum": 0.68}
EMQX Cloud データインテグレーションルール作成ステップ 1(SQL クエリを表示)

SQL のテスト機能を使ってテストを実行し、結果を確認できます。

EMQX Cloud データインテグレーションルール作成ステップ 2(テスト結果を表示)

次に「NEXT」ボタンをクリックします。このステップでは、EMQX Cloud に対して、整形されたデータを ClickHouse データベースにどのように挿入するかを指定します。

レスポンスアクションを追加する

リソースが 1 つだけの場合は、「Resource」と「Action Type」を変更する必要はありません。 SQL テンプレートを設定するだけで構いません。このチュートリアルで使用している例は次のとおりです。

INSERT INTO temp_hum (client_id, timestamp, topic, temp, hum) VALUES ('${client_id}', ${timestamp}, '${topic}', ${temp}, ${hum})
SQL テン�プレートを使用した EMQX Cloud データ統合ルールアクションのセットアップ

これは ClickHouse にデータを挿入するためのテンプレートです。ここで変数がどのように利用されているか確認できます。

ルールの詳細を表示

「Confirm」と「View Details」をクリックします。これで、すべての設定が完了しているはずです。ルール詳細ページから、データ統合が正しく動作していることを確認できます。

設定サマリーを表示している EMQX Cloud データ統合ルール詳細

temp_hum/emqx トピックに送信されたすべての MQTT メッセージは、ClickHouse Cloud データベースに永続化されます。

ClickHouse へのデータ保存

温度と湿度のデータをシミュレートし、そのデータを MQTT X を介して EMQX Cloud に送信します。その後、EMQX Cloud Data Integrations を使用してデータを ClickHouse Cloud に保存します。

EMQX Cloud から ClickHouse へのワークフローを示すデータフロー図

MQTT メッセージを EMQX Cloud にパブリッシュする

メッセージのパブリッシュには、任意の MQTT クライアントまたは SDK を使用できます。本チュートリアルでは、EMQ が提供するユーザーフレンドリーな MQTT クライアントアプリケーションである MQTT X を使用します。

クライアントインターフェースを表示している MQTTX の概要

MQTTX で "New Connection" をクリックし、接続フォームに入力します。

  • Name: 接続名。任意の名前を使用できます。
  • Host: MQTT ブローカーの接続アドレス。EMQX Cloud の概要ページから取得できます。
  • Port: MQTT ブローカーの接続ポート。EMQX Cloud の概要ページから取得できます。
  • Username/Password: 先ほど作成した認証情報を使用します。本チュートリアルでは emqxxxxxxx になっているはずです。
接続詳細を設定する MQTTX の新規接続セットアップフォーム

右上の "Connect" ボタンをクリックすると、接続が確立されます。

これで、このツールを使って MQTT ブローカーにメッセージを送信できるようになりました。 入力内容は次のとおりです。

  1. payload format を "JSON" に設定します。
  2. topic を temp_hum/emqx に設定します(先ほどルールで設定したトピック)。
  3. JSON body:
{"temp": 23.1, "hum": 0.68}

右側の送信ボタンをクリックします。temperature の値を変更して、MQTT ブローカーにさらにデータを送信できます。

EMQX Cloud に送信されたデータは、ルールエンジンによって処理され、自動的に ClickHouse Cloud に挿入されます。

MQTTX Publish MQTT Messages インターフェースにおけるメッセージ作成画面

ルール監視を確認する

ルール監視を開き、成功数が 1 件増えていることを確認します。

EMQX Cloud Rule Monitoring ダッシュボードにおけるメッセージ処理メトリクス

永続化されたデータを確認する

ClickHouse Cloud 上のデータを確認します。理想的には、MQTTX を使って送信したデータは EMQX Cloud に送られ、ネイティブなデータ統合機能により ClickHouse Cloud のデータベースに永続化されます。

ClickHouse Cloud のパネルから SQL コンソールに接続するか、任意のクライアントツールを使用して ClickHouse からデータを取得できます。このチュートリアルでは SQL コンソールを使用します。 次の SQL を実行します:

SELECT * FROM emqx.temp_hum;
ClickHouse のクエリ結果で永続化された IoT データを表示している画面

まとめ

コードを一行も書くことなく、MQTT データを EMQX Cloud から ClickHouse Cloud へ送れるようになりました。EMQX Cloud と ClickHouse Cloud を使えば、インフラの運用・管理は不要になり、ClickHouse Cloud に安全に保存されたデータを活用して IoT アプリケーションの開発に専念できます。