EMQX と ClickHouse の統合
EMQX の接続
EMQX は、高性能なリアルタイムメッセージ処理エンジンを持つオープンソースの MQTT ブローカーであり、大規模な IoT デバイス向けのイベントストリーミングを支えています。最もスケーラブルな MQTT ブローカーとして、EMQX はあらゆる規模のデバイスを接続するのに役立ちます。あなたの IoT データをどこでも移動させ、処理します。
EMQX Cloud は、EMQ によってホストされる IoT ドメイン向けの MQTT メッセージングミドルウェア製品です。世界初の完全管理された MQTT 5.0 クラウドメッセージングサービスである EMQX Cloud は、MQTT メッセージングサービスのための一元的な運用・管理およびユニークな隔離環境を提供します。あらゆるモノのインターネットの時代において、EMQX Cloud は IoT ドメイン向けの産業アプリケーションを迅速に構築し、IoT データを容易に収集、送信、計算、保存することができます。
クラウドプロバイダーによって提供されるインフラストラクチャにより、EMQX Cloud は世界中の数十の国と地域にサービスを提供し、5G およびあらゆるモノのインターネットアプリケーション向けに低コスト、高セキュリティ、信頼性のあるクラウドサービスを提供します。

前提条件
- あなたは、軽量な pub/sub メッセージトランスポートプロトコルとして設計された MQTT プロトコル に精通しています。
- あなたは、リアルタイムメッセージ処理エンジンとして EMQX または EMQX Cloud を使用しており、大規模な IoT デバイス向けのイベントストリーミングを支えています。
- あなたは、デバイスデータを永続化するための Clickhouse Cloud インスタンスを用意しています。
- 私たちは、MQTT データをパブリッシュするために EMQX Cloud のデプロイメントに接続するための MQTT クライアントテストツールとして MQTT X を使用しています。他の方法で MQTT ブローカーに接続することも可能です。
ClickHouse Cloud サービスを取得する
この設定中に、私たちは AWS のバージニア州北部 (us-east-1) に ClickHouse インスタンスをデプロイし、同じ地域に EMQX Cloud インスタンスもデプロイしました。

セットアッププロセス中に、接続設定にも注意を払う必要があります。このチュートリアルでは「Anywhere」を選択しますが、特定の場所を要求する場合、EMQX Cloud デプロイメントから取得した NAT ゲートウェイ の IP アドレスをホワイトリストに追加する必要があります。

次に、今後の使用のためにユーザー名とパスワードを保存する必要があります。

その後、稼働中の Clickhouse インスタンスが得られます。「接続」をクリックして Clickhouse Cloud のインスタンス接続アドレスを取得してください。

「SQL コンソールに接続」をクリックして、EMQX Cloud との統合のためのデータベースとテーブルを作成します。

次の SQL 文を参照するか、実際の状況に応じて SQL を修正できます。

EMQX Cloud 上に MQTT サービスを作成する
EMQX Cloud 上に専用の MQTT ブローカーを作成するのは簡単で、数回のクリックで完了します。
アカウントを取得する
EMQX Cloud は、すべてのアカウントに対して標準デプロイメントとプロフェッショナルデプロイメントの両方で 14 日間の無料トライアルを提供しています。
新たに EMQX Cloud を使用する場合は、EMQX Cloud サインアップ ページから「無料で始める」をクリックしてアカウントに登録します。

MQTT クラスターを作成する
ログイン後、アカウントメニューの「Cloud Console」をクリックすると、新しいデプロイメントを作成するための緑のボタンを見ることができます。

このチュートリアルでは、データ統合機能が Pro バージョンのみで提供されるため、プロフェッショナルデプロイメントを使用します。この機能を利用すると、1行のコードも書かずに MQTT データを直接 ClickHouse に送信できます。
Pro バージョンを選択し、N.Virginial
地域を選んで「今すぐ作成」をクリックしてください。数分以内に完全管理された MQTT ブローカーが得られます。

今、パネルをクリックしてクラスターのビューに進みます。このダッシュボードでは、あなたの MQTT ブローカーの概要を見ることができます。

クライアント認証情報を追加する
EMQX Cloud は、デフォルトで匿名接続を許可していないため、MQTT クライアントツールを使用してこのブローカーにデータを送信するために、クライアント認証情報を追加する必要があります。
左メニューの「認証 & ACL」をクリックし、サブメニューの「認証」をクリックします。右の「追加」ボタンをクリックし、後で MQTT 接続に使用するユーザー名とパスワードを設定します。ここでは emqx
と xxxxxx
をユーザー名とパスワードとして使用します。

「確認」をクリックすると、完全管理された MQTT ブローカーが準備完了になります。
NAT ゲートウェイを有効化する
ClickHouse 統合の設定を開始する前に、まず NAT ゲートウェイを有効にする必要があります。デフォルトでは、MQTT ブローカーはプライベート VPC にデプロイされており、公共ネットワークを介してサードパーティのシステムにデータを送信することはできません。
概観ページに戻り、ページの下部までスクロールすると NAT ゲートウェイウィジェットを見ることができます。「購読」ボタンをクリックし、指示に従います。NAT ゲートウェイは付加価値サービスであることに注意してください。ただし、14 日間の無料トライアルも提供しています。

