本文へジャンプ

ソフトウェア Lotus Lotus Developer Domain 製品別技術情報 Lotus Expeditor > 
 

IBM Lotus Expeditor micro broker MQTT クライアントを使用したメッセージのパブリッシュ

 
コンテンツ
トピック
MQTT の接続
パブリッシュ
コールバック
まとめ
リソース
筆者について(原文のまま)
ご意見ご要望をお寄せ下さい
Brian O'Connell (boc@us.ibm.com), Advisory Software Engineer, High Performance On Demand Services, IBM
Andy Stanford-Clark (andysc@uk.ibm.com), Distinguished Engineer, Master Inventor, Manager, Pervasive Messaging Technologies, IBM
Martin Gale (martin_gale@uk.ibm.com), Senior IT Specialist, Master Inventor, Lead Developer of the Lotus Expeditor micro broker, IBM

レベル:中級
原文の掲載:2007年7月24日
原文はこちら(US)

IBM Lotus Expeditor micro broker を使用して、MQ Telemetry Transport (MQTT) パブリッシュ/サブスクライブ・メッセージング・プロトコルをサポートする方法を理解しましょう。サンプル・パブリッシャーの作成、トピックへのパブリッシュ、およびメッセージの受信確認を行います。

メッセージング・ミドルウェアは、ビジネスの統合ソリューションにおいて、柔軟で信頼性のある接続サービスを提供します。IBM の豊富なメッセージング・ミドルウェア技術の 1 つに MQ Telemetry Transport (MQTT) があり、これは Lotus Expeditor micro broker によってサポートされているプロトコルです。MQTT は TCP/IP ベースのパブリッシュ/サブスクライブ・メッセージング・プロトコルで、オーバーヘッドが低いネットワーク上での通信用に設計されています。micro broker は、企業のデータ・センターから離れた位置に置かれることもある小型のアプライアンス・タイプのデバイスにデプロイすることを目的とした小さなメッセージ・ブローカーです (2 MB 未満の Java コード)。この記事では、Lotus Expeditor micro broker に接続できるサンプル・パブリッシャーを作成し、トピックへパブリッシュし、ブローカーによるメッセージの受信を検証します。この記事を読むことにより、業務用の簡単な MQTT パブリッシャーを作成するために必要な知識を得られます。

歴史、設計の主旨、プロトコルの特徴、および MQTT 互換のブローカーへのサブスクライブの方法については、他の記事 「Using MQ Telemetry Transport with WebSphere Business Integration, Part 1: Subscribing(US)」 に記載されています。この記事は前の記事の概念を拡張し、MQTT を使用してメッセージをパブリッシュする方法まで進めます。IBM Websphere Message Broker を使用したサブスクライブの方法は別の記事に記載されているため、この記事では、新しい Lotus Expeditor micro broker を用いたメッセージのパブリケーションについて説明します。WebSphere Message Broker も MQTT プロトコルをサポートするため、この記事で説明する内容は WebSphere Message Broker にも直接あてはまります。

この記事は、MQTT パブリッシャー・アプリケーションを Java で書くときのガイドとなります。この記事で詳述するサンプル・アプリケーションは、空港でのフライトの到着を示す通知メッセージのパブリケーションをシミュレートします。例を簡単にするために、航空会社は 2 社 (Air Freedom および Northern Air)、空港は 2 箇所 (Raleigh-Durham/RDU および London Heathrow/LHR) に限定してあります。この記事の例をコンパイルするには、Lotus Expeditor micro broker で利用できる MQTT ライブラリーが必要です。また、サンプルの MQTT クライアントを使用して接続し、メッセージをパブリッシュするには、Lotus Expeditor micro broker が稼働しているインストール済み環境が必要です。


トピック

パブリッシュ/サブスクライブ・メッセージングでは、メッセージの宛先はトピックと呼ばれます。MQTT プロトコルは階層的なトピック・スペースを持ちます。これは、サブスクライバーとパブリッシャーが異なる精度を使用して宛先トピックを指定するようにトピックが構成される可能性があることを意味します。MQTT にはトピック・スペースに関して強制されるルールはほとんどなく、開発者は、アプリケーションにとって意味のある論理情報スペースを設計する必要があります。

