EMQX と ClickHouse の統合
EMQX への接続
EMQX は、オープンソースの MQTT ブローカーであり、高性能なリアルタイムメッセージ処理エンジンを備え、大規模に IoT デバイス向けのイベントストリーミングを実現します。最もスケーラブルな MQTT ブローカーとして、EMQX はあらゆるデバイスを、あらゆるスケールで接続するのに役立ちます。IoT データをどこへでも移動させ、処理できます。
EMQX Cloud は、EMQ によってホストされる、IoT 分野向けの MQTT メッセージングミドルウェア製品です。世界初の完全マネージドな MQTT 5.0 クラウドメッセージングサービスとして、EMQX Cloud は運用・保守 (O&M) を一括して提供し、MQTT メッセージングサービス向けに専用の分離環境を提供します。あらゆるものがインターネットにつながる時代において、EMQX Cloud は IoT 分野向けの業界アプリケーションをすばやく構築し、IoT データを容易に収集・伝送・計算・永続化することを可能にします。
クラウドプロバイダーによって提供されるインフラストラクチャを利用することで、EMQX Cloud は世界中の数十の国と地域にサービスを提供し、5G およびあらゆるもののインターネット向けアプリケーションに対して、低コストで安全かつ信頼性の高いクラウドサービスを提供します。

前提条件
- 非常に軽量な publish/subscribe 型のメッセージング・トランスポート・プロトコルとして設計された MQTT プロトコル に精通している。
- 大規模な IoT デバイス向けイベントストリーミングを実現するリアルタイムメッセージ処理エンジンとして、EMQX または EMQX Cloud を利用している。
- デバイスデータを永続化するための ClickHouse Cloud インスタンスを用意している。
- EMQX Cloud のデプロイに接続して MQTT データをパブリッシュするための MQTT クライアントテストツールとして、MQTT X を使用している。または、MQTT ブローカーへの接続が可能な他の方法を使用してもよい。
ClickHouse Cloud サービスを取得する
このセットアップでは、ClickHouse インスタンスを N. Virginia (us-east-1) の AWS 上にデプロイし、同じリージョンに EMQX Cloud インスタンスもデプロイしました。

セットアップの過程では、接続設定にも注意を払う必要があります。本チュートリアルでは「Anywhere」を選択していますが、特定のロケーションを指定する場合は、EMQX Cloud デプロイメントから取得した NAT gateway の IP アドレスを許可リストに追加する必要があります。

続いて、今後使用するためにユーザー名とパスワードを保存しておきます。

その後、稼働中の ClickHouse インスタンスが利用可能になります。「Connect」をクリックして、ClickHouse Cloud のインスタンス接続アドレスを取得します。

「Connect to SQL Console」をクリックして、EMQX Cloud との連携用のデータベースとテーブルを作成します。

以下の SQL 文を参照するか、実際の状況に応じて SQL を調整してください。

EMQX Cloud 上に MQTT サービスを作成する
EMQX Cloud 上に専用の MQTT ブローカーを作成するのは、数回クリックするだけで済みます。
アカウントを作成する
EMQX Cloud は、すべてのアカウントに対して Standard デプロイメントと Professional デプロイメントの両方に 14 日間の無料トライアルを提供しています。
EMQX Cloud sign up ページにアクセスし、EMQX Cloud を初めて使用する場合は「start free」をクリックしてアカウントを登録します。

MQTT クラスターを作成する
ログインしたら、アカウントメニューの "Cloud console" をクリックすると、新しいデプロイメントを作成するための緑色のボタンが表示されます。

このチュートリアルでは Professional デプロイメントを使用します。データ統合機能を提供しているのは Pro バージョンのみであり、これにより 1 行のコードも書かずに MQTT データを直接 ClickHouse に送信できます。
Pro バージョンを選択し、N.Virginial リージョンを選択して Create Now をクリックします。数分で、フルマネージドの MQTT ブローカーが利用できるようになります。

次にパネルをクリックしてクラスタービューに移動します。このダッシュボードでは、MQTT ブローカーの概要を確認できます。

クライアント認証情報を追加する
EMQX Cloud はデフォルトで匿名接続を許可しないため、MQTT クライアントツールを使用してこのブローカーにデータを送信できるように、クライアント認証情報を追加する必要があります。
左側のメニューで「Authentication & ACL」をクリックし、サブメニューで「Authentication」をクリックします。右側の「Add」ボタンをクリックし、後で MQTT 接続に使用するユーザー名とパスワードを設定します。ここではユーザー名とパスワードとして emqx と xxxxxx を使用します。

「Confirm」をクリックすると、フルマネージドの MQTT ブローカーの準備が完了します。
NAT ゲートウェイを有効化する
ClickHouse との連携を設定し始める前に、まず NAT ゲートウェイを有効にする必要があります。デフォルトでは、MQTT ブローカーはプライベート VPC 内にデプロイされており、インターネット経由でサードパーティシステムにデータを送信することはできません。
Overview ページに戻り、ページの一番下までスクロールすると NAT ゲートウェイのウィジェットが表示されます。「Subscribe」ボタンをクリックし、指示に従います。NAT Gateway は付加価値サービスですが、14 日間の無料トライアルも提供されています。