作成が完了すると、ウィジェットにパブリック IP アドレスが表示されます。ClickHouse Cloud セットアップ中に「特定の場所から接続」を選択した場合、この IP アドレスをホワイトリストに追加する必要があることに注意してください。
EMQX Cloud と ClickHouse Cloud の統合
EMQX Cloud Data Integrations は、EMQX メッセージフローおよびデバイスイベントの処理と応答のためのルールを構成するために使用されます。データ統合は、明確で柔軟な「構成可能」なアーキテクチャソリューションを提供するだけでなく、開発プロセスを簡素化し、ユーザーの使いやすさを向上させ、ビジネスシステムと EMQX Cloud との結合度を下げます。また、EMQX Cloud の独自機能のカスタマイズのための優れたインフラストラクチャも提供します。

EMQX Cloud は、人気のあるデータシステムとの 30 以上のネイティブ統合を提供しています。ClickHouse もその一つです。

ClickHouse リソースを作成する
左メニューの「データ統合」をクリックし、「すべてのリソースを見る」をクリックします。データ永続化セクションに ClickHouse が見つかるか、ClickHouse を検索することができます。
ClickHouse カードをクリックして新しいリソースを作成します。
- 注:このリソースにメモを追加します。
- サーバーアドレス:これはあなたの ClickHouse Cloud サービスのアドレスであり、ポートを忘れずに記録してください。
- データベース名:上記のステップで作成した
emqx
。 - ユーザー:あなたの ClickHouse Cloud サービスに接続するためのユーザー名。
- キー:接続用のパスワード。

新しいルールを作成する
リソースの作成中にポップアップが表示され、「新規」をクリックするとルール作成ページに進みます。
EMQX には、サードパーティのシステムに送信する前に生の MQTT メッセージを変換および強化できる強力な ルールエンジン が用意されています。
このチュートリアルで使用するルールは以下の通りです:
これは、temp_hum/emqx
トピックからメッセージを読み取り、client_id、topic、および timestamp 情報を追加して JSON オブジェクトを強化します。
したがって、トピックに送信する生の JSON は以下のようになります:

SQL テストを使用してテストし、結果を確認できます。

今、「NEXT」ボタンをクリックします。このステップでは、EMQX Cloud に対して精練されたデータを ClickHouse データベースに挿入する方法を指示します。
応答アクションを追加する
リソースが 1 つだけの場合、「リソース」と「アクションタイプ」を修正する必要はありません。 SQL テンプレートだけを設定するだけです。ここでは、このチュートリアルで使用する例を示します:

これは Clickhouse にデータを挿入するためのテンプレートで、ここで変数が使用されることがわかります。
ルールの詳細を表示する
「確認」と「詳細を表示」をクリックします。これで、すべてが適切に設定されているはずです。ルールの詳細ページからデータ統合が機能していることを確認できます。

temp_hum/emqx
トピックに送信されたすべての MQTT メッセージは、あなたの ClickHouse Cloud データベースに永続化されます。
ClickHouse へのデータ保存
温度と湿度データをシミュレーションし、これらのデータを MQTT X を通じて EMQX Cloud に報告し、その後 EMQX Cloud データ統合を使用して ClickHouse Cloud に保存します。

EMQX Cloud に MQTT メッセージを公開する
任意の MQTT クライアントまたは SDK を使用してメッセージを公開できます。このチュートリアルでは、EMQ によって提供されるユーザーフレンドリーな MQTT クライアントアプリケーション MQTT X を使用します。

MQTTX で「新しい接続」をクリックし、接続フォームを入力します:
- 名前:接続名。任意の名前を使用できます。
- ホスト:MQTT ブローカー接続アドレス。EMQX Cloud 概要ページから取得できます。
- ポート:MQTT ブローカー接続ポート。EMQX Cloud 概要ページから取得できます。
- ユーザー名/パスワード:上で作成した資格情報を使用します。このチュートリアルでは
emqx
とxxxxxx
です。

右上にある「接続」ボタンをクリックすると接続が確立されます。
これで、このツールを使って MQTT ブローカーにメッセージを送信できます。 入力:
- ペイロード形式を「JSON」に設定します。
- トピックを
temp_hum/emqx
(ルールで設定したトピック)に設定します。 - JSON 本体:
右の送信ボタンをクリックします。温度値を変更し、MQTT ブローカーにより多くのデータを送信できます。
EMQX Cloud に送信されたデータは、ルールエンジンによって処理され、クリックハウスクラウドに自動的に挿入されるはずです。

ルールのモニタリングを表示する
ルールモニタリングをチェックし、成功回数を追加します。

永続化されたデータを確認する
さあ、ClickHouse Cloud のデータを見てみましょう。理想的には、MQTTX を使用して送信したデータが EMQX Cloud に届き、ネイティブなデータ統合の助けを借りて ClickHouse Cloud のデータベースに永続化されるはずです。
ClickHouse の SQL コンソールに接続するか、任意のクライアントツールを使用してデータを取得します。このチュートリアルでは SQL コンソールを使用しました。 SQL を実行することによって:

まとめ
あなたは一行のコードも書かず、EMQX Cloud から ClickHouse Cloud に MQTT データを移動させることができました。EMQX Cloud と ClickHouse Cloud を使用すれば、インフラを管理する必要がなく、データが ClickHouse Cloud に安全に保存される IoT アプリケーションの記述に集中することができます。