EMQXとClickHouseの統合
EMQXへの接続
EMQXは、高性能のリアルタイムメッセージ処理エンジンを持つオープンソースのMQTTブローカーで、IoTデバイスの巨大なスケールでのイベントストリーミングを支えています。最もスケーラブルなMQTTブローカーであるEMQXは、あらゆるデバイスを、あらゆるスケールで接続するのに役立ちます。IoTデータをどこでも移動させ、処理できます。
EMQX Cloudは、EMQがホストするIoT領域向けのMQTTメッセージングミドルウェア製品です。世界初のフルマネージドMQTT 5.0クラウドメッセージングサービスであるEMQX Cloudは、MQTTメッセージングサービスのためのワンストップの運用管理を提供し、独自の隔離された環境を提供します。すべてのもののインターネットの時代において、EMQX CloudはIoT領域の業界アプリケーションを迅速に構築し、IoTデータを容易に収集、送信、計算、および永続化するのに役立ちます。
クラウドプロバイダーが提供するインフラストラクチャを利用して、EMQX Cloudは世界の数十の国や地域にサービスを提供し、5Gおよびすべてのもののインターネットアプリケーションに対して低コストで安全かつ信頼できるクラウドサービスを提供しています。

前提条件
- あなたは、極めて軽量に設計されたパブリッシュ/サブスクライブメッセージングトランスポートプロトコルであるMQTTプロトコルに精通しています。
- あなたはリアルタイムメッセージ処理エンジンとしてEMQXまたはEMQX Cloudを使用しており、IoTデバイスの巨大なスケールでのイベントストリーミングを支えています。
- デバイスデータを永続化するためのClickhouse Cloudインスタンスを準備しました。
- MQTT XをMQTTクライアントテストツールとして使用してEMQX Cloudのデプロイメントに接続し、MQTTデータを公開します。他の方法でMQTTブローカーに接続することも可能です。
ClickHouse Cloud サービスの取得
セットアップ中、私たちはAWSのヴァージニア州N.にClickHouseインスタンスをデプロイしました(us-east -1),同時に同じ地域にEMQX Cloudインスタンスもデプロイしました。

セットアッププロセス中には、接続設定にも注意が必要です。このチュートリアルでは「Anywhere」を選択しますが、特定の場所を申し込む場合は、EMQX Cloudデプロイメントから取得したNATゲートウェイのIPアドレスをホワイトリストに追加する必要があります。

次に、今後の使用のためにユーザー名とパスワードを保存する必要があります。

その後、稼動中のClickhouseインスタンスを取得します。「Connect」をクリックして、Clickhouse Cloudのインスタンス接続アドレスを取得します。

「SQL Consoleに接続」をクリックして、EMQX Cloudとの統合のためにデータベースとテーブルを作成します。

以下のSQL文を参照するか、実際の状況に応じてSQLを修正してください。

EMQX CloudでMQTTサービスを作成
EMQX Cloudで専用のMQTTブローカーを作成するのは、数回のクリックで簡単です。
アカウントを取得
EMQX Cloudは、すべてのアカウントに対して標準デプロイメントとプロフェッショナルデプロイメントの14日間の無料トライアルを提供します。
EMQX Cloudサインアップページにアクセスし、「無料で始める」をクリックして、EMQX Cloudが初めての場合はアカウントを登録してください。

MQTTクラスターの作成
ログイン後、アカウントメニューの「Cloud Console」をクリックすると、新しいデプロイメントを作成するための緑のボタンが表示されます。

このチュートリアルでは、データ統合機能が直接ClickHouseにMQTTデータを送信できるのはProバージョンのみであるため、プロフェッショナルデプロイメントを使用します。
Proバージョンを選択し、N.Virginial
地域を選択し、Create Now
をクリックします。数分で、フルマネージドのMQTTブローカーが手に入ります:

次に、パネルをクリックしてクラスタービューに移動します。このダッシュボードでは、MQTTブローカーの概要が表示されます。

クライアント認証情報の追加
EMQX Cloudでは、デフォルトで匿名接続が許可されていないため、MQTTクライアントツールを使用してこのブローカーにデータを送信するために、クライアント認証情報を追加する必要があります。
左側のメニューで「Authentication & ACL」をクリックし、サブメニューで「Authentication」をクリックします。右側の「Add」ボタンをクリックし、後でMQTT接続用のユーザー名とパスワードを設定します。ここでは、ユーザー名とパスワードにemqx
とxxxxxx
を使用します。

「Confirm」をクリックすると、完全に管理されたMQTTブローカーが準備完了になります。
NATゲートウェイを有効にする
ClickHouseの統合設定を開始できるように、まずNATゲートウェイを有効にする必要があります。デフォルトでは、MQTTブローカーはプライベートVPCにデプロイされており、パブリックネットワーク経由でサードパーティシステムにデータを送信できません。
概要ページに戻り、ページの最下部までスクロールすると、NATゲートウェイウィジェットが表示されます。「Subscribe」ボタンをクリックし、指示に従ってください。NATゲートウェイは付加価値サービスですが、14日間の無料トライアルも提供しています。

