EMQXとClickHouseの統合
EMQXへの接続
EMQX は、高性能なリアルタイムメッセージ処理エンジンを備えたオープンソースのMQTTブローカーで、IoTデバイス向けに大規模なイベントストリーミングを提供します。最もスケーラブルなMQTTブローカーとして、EMQXはあらゆる規模で任意のデバイスを接続するのを助けます。IoTデータをどこにでも移動し、処理します。
EMQX Cloud は、 EMQ がホストするIoTドメイン向けのMQTTメッセージングミドルウェア製品です。EMQX Cloudは、世界初の完全に管理されたMQTT 5.0クラウドメッセージングサービスであり、MQTTメッセージングサービス用のワンストップO&Mコロケーションと独自の隔離された環境を提供します。あらゆるモノがインターネットで接続される時代に、EMQX CloudはIoTドメインの業界アプリケーションを迅速に構築し、IoTデータの収集、伝送、計算、および永続化を容易にします。
クラウドプロバイダーが提供するインフラストラクチャを使用することで、EMQX Cloudは世界中の数十の国と地域にサービスを提供し、5Gおよびあらゆるモノがインターネットで接続されるアプリケーション向けに、低コストで安全かつ信頼性のあるクラウドサービスを提供します。

前提条件
- あなたは、非常に軽量なパブリッシュ/サブスクライブメッセージ輸送プロトコルとして設計されたMQTTプロトコルに精通しています。
- あなたは、リアルタイムメッセージ処理エンジンとしてEMQXまたはEMQX Cloudを使用し、IoTデバイス向けに大規模なイベントストリーミングを実現しています。
- デバイスデータを永続化するためのClickhouse Cloudインスタンスを準備済みです。
- 私たちは、EMQX CloudにMQTTデータを公開するために、MQTTクライアントテストツールとしてMQTT Xを使用します。他の方法でMQTTブローカーに接続することも可能です。
ClickHouse Cloudサービスの取得
この設定中に、AWSのN.バージニア(us-east-1)にClickHouseインスタンスを展開し、同じ地域にEMQX Cloudインスタンスも展開しました。

設定プロセス中に接続設定にも注意を払う必要があります。このチュートリアルでは「Anywhere」を選択しましたが、特定の場所を申し込む場合は、EMQX Cloudデプロイメントから取得したNATゲートウェイのIPアドレスをホワイトリストに追加する必要があります。

次に、将来使用するためにユーザー名とパスワードを保存する必要があります。

その後、稼働中のClickhouseインスタンスが取得できます。「接続」をクリックしてClickhouse Cloudのインスタンス接続アドレスを取得します。

「SQLコンソールに接続」をクリックして、EMQX Cloudとの統合のためにデータベースとテーブルを作成します。

以下のSQLステートメントを参照するか、実際の状況に応じてSQLを修正できます。

EMQX Cloud上にMQTTサービスを作成
EMQX Cloud上に専用のMQTTブローカーを作成するのは、数クリックで簡単です。
アカウントを取得
EMQX Cloudは、標準デプロイメントとプロフェッショナルデプロイメントの両方に対して、すべてのアカウントに14日間の無料トライアルを提供しています。
EMQX Cloud signup ページから始めて、新しいユーザーの場合は無料登録をクリックしてアカウントを登録します。

MQTTクラスターの作成
ログインしたら、アカウントメニューの「Cloud console」をクリックし、新しいデプロイメントを作成するための緑色のボタンを見ることができます。

このチュートリアルでは、Professionalデプロイメントを使用します。プロ版のみがデータ統合機能を提供し、単一行のコードを書くことなくMQTTデータをClickHouseに直接送ることができます。
プロ版を選択し、N.Virginial
地域を選択して「今すぐ作成」をクリックします。数分で完全に管理されたMQTTブローカーを取得できます:

今、パネルをクリックしてクラスターのビューに移動します。このダッシュボードでは、MQTTブローカーの概要を見ることができます。

クライアント認証情報の追加
EMQX Cloudはデフォルトで匿名接続を許可していないため、MQTTクライアントツールでこのブローカーにデータを送信できるように、クライアント認証情報を追加する必要があります。
左メニューの「Authentication & ACL」をクリックし、サブメニューで「Authentication」をクリックします。右側の「追加」ボタンをクリックし、後でMQTT接続用のユーザー名とパスワードを入力します。ここでは、ユーザー名としてemqx
、パスワードとしてxxxxxx
を使用します。

「確認」をクリックすると、完全に管理されたMQTTブローカーが準備されます。
NATゲートウェイの有効化
ClickHouse統合の設定を開始する前に、最初にNATゲートウェイを有効にする必要があります。デフォルトではMQTTブローカーはプライベートVPC内に展開されており、公開ネットワーク経由でサードパーティシステムにデータを送信することができません。
概要ページに戻り、ページの下部にスクロールするとNATゲートウェイウィジェットを見ることができます。購読ボタンをクリックし、指示に従ってください。NATゲートウェイは付加価値サービスですが、14日間の無料トライアルも提供しています。