作成が完了すると、ウィジェット内にパブリック IP アドレスが表示されます。ClickHouse Cloud のセットアップ時に「Connect from a specific location」を選択した場合は、この IP アドレスをホワイトリストに追加する必要がある点に注意してください。
EMQX Cloud と ClickHouse Cloud の統合
EMQX Cloud Data Integrations 機能は、EMQX のメッセージフローおよびデバイスイベントを処理し応答するためのルールを構成するために使用されます。Data Integrations は、明確かつ柔軟な「設定可能な」アーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化し、ユーザビリティを向上させ、業務システムと EMQX Cloud 間の結合度を低減します。また、EMQX Cloud 固有機能のカスタマイズに対して優れた基盤インフラストラクチャも提供します。

EMQX Cloud は、代表的なデータシステム向けに 30 を超えるネイティブ連携機能を提供しています。ClickHouse もその 1 つです。

ClickHouse リソースの作成
左側メニューの「Data Integrations」をクリックし、「View All Resources」をクリックします。Data Persistence セクションで ClickHouse を見つけるか、ClickHouse を検索します。
ClickHouse カードをクリックして新しいリソースを作成します。
- Note: このリソースに関するメモを追加します。
- Server address: これは ClickHouse Cloud サービスのアドレスです。ポートを忘れないようにしてください。
- Database name: 上記の手順で作成した
emqx。 - User: ClickHouse Cloud サービスに接続するためのユーザー名。
- Key: 接続用のパスワード。

新しいルールの作成
リソース作成時にポップアップが表示され、「New」をクリックするとルール作成ページに移動します。
EMQX は強力な rule engine を提供しており、サードパーティシステムに送信する前に、生の MQTT メッセージを変換および拡張できます。
このチュートリアルで使用するルールは次のとおりです。
temp_hum/emqx トピックからメッセージを読み取り、JSON オブジェクトに client_id、topic、timestamp の情報を付与して補完します。
つまり、トピックに送信する生の JSON は次のようになります。

SQL のテスト機能を使ってテストを実行し、結果を確認できます。

次に「NEXT」ボタンをクリックします。このステップでは、EMQX Cloud に対して、整形されたデータを ClickHouse データベースにどのように挿入するかを指定します。
レスポンスアクションを追加する
リソースが 1 つだけの場合は、「Resource」と「Action Type」を変更する必要はありません。 SQL テンプレートを設定するだけで構いません。このチュートリアルで使用している例は次のとおりです。

これは ClickHouse にデータを挿入するためのテンプレートです。ここで変数がどのように利用されているか確認できます。
ルールの詳細を表示
「Confirm」と「View Details」をクリックします。これで、すべての設定が完了しているはずです。ルール詳細ページから、データ統合が正しく動作していることを確認できます。

temp_hum/emqx トピックに送信されたすべての MQTT メッセージは、ClickHouse Cloud データベースに永続化されます。
ClickHouse へのデータ保存
温度と湿度のデータをシミュレートし、そのデータを MQTT X を介して EMQX Cloud に送信します。その後、EMQX Cloud Data Integrations を使用してデータを ClickHouse Cloud に保存します。

MQTT メッセージを EMQX Cloud にパブリッシュする
メッセージのパブリッシュには、任意の MQTT クライアントまたは SDK を使用できます。本チュートリアルでは、EMQ が提供するユーザーフレンドリーな MQTT クライアントアプリケーションである MQTT X を使用します。

MQTTX で "New Connection" をクリックし、接続フォームに入力します。
- Name: 接続名。任意の名前を使用できます。
- Host: MQTT ブローカーの接続アドレス。EMQX Cloud の概要ページから取得できます。
- Port: MQTT ブローカーの接続ポート。EMQX Cloud の概要ページから取得できます。
- Username/Password: 先ほど作成した認証情報を使用します。本チュートリアルでは
emqxとxxxxxxになっているはずです。

右上の "Connect" ボタンをクリックすると、接続が確立されます。
これで、このツールを使って MQTT ブローカーにメッセージを送信できるようになりました。 入力内容は次のとおりです。
- payload format を "JSON" に設定します。
- topic を
temp_hum/emqxに設定します(先ほどルールで設定したトピック)。 - JSON body:
右側の送信ボタンをクリックします。temperature の値を変更して、MQTT ブローカーにさらにデータを送信できます。
EMQX Cloud に送信されたデータは、ルールエンジンによって処理され、自動的に ClickHouse Cloud に挿入されます。

ルール監視を確認する
ルール監視を開き、成功数が 1 件増えていることを確認します。

永続化されたデータを確認する
ClickHouse Cloud 上のデータを確認します。理想的には、MQTTX を使って送信したデータは EMQX Cloud に送られ、ネイティブなデータ統合機能により ClickHouse Cloud のデータベースに永続化されます。
ClickHouse Cloud のパネルから SQL コンソールに接続するか、任意のクライアントツールを使用して ClickHouse からデータを取得できます。このチュートリアルでは SQL コンソールを使用します。 次の SQL を実行します:

まとめ
コードを一行も書くことなく、MQTT データを EMQX Cloud から ClickHouse Cloud へ送れるようになりました。EMQX Cloud と ClickHouse Cloud を使えば、インフラの運用・管理は不要になり、ClickHouse Cloud に安全に保存されたデータを活用して IoT アプリケーションの開発に専念できます。