作成が完了すると、ウィジェットにパブリックIPアドレスが表示されます。ClickHouse Cloudのセットアップ中に「特定の場所から接続する」を選択した場合は、このIPアドレスをホワイトリストに追加する必要があります。
EMQX CloudとClickHouse Cloudの統合
EMQX Cloudデータ統合は、EMQXメッセージフローとデバイスイベントを処理および応答するためのルールを構成するために使用されます。データ統合は、明確で柔軟な「設定可能」なアーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化し、ユーザーの使いやすさを向上させ、ビジネスシステムとEMQX Cloudの間の結合度を低下させます。また、EMQX Cloudの独自機能のカスタマイズのための優れたインフラストラクチャを提供します。

EMQX Cloudは、人気のあるデータシステムとの30以上のネイティブ統合を提供しています。ClickHouseもその一つです。

ClickHouseリソースの作成
左側のメニューで「Data Integrations」をクリックし、「View All Resources」をクリックします。データ永続化セクションにClickHouseが表示されるか、ClickHouseを検索できます。
ClickHouseカードをクリックして新しいリソースを作成します。
- 注意: このリソースのためのメモを追加してください。
- サーバーアドレス: これはあなたのClickHouse Cloudサービスのアドレスです。ポートを忘れないでください。
- データベース名: 上記のステップで作成した
emqx
。 - ユーザー: ClickHouse Cloudサービスに接続するためのユーザー名。
- キー: 接続用のパスワード。

新しいルールの作成
リソースを作成中にポップアップが表示され、「New」をクリックするとルール作成ページに移動します。
EMQXは、MQTTメッセージをサードパーティシステムに送信する前に変換および充実させることができる強力なルールエンジンを提供します。
このチュートリアルで使用するルールは以下の通りです:
これにより、temp_hum/emqx
トピックからメッセージを読み取り、クライアントID、トピック、およびタイムスタンプ情報を追加してJSONオブジェクトを強化します。
したがって、トピックに送信される生のJSONは以下のようになります:

SQLテストを使用してテストし、結果を確認できます。

次に「NEXT」ボタンをクリックします。このステップは、EMQX Cloudが洗練されたデータをClickHouseデータベースに挿入する方法を教えます。
応答アクションの追加
リソースが1つだけの場合、「Resource」および「Action Type」を変更する必要はありません。 SQLテンプレートを設定するだけです。以下はこのチュートリアルで使用する例です:

これはClickhouseへのデータ挿入のテンプレートであり、ここで変数が使用されていることがわかります。
ルール詳細の確認
「Confirm」をクリックし、「View Details」をクリックします。すべてが適切に設定されているはずです。ルール詳細ページでデータ統合が機能していることを確認できます。

temp_hum/emqx
トピックに送信されたすべてのMQTTメッセージは、あなたのClickHouse Cloudデータベースに永続化されます。
ClickHouseへのデータの保存
温度と湿度のデータをシミュレートし、これらのデータをMQTT X経由でEMQX Cloudに報告し、その後EMQX Cloudデータ統合を使用してデータをClickHouse Cloudに保存します。

MQTTメッセージをEMQX Cloudに公開
任意のMQTTクライアントやSDKを使用してメッセージを公開できます。このチュートリアルでは、EMQが提供するユーザーフレンドリーなMQTTクライアントアプリケーションであるMQTT Xを使用します。

MQTTXで「New Connection」をクリックし、接続フォームを記入します:
- 名前: 接続名。お好きな名前を使用してください。
- ホスト: MQTTブローカー接続アドレス。EMQX Cloudの概要ページから取得できます。
- ポート: MQTTブローカー接続ポート。EMQX Cloudの概要ページから取得できます。
- ユーザー名/パスワード: 上で作成した認証情報を使用します。このチュートリアルでは
emqx
とxxxxxx
です。

右上の「Connect」ボタンをクリックすると、接続が確立されるはずです。
これで、このツールを使用してMQTTブローカーにメッセージを送信できます。 入力:
- ペイロード形式を「JSON」に設定します。
- トピックを設定します:
temp_hum/emqx
(ルールで設定したトピック) - JSON本文:
右の送信ボタンをクリックします。温度値を変更して、MQTTブローカーに追加のデータを送信できます。
EMQX Cloudに送信されたデータは、ルールエンジンによって処理され、ClickHouse Cloudに自動的に挿入されるはずです。

ルール監視の確認
ルール監視をチェックし、成功の数に1を追加します。

永続化されたデータの確認
さて、ClickHouse Cloudのデータを確認する時間です。理想的には、MQTTXを使用して送信したデータがEMQX Cloudに入り、ネイティブデータ統合の助けを借りてClickHouse Cloudのデータベースに永続化されるでしょう。
ClickHouseのSQLコンソールに接続するか、任意のクライアントツールを使用してClickHouseからデータを取得できます。このチュートリアルではSQLコンソールを使用しました。 SQLを実行することで:

まとめ
あなたは一行のコードも書いていないのに、MQTTデータがEMQX CloudからClickHouse Cloudに移動しました。EMQX CloudとClickHouse Cloudを使用することで、インフラを管理する必要はなく、ClickHouse Cloudに安全に保存されたデータでIoTアプリケーションの作成に集中できます。