作成が完了すると、ウィジェットに公開IPアドレスが表示されます。ClickHouse Cloudの設定中に「特定の場所から接続する」を選択した場合は、このIPアドレスをホワイトリストに追加する必要があることに注意してください。
EMQX CloudとClickHouse Cloudの統合
EMQX Cloudデータ統合は、EMQXメッセージフローおよびデバイスイベントの処理と応答のためのルールを構成するために使用されます。データ統合は、明確で柔軟な「構成可能」アーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化し、ユーザーの使いやすさを向上させ、ビジネスシステムとEMQX Cloud間の結合度を低下させます。また、EMQX Cloudの独自機能のカスタマイズに優れたインフラストラクチャを提供します。

EMQX Cloudは、人気のデータシステムと30以上のネイティブ統合を提供しています。ClickHouseもその一つです。

ClickHouseリソースの作成
左メニューの「データ統合」をクリックし、「すべてのリソースを表示」をクリックします。データ永続性セクションでClickHouseを見つけるか、ClickHouseを検索します。
ClickHouseカードをクリックして、新しいリソースを作成します。
- 注:このリソース用のメモを追加します。
- サーバーアドレス:これはClickHouse Cloudサービスのアドレスです。ポートを忘れないようにしてください。
- データベース名:上記の手順で作成した
emqx
。 - ユーザー:ClickHouse Cloudサービスへの接続用のユーザー名。
- キー:接続用のパスワード。

新しいルールの作成
リソースの作成中にポップアップが表示され、「新規」をクリックするとルール作成ページに移動します。
EMQXは、サードパーティシステムに送信する前に、生のMQTTメッセージを変換して強化する強力なルールエンジンを提供します。
このチュートリアルで使用されるルールは以下の通りです:
これは、temp_hum/emqx
トピックからメッセージを読み取り、クライアントID、トピック、タイムスタンプ情報を追加してJSONオブジェクトを強化します。
したがって、トピックに送信する生のJSONは:

SQLテストを使用して結果をテストすることができます。

「次へ」ボタンをクリックします。このステップでは、EMQX Cloudに対して、どのように洗練されたデータをClickHouseデータベースに挿入するかを指示します。
応答アクションの追加
リソースが1つだけの場合、'Resource' と 'Action Type' を変更する必要はありません。 SQLテンプレートを設定するだけです。ここで使用する例は次の通りです:

これは、Clickhouseにデータを挿入するためのテンプレートであり、ここで変数が使用されていることがわかります。
ルールの詳細を表示
「確認」をクリックし、「詳細を表示」をクリックします。今、すべては正しく設定されているはずです。ルールの詳細ページからデータ統合が機能しているのを確認できます。

temp_hum/emqx
トピックに送信されたすべてのMQTTメッセージは、ClickHouse Cloudデータベースに永続化されます。
ClickHouseへのデータの保存
私たちは温度と湿度のデータをシミュレーションし、MQTT Xを介してこれらのデータをEMQX Cloudに報告し、その後、EMQX Cloudデータ統合を使用してClickHouse Cloudにデータを保存します。

EMQX CloudにMQTTメッセージを公開
任意のMQTTクライアントまたはSDKを使用してメッセージを公開できます。このチュートリアルでは、MQTT X を使用し、EMQが提供するユーザーフレンドリーなMQTTクライアントアプリケーションです。

MQTTXで「新規接続」をクリックし、接続フォームを埋めます:
- 名前:接続名。任意の名前を使用してください。
- ホスト:MQTTブローカー接続アドレス。EMQX Cloudの概要ページから取得できます。
- ポート:MQTTブローカー接続ポート。EMQX Cloudの概要ページから取得できます。
- ユーザー名/パスワード:上で作成した認証情報を使用します。このチュートリアルでは
emqx
とxxxxxx
です。

右上の「接続」ボタンをクリックすると、接続が確立されるはずです。
これで、このツールを使用してMQTTブローカーにメッセージを送信できます。 入力:
- ペイロード形式を「JSON」に設定します。
- トピックを設定:
temp_hum/emqx
(ルールで設定したトピック) - JSONボディ:
右の送信ボタンをクリックします。温度値を変更してMQTTブローカーにさらに多くのデータを送信することができます。
EMQX Cloudに送信されたデータは、ルールエンジンによって処理され、ClickHouse Cloudに自動的に挿入されるはずです。

ルールモニタリングの表示
ルールモニタリングを確認し、成功数を1つ追加します。

永続化されたデータの確認
今、ClickHouse Cloudでのデータを確認する時です。理想的には、MQTTXを使用して送信されたデータはEMQX Cloudに渡り、ネイティブデータ統合の助けを借りてClickHouse Cloudのデータベースに永続化されます。
ClickHouse CloudパネルのSQLコンソールに接続するか、任意のクライアントツールを使用してClickHouseからデータを取得します。このチュートリアルではSQLコンソールを使用しました。 次のSQLを実行します:

まとめ
コードを書かずに、EMQX CloudからClickHouse CloudにMQTTデータを移動しました。EMQX CloudとClickHouse Cloudを使用すると、インフラ管理は必要なく、データがClickHouse Cloudに安全に保存されたIoTアプリケーションの作成に集中できます。