これまでの経験により、説明が少ない簡潔なトピック・スペースよりも、より詳細なトピック・スペースの方が適しています。たとえば、a、b、c などというトピックは実際には推奨できません。短いトピックを使用することで帯域幅を節約できますが、適切に設計され、より説明的で、サブスクライブ側のアプリケーションでワイルドカードを利用できるトピック・スペースを使用する方が、多くの利点を得られます。

トピックはフリー・フォームのストリングで、任意の 1 バイト文字コードで構成できます。ただし、/、+、および # の各文字は、後述するように特殊な意味を持ちます。トピック長は、最大 32,767 文字までの任意の長さです。

フライトの到着および出発メッセージ用に論理的に構成されたトピック・スペースの例を図 1 に示します。この記事のすべてのサンプルで、このトピック・スペースが用いられています。


図 1. トピック・スペース

階層は、「フライト時刻 - 空港 - 航空会社 - Arrivals (到着) または Departures (出発) - フライト番号」という構成になっています。トピック階層をストリングに変換するときには、特殊なスラッシュ文字 (/) を使用してトピック階層を区切り、精度の論理レイヤーを付加する各セクションに分けます。たとえば、RDU 空港に到着する Air Freedom 社のフライト 1326 便に関するメッセージをパブリッシュするには、メッセージ・トピックは「Flight information/RDU/Air Freedom/Arrivals/Flight 1326」となります。論理的に構成されたこのトピック・スペースにより、サブスクライバーのさまざまなグループが、関心のあるメッセージだけを配信するサブスクリプションを作成できます。トピック・スペースの計画が適切でないと、クライアントのサブスクリプションが過多になることがあります。つまり、クライアントが必要よりも広い範囲のトピック・スペースをサブスクライブし、大量のメッセージを受け取ってしまいます。この余分なメッセージにより、関心のあるメッセージを取り出すために、クライアント・アプリケーションで追加のメッセージ・フィルターが必要となります。サブスクリプション過多のために、帯域幅が無駄になるだけでなく、クライアントの効率が低下します。

次のセクションでは、世界のフライトの到着および出発時刻に関心を持つ 3 つのサブスクライバー・グループの例について説明します。

  • 空港で到着便を迎える人
  • フライト情報を表示する空港
  • 定刻どおりのフライトのパーセンテージをトラッキングする航空会社

最初のシナリオでは、MQTT クライアント・アプリケーションを電話にインストールした人が空港で誰かを出迎えます。この場合、この人は特定の 1 つの空港 (LHR) での 1 つのフライト (Air Freedom flight 1024) にのみ関心があります。このフライトの到着通知を受信するには、クライアント・アプリケーションは「Flight information/LHR/Air Freedom/Arrivals/Flight 1024」をサブスクライブします。

2 番目のサブスクリプション・シナリオでは、Heathrow 空港 (LHR) は到着および出発情報を空港の利用客に表示します。このシナリオでは、クライアントは LHR での到着便および出発便だけに関心があります。論理トピック・スペースを使用することで、単一のサブスクリプションにより、LHR を発着する出発便および到着便に関するすべての情報の配信を要求できます。必要なサブスクリプション・トピック・ストリングは「Flight information/LHR/#」です。# 文字は、トピック・スペースでその位置よりも右側にあるすべてのトピックに一致する特殊なワイルドカードであり、トピック・スペースのサブツリーをサブスクライブしているとも考えられます。

最後のシナリオでは、航空会社は定刻運行の統計をトラッキングします。例として用いる航空会社 Northern Air は、世界中での定刻運行のパーセンテージに関心があります。このため、Northern Air には、全世界での到着時刻への単一のサブスクリプションが必要です。この例では、Northern Air が関心を持つのは、出発便ではなく到着便だけです。この場合、Northern Air の関心に一致するトピック・ストリングは、「Flight information/+/Northern Air/Arrivals/#」です。このトピック・ストリングは特殊なワイルドカード + 文字を使用しています。このワイルドカードにより、Northern Air はすべての空港の到着便をサブスクライブできます。# 文字とは異なり、+ 文字はトピック・スペース階層の 1 つのレベルだけに一致し、それ以下のレベルには一致しません (# 文字はそれ以下の階層にも一致します)。

