|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
IBM Lotus Expeditor micro broker MQTT クライアントを使用したメッセージのパブリッシュ |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Andy Stanford-Clark (andysc@uk.ibm.com), Distinguished Engineer, Master Inventor, Manager, Pervasive Messaging Technologies, IBM Martin Gale (martin_gale@uk.ibm.com), Senior IT Specialist, Master Inventor, Lead Developer of the Lotus Expeditor micro broker, IBM レベル:中級 原文の掲載:2007年7月24日
メッセージング・ミドルウェアは、ビジネスの統合ソリューションにおいて、柔軟で信頼性のある接続サービスを提供します。IBM の豊富なメッセージング・ミドルウェア技術の 1 つに MQ Telemetry Transport (MQTT) があり、これは Lotus Expeditor micro broker によってサポートされているプロトコルです。MQTT は TCP/IP ベースのパブリッシュ/サブスクライブ・メッセージング・プロトコルで、オーバーヘッドが低いネットワーク上での通信用に設計されています。micro broker は、企業のデータ・センターから離れた位置に置かれることもある小型のアプライアンス・タイプのデバイスにデプロイすることを目的とした小さなメッセージ・ブローカーです (2 MB 未満の Java コード)。この記事では、Lotus Expeditor micro broker に接続できるサンプル・パブリッシャーを作成し、トピックへパブリッシュし、ブローカーによるメッセージの受信を検証します。この記事を読むことにより、業務用の簡単な MQTT パブリッシャーを作成するために必要な知識を得られます。 歴史、設計の主旨、プロトコルの特徴、および MQTT 互換のブローカーへのサブスクライブの方法については、他の記事 「Using MQ Telemetry Transport with WebSphere Business Integration, Part 1: Subscribing(US)」 に記載されています。この記事は前の記事の概念を拡張し、MQTT を使用してメッセージをパブリッシュする方法まで進めます。IBM Websphere Message Broker を使用したサブスクライブの方法は別の記事に記載されているため、この記事では、新しい Lotus Expeditor micro broker を用いたメッセージのパブリケーションについて説明します。WebSphere Message Broker も MQTT プロトコルをサポートするため、この記事で説明する内容は WebSphere Message Broker にも直接あてはまります。 この記事は、MQTT パブリッシャー・アプリケーションを Java で書くときのガイドとなります。この記事で詳述するサンプル・アプリケーションは、空港でのフライトの到着を示す通知メッセージのパブリケーションをシミュレートします。例を簡単にするために、航空会社は 2 社 (Air Freedom および Northern Air)、空港は 2 箇所 (Raleigh-Durham/RDU および London Heathrow/LHR) に限定してあります。この記事の例をコンパイルするには、Lotus Expeditor micro broker で利用できる MQTT ライブラリーが必要です。また、サンプルの MQTT クライアントを使用して接続し、メッセージをパブリッシュするには、Lotus Expeditor micro broker が稼働しているインストール済み環境が必要です。 トピックパブリッシュ/サブスクライブ・メッセージングでは、メッセージの宛先はトピックと呼ばれます。MQTT プロトコルは階層的なトピック・スペースを持ちます。これは、サブスクライバーとパブリッシャーが異なる精度を使用して宛先トピックを指定するようにトピックが構成される可能性があることを意味します。MQTT にはトピック・スペースに関して強制されるルールはほとんどなく、開発者は、アプリケーションにとって意味のある論理情報スペースを設計する必要があります。 これまでの経験により、説明が少ない簡潔なトピック・スペースよりも、より詳細なトピック・スペースの方が適しています。たとえば、a、b、c などというトピックは実際には推奨できません。短いトピックを使用することで帯域幅を節約できますが、適切に設計され、より説明的で、サブスクライブ側のアプリケーションでワイルドカードを利用できるトピック・スペースを使用する方が、多くの利点を得られます。 トピックはフリー・フォームのストリングで、任意の 1 バイト文字コードで構成できます。ただし、/、+、および # の各文字は、後述するように特殊な意味を持ちます。トピック長は、最大 32,767 文字までの任意の長さです。 フライトの到着および出発メッセージ用に論理的に構成されたトピック・スペースの例を図 1 に示します。この記事のすべてのサンプルで、このトピック・スペースが用いられています。 図 1. トピック・スペース ![]() 階層は、「フライト時刻 - 空港 - 航空会社 - Arrivals (到着) または Departures (出発) - フライト番号」という構成になっています。トピック階層をストリングに変換するときには、特殊なスラッシュ文字 (/) を使用してトピック階層を区切り、精度の論理レイヤーを付加する各セクションに分けます。たとえば、RDU 空港に到着する Air Freedom 社のフライト 1326 便に関するメッセージをパブリッシュするには、メッセージ・トピックは「Flight information/RDU/Air Freedom/Arrivals/Flight 1326」となります。論理的に構成されたこのトピック・スペースにより、サブスクライバーのさまざまなグループが、関心のあるメッセージだけを配信するサブスクリプションを作成できます。トピック・スペースの計画が適切でないと、クライアントのサブスクリプションが過多になることがあります。つまり、クライアントが必要よりも広い範囲のトピック・スペースをサブスクライブし、大量のメッセージを受け取ってしまいます。この余分なメッセージにより、関心のあるメッセージを取り出すために、クライアント・アプリケーションで追加のメッセージ・フィルターが必要となります。サブスクリプション過多のために、帯域幅が無駄になるだけでなく、クライアントの効率が低下します。 次のセクションでは、世界のフライトの到着および出発時刻に関心を持つ 3 つのサブスクライバー・グループの例について説明します。
最初のシナリオでは、MQTT クライアント・アプリケーションを電話にインストールした人が空港で誰かを出迎えます。この場合、この人は特定の 1 つの空港 (LHR) での 1 つのフライト (Air Freedom flight 1024) にのみ関心があります。このフライトの到着通知を受信するには、クライアント・アプリケーションは「Flight information/LHR/Air Freedom/Arrivals/Flight 1024」をサブスクライブします。 2 番目のサブスクリプション・シナリオでは、Heathrow 空港 (LHR) は到着および出発情報を空港の利用客に表示します。このシナリオでは、クライアントは LHR での到着便および出発便だけに関心があります。論理トピック・スペースを使用することで、単一のサブスクリプションにより、LHR を発着する出発便および到着便に関するすべての情報の配信を要求できます。必要なサブスクリプション・トピック・ストリングは「Flight information/LHR/#」です。# 文字は、トピック・スペースでその位置よりも右側にあるすべてのトピックに一致する特殊なワイルドカードであり、トピック・スペースのサブツリーをサブスクライブしているとも考えられます。 最後のシナリオでは、航空会社は定刻運行の統計をトラッキングします。例として用いる航空会社 Northern Air は、世界中での定刻運行のパーセンテージに関心があります。このため、Northern Air には、全世界での到着時刻への単一のサブスクリプションが必要です。この例では、Northern Air が関心を持つのは、出発便ではなく到着便だけです。この場合、Northern Air の関心に一致するトピック・ストリングは、「Flight information/+/Northern Air/Arrivals/#」です。このトピック・ストリングは特殊なワイルドカード + 文字を使用しています。このワイルドカードにより、Northern Air はすべての空港の到着便をサブスクライブできます。# 文字とは異なり、+ 文字はトピック・スペース階層の 1 つのレベルだけに一致し、それ以下のレベルには一致しません (# 文字はそれ以下の階層にも一致します)。 独自の MQTT アプリケーションを作成するときは、トピック・スペースが論理的になるよう慎重に計画し、柔軟なサブスクリプションを可能にする必要があります。このアプローチにより、パブリケーションのほとんどの利用者は、複数のサブスクリプションが不要になり、サブスクリプション過多になることもありません。
MQTT の接続MQ Telemetry Transport でメッセージをパブリッシュするには、Lotus Expeditor micro broker または MQTT プロトコルをサポートする他のメッセージング・サーバー (たとえば、WebSphere Message Broker など) への接続が必要です。ブローカーへの接続を作成するために、いくつかの手順が必要となります。まず、MQTT プロパティー・オブジェクト(MqttPropertiesオブジェクト)が構築され、クライアント作成ファクトリーに渡されます。このプロパティー・オブジェクトにより、インスタンス生成されたクライアントの構成が提供されます。これらのプロパティーの 1 つに、クライアント・アプリケーションがクリーン・セッション・クライアントかどうかを示すブール値のフラグがあります。これが true の場合は、クライアントが接続するたびに、ブローカーへの前の接続 (前に行われたサブスクリプションや配信待ちのメッセージなど) に関する予備知識なしに接続が行われます。このフラグが false の場合は、ブローカーとの各接続間でクライアントの状態がそのまま維持されます。たとえば、それ以降再接続する際に、クライアント・アプリケーションは毎回再サブスクライブする必要はありません。また、クリーン・セッションを false に設定すると、クライアントとブローカーは、接続が切れたときに中断された処理中のすべてのメッセージ交換を再開します (メッセージに指定されたサービス品質に応じて)。非クリーン・セッション・クライアントを使用するには、MqttPersistence インターフェースのインプリメンテーションを提供する必要があります。このインターフェースのインプリメンテーションを含めることにより、クライアント・アプリケーションが永続的な (信頼できる) メッセージ配信の使用を必要としていることがクライアント作成ファクトリーに示されます。この記事の例では、ネットワークが十分に信頼できるものとして、クリーン・セッション・クライアントを使用しています。プロパティーの構成後、MQTT クライアント・ファクトリーから MQTT クライアント・インスタンスが得られます。MQTT クライアント・インスタンスの作成には、固有のクライアント ID、ブローカーの IP アドレスとポート、前述したオプションの MqttProperties オブジェクトなど、いくつかのパラメーターが必要です。 クライアント ID は、各クライアントの識別をブローカーに示します。これは主に、永続メッセージの転送を可能にし、1 つのクライアントによる接続および切断の繰り返しにわたってサブスクリプションの状態を維持するために使用されます。ブローカーに接続する各クライアントは、異なるクライアント ID を持たなければならない点が重要です。もし、2 つのクライアントが同じクライアント ID を使用してブローカーへの接続を試みると、最後に接続したクライアントが優先され、前の接続は強制的に切断されます。これは、前の接続が完全にクリーンアップされていない状態で、クライアントの再接続を可能にするためにこのように設計されています。クライアント ID の長さは、最大 23 文字です。リスト 1 を参照してください。 リスト 1. 接続
パブリッシュMQTT が正しく接続されると、メッセージをパブリッシュできます。アプリケーションは MQTT クライアント・オブジェクトを通じてパブリッシュします。メッセージのパブリッシュ用のメソッド・シグニチャーは int publish(String, MqttPayload, byte, Boolean) です。この 4 つのパラメーターについて詳しく説明します。
publish メソッドは整数のメッセージ ID を返します。登録済みの MqttAdvancedCallback メソッドとともにこの整数を使用して、メッセージがいつブローカーによって受信されたのかを検出できます。 リスト 2 のコードは、Air Freedom の 1024 便は London Heathrow 空港 (LHR) に到着したことを示すメッセージをパブリッシュします。 リスト 2. パブリッシュ
コールバックサンプルのクライアントは、ブローカーに接続してメッセージをパブリッシュすることができます。他の記事で説明したサブスクライブ用のクライアントと同様に、コールバックによって、強化された機能をパブリッシャーに与えられます。パブリケーションの確認応答を受け取るには、コールバック・ハンドラーを作成し、MQTT クライアント・オブジェクトに登録する必要があります。コールバック・ハンドラーには、単純なコールバック・ハンドラーと高度なコールバック・ハンドラーの 2 種類があります。前者は MqttCallback インターフェースによって、後者は MqttAdvancedCallback インターフェースによって、それぞれインプリメントされます。MqttAdvancedCallback インターフェースは MqttCallback を拡張するため、高度なコールバック・インターフェースを使用するときは、単純なコールバック・インターフェースで定義されたメソッドもインプリメントしなければなりません。高度なインターフェースを使用するには、単純なコールバック・ハンドラーから継承したメソッドに加え、subscribed(int, byte[])、unsubscribed(int)、および published(int) の 3 つのメソッドをインプリメントする必要があります。 2 つのコールバック・メソッド subscribed(int, byte[]) と unsubscribed(int) は、サブスクリプションの確認応答をモニターするクライアント用のメソッドです。サブスクリプション要求がブローカーによって確認されると、クライアントによって subscribed メソッドが呼び出されます。同様に、トピックからのアンサブスクライブ要求が確認されると、2 番目のメソッド unsubscribed が呼び出されます。この例ではパブリッシュに重点を置いているため、この 2 つのメソッドはサンプル・クライアントによって使用されませんが、コードを正しくコンパイルするために、骨組みのインプリメンテーションが必要です。 パブリッシュ用のクライアントが最も関心を持つメソッドは published(int) メソッドです。このメソッドは、メッセージが正しくブローカーに配信されたことを通知します。このメソッドには、整数のパラメーター messageID が 1 つあります。アプリケーションによっては、この messageID を publish メソッドによって返された messageID に一致させたい場合があります。コールバックは、QoS 値が 1 または 2 でパブリッシュされたメッセージ用にのみ呼び出されます。MqttPersistence インプリメンテーションによって提供されるクライアント・サイドの永続性を使用すると、QoS 1 または 2 の使用と相まって、メッセージ配信を保証するためのコールバック冗長性の使用につながります。MQTT クライアントは、接続障害の発生時にフライト内にあったすべてのパブリケーションをトラッキングし、再接続時にメッセージの配信を完了させるよう試みます。しかし、アプリケーションによっては、メッセージ配信を調整したり、独自の配信保証セマンティクスを与えるこことが役に立つ場合もあるでしょう。 通知を受け取るためには、高度なコールバック・インターフェースのインプリメンテーションを MQTT クライアントに登録する必要があります。registerCallback メソッドは、MQTT クライアントに登録機能を提供します。リスト 2 を拡張したリスト 3(US) のサンプル・コードは、正しく機能する MQTT パブリッシャーです。MqttAdvancedCallback インターフェースはこのクラスによってインプリメントされ、前に作成された MQTT オブジェクトに登録されます。このクラスは、ブローカーの URI (たとえば、tcp://mybroker:1883) を含む単一のパラメーターを使用してコマンド行から開始できます。
まとめMQTT は、パブリッシュ/サブスクライブ・メッセージング・パラダイムでの強力なトランスポートです。小規模なクライアントおよび低位のネットワーク・オーバーヘッドが必要とされる状況で、MQTT は他のパブリッシュ/サブスクライブ・プロトコルを上回る強力な有効性を持っています。この記事では、完成された機能を持つ MQTT パブリッシャーの作成方法を説明しました。サンプル・クライアントはブローカーに接続し、メッセージをトピックにパブリッシュします。また、MqttAdvancedCallback インターフェースを使用して、ブローカーへのメッセージの配信を通知する例も示しました。
リソース学習する
議論する
筆者について(原文のまま)
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||