独自の MQTT アプリケーションを作成するときは、トピック・スペースが論理的になるよう慎重に計画し、柔軟なサブスクリプションを可能にする必要があります。このアプローチにより、パブリケーションのほとんどの利用者は、複数のサブスクリプションが不要になり、サブスクリプション過多になることもありません。

上に戻る

MQTT の接続

MQ Telemetry Transport でメッセージをパブリッシュするには、Lotus Expeditor micro broker または MQTT プロトコルをサポートする他のメッセージング・サーバー (たとえば、WebSphere Message Broker など) への接続が必要です。ブローカーへの接続を作成するために、いくつかの手順が必要となります。まず、MQTT プロパティー・オブジェクト(MqttPropertiesオブジェクト)が構築され、クライアント作成ファクトリーに渡されます。このプロパティー・オブジェクトにより、インスタンス生成されたクライアントの構成が提供されます。これらのプロパティーの 1 つに、クライアント・アプリケーションがクリーン・セッション・クライアントかどうかを示すブール値のフラグがあります。これが true の場合は、クライアントが接続するたびに、ブローカーへの前の接続 (前に行われたサブスクリプションや配信待ちのメッセージなど) に関する予備知識なしに接続が行われます。このフラグが false の場合は、ブローカーとの各接続間でクライアントの状態がそのまま維持されます。たとえば、それ以降再接続する際に、クライアント・アプリケーションは毎回再サブスクライブする必要はありません。また、クリーン・セッションを false に設定すると、クライアントとブローカーは、接続が切れたときに中断された処理中のすべてのメッセージ交換を再開します (メッセージに指定されたサービス品質に応じて)。非クリーン・セッション・クライアントを使用するには、MqttPersistence インターフェースのインプリメンテーションを提供する必要があります。このインターフェースのインプリメンテーションを含めることにより、クライアント・アプリケーションが永続的な (信頼できる) メッセージ配信の使用を必要としていることがクライアント作成ファクトリーに示されます。この記事の例では、ネットワークが十分に信頼できるものとして、クリーン・セッション・クライアントを使用しています。プロパティーの構成後、MQTT クライアント・ファクトリーから MQTT クライアント・インスタンスが得られます。MQTT クライアント・インスタンスの作成には、固有のクライアント ID、ブローカーの IP アドレスとポート、前述したオプションの MqttProperties オブジェクトなど、いくつかのパラメーターが必要です。

クライアント ID は、各クライアントの識別をブローカーに示します。これは主に、永続メッセージの転送を可能にし、1 つのクライアントによる接続および切断の繰り返しにわたってサブスクリプションの状態を維持するために使用されます。ブローカーに接続する各クライアントは、異なるクライアント ID を持たなければならない点が重要です。もし、2 つのクライアントが同じクライアント ID を使用してブローカーへの接続を試みると、最後に接続したクライアントが優先され、前の接続は強制的に切断されます。これは、前の接続が完全にクリーンアップされていない状態で、クライアントの再接続を可能にするためにこのように設計されています。クライアント ID の長さは、最大 23 文字です。リスト 1 を参照してください。


リスト 1. 接続


    /**
    * Create a MqttClient object after configuring the MqttProperties object as
    * required.
    */
   private MqttClient createClient() throws MqttException {

       MqttProperties mqttProps = new MqttProperties();
       // Stateless "clean session" client
       mqttProps.setCleanStart(true);

       /**
        * Create the client from the factory. The client ID for this client is
        * "testClient" and the URL in the second parameter describes the
        * location of the broker, in this case, on the local machine.
        */
       MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
               "testClient", "tcp://mybroker:1883", mqttProps);

       return mqttClient;
   }


   /**
    * Connect the MqttClient to a broker.
    * 
    * @throws MqttException
    *             If an error occurs during connection operations.
    */
   private void connect() throws MqttException {

       /**
        * Register this application for callbacks from the client
        */
       client.registerCallback(this);

       /**
        * Connect the client to a broker.
        */
       client.connect();

   } 
          

上に戻る

パブリッシュ

MQTT が正しく接続されると、メッセージをパブリッシュできます。アプリケーションは MQTT クライアント・オブジェクトを通じてパブリッシュします。メッセージのパブリッシュ用のメソッド・シグニチャーは int publish(String, MqttPayload, byte, Boolean) です。この 4 つのパラメーターについて詳しく説明します。

  • String : トピック・パラメーターは文字列型です。この文字列は、パブリケーションをサブスクライバーの関心に一致させるために、ブローカーによって使用されます (サブスクライバーの関心は、前述のサブスクリプション・トピック構文によって指定されます)。
  • MqttPayload : 2 番目のパラメーターは MqttPayload オブジェクトです。MqttPayload オブジェクトには、アプリケーション・データとこのパブリケーション用のすべてのプロトコル・ヘッダーの両方が含まれています。アプリケーションが MqttPayload のどこからデータが始まるのかを決定できるように、オフセットが提供されます。これにより、データがネットワークに書き込まれた後、追加コピーを作成せずに、背後にあるバイト配列にアクセスできます。また、オブジェクトの生成後かつ送信前に、ペイロード内でデータを直接操作するためにアクセスが提供されます。
  • Byte : 3 番目のパラメーター byte は、このパブリケーションのサービス品質 (QoS) です。QoS の有効な値は 0, 1, 2 の 3 つのレベルで、それぞれ次の意味を持ちます。

    • QoS の値が 0 の場合は、パブリッシャーとブローカーは 1 回限りのメッセージの配信を試みますが、メッセージの配信を確実にするために TCP/IP で提供される手順以上のことは実行しません。メッセージは受信確認なしで宛先に送信されるため、このレベルは「投げっ放し」(fire and forget) と呼ばれることがあります。
    • QoS が 1 に設定されていると、メッセージは確実にブローカーまで配信されますが、1 回よりも多く配信される可能性があります。
    • QoS の値が 2 の場合は、MQTT は 1 回だけメッセージを配信します。
    QoS のレベルが上がるほど、プロセッサーおよびネットワークのオーバーヘッドが増加します。QoS の選択は、メッセージング・ソリューション全体のスケーラビリティーに影響することがあり、配信されなかったメッセージをクライアントが保存することの重要性が高まります。このため、パブリッシュされる各メッセージに適切な QoS レベルを選択するよう注意してください。一般に、配信への強い保証が求められない限り、より低い QoS 値を使用するよう試みます。メッセージとともに提供される QoS 値は、クライアントとブローカー間のパブリケーションに関するサービス品質を指定します。また、この値は、ブローカーがそのメッセージをサブスクライバーに配信するときに使用する最大の QoS レベルを指定します。

    サブスクライバーは、メッセージ配信の最大 QoS をトピック単位で指定できるため、QoS 2 でパブリッシュされたメッセージが、そのレベルでサブスクライバーに配信されないことがあります。サブスクライバーは、受信するメッセージにダウングレードした QoS を要求することがあります。パブリッシャーが、そのメッセージのエンドツーエンド QoS を制御できない点が奇妙に思われるかもしれませんが、結果としては、メッセージ・コンシューマーにとって柔軟性が高まります。パブリッシュされたメッセージがサブスクライバーに送信されるとき、ブローカーは、サブスクリプション・プロセスでサブスクライバーによって指定された最大 QoS、またはパブリッシュされたメッセージの QoS (こちらの方が低い場合) でパブリケーションを配信します。たとえば、トピックに QoS 1 を指定したサブスクライバーに、QoS 2 でメッセージをパブリッシュすると、メッセージは QoS 1 で配信されます。そのトピックの同じサブスクライバーに、QoS 0 でメッセージをパブリッシュすると、メッセージは QoS 0 で配信されます。
  • Boolean : 4 番目のパラメーターは、それが保留されたパブリケーションであるかどうかを示すブール値のフラグです。保留されたパブリケーションは、指定されたトピック用に受信した最後のメッセージとしてブローカー内に保持されています。保留されたパブリケーションにより、メッセージがパブリッシュされた後に接続した場合でも、以降のサブスクライバーはサブスクライブすると同時に、トピックに関する最新のメッセージを受信できます。これは、起動直後に表示アプリケーションを作成し、それ以降の情報への変更を用いて表示を更新する場合にたいへん役に立ちます。このフラグが false に設定されている場合は、現在そのトピックをサブスクライブしているサブスクライバーだけがメッセージを受け取ります。リスト 2 に示す例は、保留なしのパブリケーションを使用しています。

publish メソッドは整数のメッセージ ID を返します。登録済みの MqttAdvancedCallback メソッドとともにこの整数を使用して、メッセージがいつブローカーによって受信されたのかを検出できます。

リスト 2 のコードは、Air Freedom の 1024 便は London Heathrow 空港 (LHR) に到着したことを示すメッセージをパブリッシュします。


リスト 2. パブリッシュ


    /**
    * Invoke from the command line with a single parameter, the broker URI,
    * e.g. tcp://mybroker:1883.
    */
   public static void main(String args[]) {

       MqttPublisher publisher = null;

       try {

           publisher = new MqttPublisher(args[0]);

           /**
            * Connect the newly created publisher to the supplied broker.
            */
           publisher.connect();

           /**
            * Publish an "Arrival" message.
            */
                
           publisher.publishMessage(
                   "Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
                   (byte) 2, "Arrived");

           /**
            * Sleep for 1 second waiting to receive notification of
            * publication. Real applications should use appropriate
            * inter-thread signaling mechanisms such as wait/notify, 
                     * cyclic barriers or latches.
            */
           Thread.sleep(1000);

       }
       catch (MqttException exception) {
           System.err.println("Exception occurred during either instantiation, 
           connection, or publication: "
                   + exception.getMessage());
       }
       catch (InterruptedException exception) {
           System.err.println("Interrupted while waiting for publication: "
                   + exception.getMessage());
       }
       finally {
           try {
               /**
                * Close the publisher if instantiated.
                */
               if (publisher != null) {
                   publisher.disconnectClient();
               }
           }
           catch (MqttException exception) {
               System.err.println("Exception occurred closing publisher: "
                       + exception.getMessage());
           }
       }
   }


   /**
    * Construct a new MqttPublisher containing an unconnected MqttClient.
    * 
    * @param brokerURL
    *            Broker URL to (eventually) connect to.
    * @throws MqttException
    *             If an underlying MQTT error occurs instantiating the client
    *             object.
    */
   private MqttPublisher(String brokerURL) throws MqttException {
       this.brokerURL = brokerURL;
       this.client = createClient();
   }
 
 /**
  * Publish a string as a message in byte form with the given quality of
  * service to the given topic.
  */
  public void publishMessage(String topic, byte qos, String message)  
  throws MqttException {
       client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
               false);
   }
      

上に戻る

コールバック

サンプルのクライアントは、ブローカーに接続してメッセージをパブリッシュすることができます。他の記事で説明したサブスクライブ用のクライアントと同様に、コールバックによって、強化された機能をパブリッシャーに与えられます。パブリケーションの確認応答を受け取るには、コールバック・ハンドラーを作成し、MQTT クライアント・オブジェクトに登録する必要があります。コールバック・ハンドラーには、単純なコールバック・ハンドラーと高度なコールバック・ハンドラーの 2 種類があります。前者は MqttCallback インターフェースによって、後者は MqttAdvancedCallback インターフェースによって、それぞれインプリメントされます。MqttAdvancedCallback インターフェースは MqttCallback を拡張するため、高度なコールバック・インターフェースを使用するときは、単純なコールバック・インターフェースで定義されたメソッドもインプリメントしなければなりません。高度なインターフェースを使用するには、単純なコールバック・ハンドラーから継承したメソッドに加え、subscribed(int, byte[])、unsubscribed(int)、および published(int) の 3 つのメソッドをインプリメントする必要があります。

2 つのコールバック・メソッド subscribed(int, byte[]) と unsubscribed(int) は、サブスクリプションの確認応答をモニターするクライアント用のメソッドです。サブスクリプション要求がブローカーによって確認されると、クライアントによって subscribed メソッドが呼び出されます。同様に、トピックからのアンサブスクライブ要求が確認されると、2 番目のメソッド unsubscribed が呼び出されます。この例ではパブリッシュに重点を置いているため、この 2 つのメソッドはサンプル・クライアントによって使用されませんが、コードを正しくコンパイルするために、骨組みのインプリメンテーションが必要です。

パブリッシュ用のクライアントが最も関心を持つメソッドは published(int) メソッドです。このメソッドは、メッセージが正しくブローカーに配信されたことを通知します。このメソッドには、整数のパラメーター messageID が 1 つあります。アプリケーションによっては、この messageID を publish メソッドによって返された messageID に一致させたい場合があります。コールバックは、QoS 値が 1 または 2 でパブリッシュされたメッセージ用にのみ呼び出されます。MqttPersistence インプリメンテーションによって提供されるクライアント・サイドの永続性を使用すると、QoS 1 または 2 の使用と相まって、メッセージ配信を保証するためのコールバック冗長性の使用につながります。MQTT クライアントは、接続障害の発生時にフライト内にあったすべてのパブリケーションをトラッキングし、再接続時にメッセージの配信を完了させるよう試みます。しかし、アプリケーションによっては、メッセージ配信を調整したり、独自の配信保証セマンティクスを与えるこことが役に立つ場合もあるでしょう。

通知を受け取るためには、高度なコールバック・インターフェースのインプリメンテーションを MQTT クライアントに登録する必要があります。registerCallback メソッドは、MQTT クライアントに登録機能を提供します。リスト 2 を拡張したリスト 3(US) のサンプル・コードは、正しく機能する MQTT パブリッシャーです。MqttAdvancedCallback インターフェースはこのクラスによってインプリメントされ、前に作成された MQTT オブジェクトに登録されます。このクラスは、ブローカーの URI (たとえば、tcp://mybroker:1883) を含む単一のパラメーターを使用してコマンド行から開始できます。

上に戻る

まとめ

MQTT は、パブリッシュ/サブスクライブ・メッセージング・パラダイムでの強力なトランスポートです。小規模なクライアントおよび低位のネットワーク・オーバーヘッドが必要とされる状況で、MQTT は他のパブリッシュ/サブスクライブ・プロトコルを上回る強力な有効性を持っています。この記事では、完成された機能を持つ MQTT パブリッシャーの作成方法を説明しました。サンプル・クライアントはブローカーに接続し、メッセージをトピックにパブリッシュします。また、MqttAdvancedCallback インターフェースを使用して、ブローカーへのメッセージの配信を通知する例も示しました。

上に戻る

リソース

学習する

議論する

上に戻る

筆者について(原文のまま)

Brian O'Connell is a software engineer at IBM in Research Triangle Park, NC. He works for the Events Infrastructure team. Brian's expertise includes concurrent programming in Java, scalable I/O designs, and publish/subscribe systems. He is the lead architect and developer of the publish/subscribe system that supports IBM-sponsored sporting event Web sites. Brian received a Bachelor of Science degree in Computer Science from Virginia Polytechnic Institute and State University. You can reach him at boc@us.ibm.com.

Andy Stanford-Clark is an IBM Distinguished Engineer and designated Master Inventor, based at IBM's Hursley Park Development Lab in the UK. He specializes in message broker technology, remote telemetry, Internet technologies, and pervasive computing. He leads an Advanced Technology team, building the technology to integrate data from remote monitoring and control devices into business applications. Andy holds a BSc degree in Computer Science and Mathematics, and a PhD in Parallel Computing, both from the University of East Anglia, Norwich, UK. You can contact Andy at andysc@uk.ibm.com.

Martin Gale is the lead developer of the Lotus Expeditor micro broker and is an appointed IBM Master Inventor. Martin works in the IBM UK Software Development Laboratory in Hursley. Martin's background is in exploitation of emerging technologies for the Web and pervasive channels. Prior to his work in software development, Martin worked in a services role working with IBM's customers to develop first-of-a-kind solutions using new technologies. Martin holds a first class BSc degree in Computer Science from Portsmouth University for which he was sponsored by IBM. You can reach Martin at martin_gale@uk.ibm.com.

上に戻る


上に戻る