メッセージ キューへのアクセス

.NET による分散アプリケーションの構築

Duncan Mackenzie
Microsoft Developer Network

September 2001
日本語版最終更新日 2001 年 11 月 19 日

要約: この記事では、Microsoft .NET Framework の System.Messaging クラスを使用して、MSMQ キューの読み書きを行う方法について説明します。この記事の情報は .NET Framework と Microsoft Visual Studio .NET の Beta 2 に適用されます。

「.NET による分散アプリケーションの構築」の完全なサンプル コードは、MSDN Online Code Center からダウンロードしてください。

目次

はじめに
キューの操作
   プログラムによるキューの作成
   キューの削除
メッセージの送信
   単純な方法
   複雑な方法
メッセージの受信
   タイムアウト値の設定
   特定のフォーマッタの使用
結論

はじめに

タスクを非同期的に実行するとは、その結果を待つことなく実行を続けるということです。これにより、時間のかかるプロセスを開始した後も、そのプロセスが実行を終了するのを待たずに処理を続行することができます。

非同期処理はさまざまな状況に適していますが、特にユーザーの要求の処理に長い時間がかかるが、その応答をただちにユーザーに返したい Web に適しています。ユーザーの要求を非同期的に処理することで、システムはその要求の実行にかかる時間の長短にかかわらず、ただちに応答を返すことができます。

Microsoft® .NET アプリケーションに非同期処理を追加する方法はいくつもあります。たとえば、Microsoft Exchange 2000 や Microsoft BizTalk? Server のワークフロー機能を使用することができます。COM interop を通して COM+ Queued Components を使用することも、Microsoft Message Queing (MSMQ) を直接に扱うこともできます。「アーキテクチャに関するトピック」では、近いうちにこれらの選択肢を詳しく比較する記事を掲載する予定です。

BizTalk と COM+ Queued Components の両方が使用している MSMQ は、Microsoft Windows® NT 4.0 Option Pack と、Microsoft Windows 2000 のサーバー バージョンに付属しています。Windows NT 4.0 Workstation、Windows 2000 Professional、および Windows XP Professional では、この記事で説明するように、MSMQ を使ってローカルなプライベート キューにアクセスすることができます。MSMQ により、アプリケーションはデータをキューに入れることができ、そのデータはアプリケーション (別のアプリケーションか、データを格納したのと同じアプリケーション) が取り出すまでキューの中に保持されます。タスク (処理すべきオーダーなど) を実際に同期的に処理するのではなく、タスクを表すメッセージをキューに入れることで、メイン システムはメッセージをキューに入れるのに要する時間のみで作業を続行することができます。キューには任意の数のシステムがメッセージをポストすることができ、任意の数のシステムがこれらのメッセージを取り出して処理することができるため、アプリケーションのスケーラビリティが向上します。

この記事では、.NET Framework が提供する System.Messaging クラスを使用して MSMQ を扱い、アプリケーションに非同期的なワークフローを追加する方法について説明します。

前に述べたように、MSMQ は Windows NT 4.0、Windows 2000、および Windows XP Professional で利用できますが、この記事の例では MSMQ がインストールされた Windows 2000 サーバーを実行しているものとします。この記事の例では、このサーバーが .NET コードを実行しているのと同じマシンであると仮定しています。しかし、現実のコードでは、別のサーバーに置かれているメッセージ キューに接続することも可能です。

キューの操作

メッセージ キューを使用するためには、まずそれを指定しなくてはなりません。このためには、アプリケーション内でキューを一意に、かつ一貫性のある形で記述するための手段が必要です。.NET は、特定のキューにアクセスするための手段として、次の 3 つを用意しています。

  • パスによってキューを指定する

    最初の方法は、キューのパス (&ltservername;>\private$\&ltqueuename;>, liquidsoap\private$\Orders) を指定するというものです。これは、実際のサーバー名 (ローカル サーバーの場合は ".") と、キューへのフル パスを指定します。

  • フォーマット名によってキューを指定する

    第 2 のオプションはフォーマット名です。これは、接続の詳細情報とキューのパス (DIRECT=OS:liquidsoap\private$\Orders) か、メッセージ キューを一意に識別する特殊なグローバル ユニーク識別子 (GUID) を使ってキューを記述する文字列です。

  • ラベルによってキューを指定する

    キューを指定する第 3 の方法は、キューのラベル ("My Orders") を使用するというものです。これは、コードまたは MSMQ 管理インターフェイスを通して割り当てることができる値です。

    ラベルまたはパスの方法を使用すると、MSMQ はこれらの記述を個々のキューの記述に内部的に使用しているフォーマット名に解決しなくてはならないので、いくぶんオーバーヘッドが生じます。フォーマット名を直接使用すれば、名前の解決が不要になり、より効率的です。また実際には、キューがオフラインになっているときでもクライアント アプリケーションを実行したい場合には、これがキューを参照するための唯一の方法となります。

この記事では、メッセージ キューのオフラインでの使用方法については説明しません。メッセージ キューイングの詳細については、Platform SDK の MSMQ のセクションを参照してください。

キューを名前で指定したら、そのキューを表す System.Messaging.MessageQueue オブジェクトのインスタンスを作成する必要があります。次のコードは、コードの先頭で "Imports System.Messaging" をインクルードしていた場合、キューのパスを識別方法として使用して、MessageQueue オブジェクトの新しいインスタンスを作成します。

                  Private Const QUEUE_NAME As String = ".\private$\Orders"
Dim msgQ As New MessageQueue(QUEUE_NAME)

注   BDAdotNetAsync1.vb サンプル コードの例 1 を参照してください (記事の先頭でダウンロード可能)。

キューのパスでは、実際のサーバー名の代わりに、ローカル マシンを指定する "." を使用しています。マシン上の MSMQ で "Orders" という名前のプライベート キューが定義されていれば、このコードはそのキューへのオブジェクト参照の取得に成功します。より堅牢なアプリケーションにするには、キューが存在するかどうかをチェックし、存在していなければ自動的に作成するようにします。ただし、このタイプのチェックには比較的時間がかかります。

プログラムによるキューの作成

MSMQ インターフェイスを使うとメッセージ キューの作成、削除、および確認を行うことができますが、System.Messaging 名前空間を通してもキューを操作することができます。MessageQueue クラスには、キューが存在するかどうかの確認 (Exists)、キューの作成 (Create)、およびキューの削除 (Delete) などを行うためのいくつかのスタティック メソッドが用意されています (スタティック メソッドは、クラスの実際のインスタンスを作成しなくても呼び出すことができます)。これらのメソッドにより、アプリケーションはキューの存在を確認し、存在していなければ自動的に作成することができます。次に示す関数 GetQ は、まさにこの機能を提供します。

                  Private Function GetQ(ByVal queueName As String) As MessageQueue
    Dim msgQ As MessageQueue
    'キューがまだ存在しなければ作成する
    If Not MessageQueue.Exists(queueName) Then
        Try
            'メッセージ キューと MessageQueue オブジェクトを作成する
            msgQ = MessageQueue.Create(queueName)
        Catch CreateException As Exception
            'コードにキューを作成するための十分な権限がなければ
             'キューの作成時にエラーが発生する可能性がある
            Throw New Exception("Error Creating Queue", CreateException)
        End Try
    Else
        Try
            msgQ = New MessageQueue(queueName)
        Catch GetException As Exception
            Throw New Exception("Error Getting Queue", GetException)
        End Try
    End If
    Return msgQ
End Function

**注   **BDAdotNetAsync1.vb サンプル コードの GetQ を参照してください (記事の先頭でダウンロード可能)。

このタイプのコードは、キューを事前に作成する必要がなくなるため、導入を単純化します。しかし、Exists の呼び出しにはコストがかかるため、キューがインストール時に作成されるのであれば、プログラムではその存在を仮定することで、より高いパフォーマンスを実現できます。また、受信側のコードも目的のキューの存在をチェックしなくてはならないことに注意してください。少なくとも 1 つのメッセージが格納されるまでは、キューが存在していない可能性があるためです。

**注   **キューの作成と削除には特定のセキュリティ権限が必要であり、デフォルトではキューの作成者/所有者か管理者のみが行えます。十分な権限がないと、"Access Denied" メッセージとともに例外が発生します。

キューの削除

残ったもう 1 つの有用なスタティック メソッドが Delete です。このメソッドは Create および Exists と同じようにキューへのパスを取りますが、そのパスを使ってキューを探し、サーバーから削除します。もちろん、そのキューに格納されていたメッセージはすべて破棄されるため、重要なデータが失われる可能性があります。キューが削除されたことを確認できるようにするには、Delete への呼び出しを次のようなエラー処理構造の中に置きます。

                  Try
    ' 指定されたキューを削除する
    MessageQueue.Delete(queuePath)
Catch e As Exception
    ' 呼び出し元に例外を送出する
    Throw New Exception("Deletion Failed", e)
End Try

注   BDAdotNetAsync1.vb サンプル コードの例 2DeleteQ を参照してください (記事の先頭でダウンロード可能)。

メッセージの送信

System.Messaging 名前空間は、メッセージの送信のために単純な方法と複雑な方法の両方を用意しています。この 2 つの方法の間の違いは、メッセージそのものではなく、メッセージのフォーマットと配信をどれほど制御できるかという点にあります。どちらの方法でも、任意のタイプのオブジェクトを送信することができます。この記事の例では、StringDataSet、さらにはサンプル コードの中で宣言したクラスを送信しています。

単純な方法

単純な方法では、数ステップでキューにメッセージを送ることができます。まず最初に、適切なメッセージ キューへの参照を取得し (キューのパス、フォーマット名、またはラベルを使用)、MessageQueue オブジェクトの Send メソッドを使用して、送信したいオブジェクトと (望むならば) メッセージのラベルを指定します。これで作業は終わりです。メッセージの送信方法を制御するすべてのオプションでデフォルト値が使用されるので、プログラマが何もしなくて済みます。次に示すコードは、(この記事で前に示した) GetQ プロシージャを使用し、単純な文字列メッセージをローカルなプライベート キューに送信します。

                  Private Const QUEUE_NAME As String = ".\private$\Orders"
Dim msqQ As MessageQueue
Dim msgText As String
' テキスト メッセージを準備する
msgText = String.Format("Sample Message Sent At {0}", DateTime.Now())
Try
    ' キューの MessageQueue オブジェクトを取得する
    msqQ = GetQ(QUEUE_NAME)
    ' キューにメッセージを送信する
    msqQ.Send(msgText)
Catch e As Exception
    ' エラーを処理する
    ・
End Try

注   BDAdotNetAsync1.vb サンプル コードの例 3 を参照してください (記事の先頭でダウンロード可能)。

メッセージのラベルを指定したいときには、Send メソッドの呼び出しに第 2 のパラメータを含めることができます。

                  msgQ.Send(sMessage, "Sample Message")

Windows NT の MSMQ エクスプローラか、Windows 2000 と XP の Computer Management ツールを使用すると、マシン上に存在するキューを表示し、各キューの中の個々のメッセージを確認することができます。メッセージにラベルを割り当てていた場合には、そのラベルがメッセージ ビューに表示されます。割り当てていなかった場合には、ラベル領域は空白のままになります。

図 1. Windows 2000 Computer Management の MSMQ ウィンドウ

図 1. Windows 2000 Computer Management の MSMQ ウィンドウ

個々のメッセージの内容を表示するには、メッセージを右クリックし、[Properties] をクリックし、[Body] タブをクリックします。単純な送信では、すべてのオプションがデフォルト値に設定されており、送信したオブジェクトは次に示すサンプル (および図 2) のように XML にシリアル化されています。

                  <?xml version="1.0"?>
<string>Sample Message Sent At 5/25/2001 3:51:48 PM</string>

図 2. XML にシリアル化されたメッセージ本体

図 2. XML にシリアル化されたメッセージ本体

**注   **メッセージの送信のデフォルト フォーマットは XML ですが、別のフォーマットを選択することもできます。メッセージのシリアル化に使用されるフォーマッティングの変更方法については、「複雑な方法」のセクションで説明します。

この方法では、オブジェクトを Send メソッドに渡すだけで、任意のオブジェクトを送信することができます。引数として指定されたものが XML にシリアル化され (上の注を参照)、メッセージの本体に格納されます。StringInteger、および Double などの基本データ型に加えて、MSMQ を通してよく送信されるオブジェクト型としては、DataSet と、ユーザーが作成したアプリケーション固有のオブジェクト クラスの 2 つがあります。次に、この両方のオブジェクトを送信する例を示します。System.Messaging クラスを使えば、任意のタイプの情報を簡単に送信できることがわかります。

DataSet を送信するには、DataSet のインスタンスを作成し、これを MessageQueue オブジェクトの Send メソッドに渡します。上の例で送信した String と同様に、DataSet は XML にシリアル化され、メッセージの本体に格納されます。

                  Dim msgQ As MessageQueue
Dim msgText As String
Dim dSet As DataSet
' DataSet オブジェクトをポピュレートする外部のコード
dSet = GetDataSet()
' キューの MessageQueue オブジェクトを取得する
msgQ = GetQ(QUEUE_NAME)
' "Order" のラベルを持つキューに DataSet を含んでいるメッセージを送信する
msgQ.Send(dSet, "Order")
    

注   BDAdotNetAsync1.vb サンプル コードの例 4 を参照してください (記事の先頭でダウンロード可能)。

システムが Order クラスのような独自のクラスを使ってデータを保持している場合でも、上の DataSet を送信するコードと同じように、それらのオブジェクトを MSMQ を通して送信することができます。.NET Messaging クラスは自動的にオブジェクト インスタンスを XML にシリアル化し、メッセージに追加します。次の例では、Order クラスのインスタンスが作成され、そのプロパティがポピュレートされています。その後、MessageQueue オブジェクトの Send メソッドを使って、このオブジェクトをメッセージに格納し、キューに追加します。メッセージの複雑な送信方法について解説した後に、キューからの着信メッセージを処理する際に、これに限らず任意の型のオブジェクトを取得する方法について説明します。

                  Dim msgQ As MessageQueue
Dim myOrder As New Order()
'Order オブジェクトをポピュレートする
myOrder.CustomerID = "ALKI"
myOrder.ID = 34
myOrder.ShipDate = DateTime.Now()
'キューの MessageQueue オブジェクトを取得する
msgQ = GetQ(QUEUE_NAME)
'オーダーを含んでいるメッセージをキューに送信する
msgQ.Send(myOrder, "Order")

注   BDAdotNetAsync1.vb サンプル コードの例 5 を参照してください (記事の先頭でダウンロード可能)。

複雑な方法

Send メソッドはほとんどの用途に適していますが、ときには送信プロセスを細かく制御しなくてはならないことがあります。(この記事のこれまでの例で示してきたように) Send メソッドを使ってオブジェクトを直接送信するのとは別に、System.Messaging.Message クラスのインスタンスを使用する方法があります。1 つの MSMQ メッセージを表す Message オブジェクトを作成することで、フォーマッティング、暗号化、およびタイムアウトなどのオプションを設定し、単純な Send メソッドでは扱えないさまざまなプロパティにアクセスすることができます。

重要なメッセージ オプションの 1 つは、メッセージが送信の際にどのようにシリアル化されるかを制御します。デフォルトでは、オブジェクトを XML 表現にシリアル化する XMLMessageFormatter を使用するように設定されており、これはほとんどの用途に適しています。ただし、これ以外にも BinaryMessageFormatterActiveXMessageFormatter の 2 つのフォーマッタが用意されています。どちらもオブジェクトをバイナリの (XML のように人間が読めるような形でない) フォーマットにシリアル化します。Microsoft ActiveX® フォーマッタは、Microsoft Visual Basic® 6.0 アプリケーションのような非.NET システムにプリミティブや COM オブジェクトを渡すときに使用されるもので、この機能が必要な場合以外には使用するべきではありません。Binary フォーマッタはデフォルトの XML フォーマッタよりも高速なので、MSMQ を使った処理のパフォーマンスが問題になる場合には、このフォーマッタの使用を検討する価値があります。XML の代わりにバイナリ フォーマットを使用するときのトレードオフを考慮に入れることが重要です。パフォーマンスが向上するのと引き換えに、メッセージの内容を直接的に読み、解析することができなくなります。

**注   **メッセージの送信にどのフォーマッタを選択した場合でも、キューからメッセージを受信する際には、それと同じフォーマッタを使用する必要があります。

次のコード例は、メッセージの複雑な送信方法を使用し、フォーマッティングと暗号化を含むいくつかのオプションを設定した後に、事前に構成されている Message オブジェクトを指定して MessageQueue オブジェクトの Send メソッドを使用しています。

                  Dim msgQ As MessageQueue
'送信する新しい Message オブジェクトを作成する
Dim myMsg As New Message()
'このファイルの終わりに定義されている Order クラスの
'新しいインスタンスを作成する
Dim myOrder As New Order()
'Orderr オブジェクトをポピュレートする
myOrder.CustomerID = "ALKI"
myOrder.ID = 34
myOrder.ShipDate = DateTime.Now()

msgQ = GetQ(QUEUE_NAME)

With myMsg
    .Label = "Order"
    .Formatter = New BinaryMessageFormatter()
    .AppSpecific = 34
    .Priority = MessagePriority.VeryHigh
    .Body = myOrder
End With
'Order クラスのインスタンスを含んでいる
'Message オブジェクトを送信する
msgQ.Send(myMsg)

注   BDAdotNetAsync1.vb サンプル コードの 6 を参照してください (記事の先頭でダウンロード可能)。

この例では、デフォルトの XML フォーマッタの代わりに BinaryMessageFormatter が使用されているので、送信されるクラス (Order) は ISerializable インターフェイスをサポートするか、<Serializable()> 属性でマークされている必要があります。完全なサンプル コードでは、属性が使用されています。

メッセージの受信

キューに追加されたメッセージは、追加された順序でキューから取り出すことができます。キューからメッセージを取り出し、そこに含まれているデータを取得するためには、いくつかの重要なステップが必要となります。

  1. まず、メッセージの送信のときと同様に、適切なキューをポイントする MessageQueue オブジェクトを取得します。

    Dim msgQ As New MessageQueue(QUEUE_NAME)
    
  2. 次に、キューの Receive メソッドを使って、キューから最初のメッセージを取得します。

    Dim myMessage As Message
    myMessage = msgQ.Receive()
    
  3. その後、XML フォーマッタを使用している場合は (デフォルト)、Message オブジェクトのフォーマッタに、メッセージに含まれている可能性のあるデータ型のリストをセットアップします。この例では、String データ型のみを想定します。

    Dim targetTypes(0) As Type
    targetTypes(0) = GetType(String)
    myMessage.Formatter = New XmlMessageFormatter(targetTypes)
    
  4. 最後に、Body プロパティを使ってデータを取得し、適切な型にキャストします。

    Dim msgText As String
    msgText = CStr(myMessage.Body)
    

注   BDAdotNetAsync1.vb サンプル コードの例 7 を参照してください (記事の先頭でダウンロード可能)。

タイムアウト値の設定

メッセージを取り出すだけであれば、上に示したステップで十分ですが、このコードはメッセージが取り出し可能になるまで msgQ.Receive の呼び出しでブロック (ハング) します。キューにメッセージが含まれていなければ、プログラムはこの行でいつまでも待つことになります。Receive メソッドでは、パラメータとして System.TimeSpan オブジェクトを与えることでタイムアウト値を指定することができます。数種類のコンストラクタの中の 1 つを使用して、Receive の呼び出しの中で TimeSpan オブジェクトを作成することができます。

                  objMessage = objQ.Receive(New TimeSpan(0, 0, 30))

このコード行は、TimeSpan コンストラクタの 3 パラメータ バージョン (時間、分、秒) を使用して 30 秒のタイムアウト値を指定していますが、任意の時間を指定することができます。この機能を使用し、Receive を呼び出すときにタイムアウト値を指定する場合には、メッセージが受信されないままタイムアウトが起こるとエラーが発生するので、呼び出しをエラー処理ルーチンで囲むようにしてください。

                  Dim myMessage As Message
Try
    myMessage = msgQ.Receive(New TimeSpan(0, 0, 30))
Catch eReceive As MessageQueueException
    Console.WriteLine("{0} ({1})", _
            eReceive.Message, _
            eReceive.MessageQueueErrorCode.ToString)
End Try

注   BDAdotNetAsync1.vb サンプル コードの例 8 を参照してください (記事の先頭でダウンロード可能)。

特定のフォーマッタの使用

XML フォーマッタを使用している限り、どのようなデータ型を送信する場合でも、このプロセスは同じです。BinaryMessageFormatter などの他のフォーマッタを使用する場合には、メッセージ本体を取り出す前に、正しいフォーマッタを使用するようにメッセージを設定する必要があります。次のコードは、バイナリ フォーマッタを使って Order クラスのインスタンスを取り出しています。

                  Dim msgQ As MessageQueue
Dim myOrder As Order
' キューの MessageQueue オブジェクトを取得する
msgQ = GetQ(QUEUE_NAME)
' BinaryMessageFormatter を指定する
msgQ.Formatter = New BinaryMessageFormatter()
' キューからメッセージを読み込み、Order オブジェクトに変換する
myOrder = CType(msgQ.Receive().Body, Order)
' Order オブジェクトに対して操作を行う
Console.WriteLine(myOrder.CustomerID)

注   BDAdotNetAsync1.vb サンプル コードの例 9 を参照してください (記事の先頭でダウンロード可能)。

SendMessage プロシージャは、メッセージ送信のもう 1 つのバリエーションを使用しています。MessageQueue オブジェクトの Formatter プロパティを設定することで、単純な送信に使用されるシリアル化を制御することができます。

この記事は、プロセスが完了するのを待たずに済む非同期処理を扱っていますが、最後の数例で使用した Receive メソッドは完全に同期的です。コードはメッセージ全体がメッセージ キューから取り出されるまで待たなくてはなりません。この記事の例では大した遅れは生じませんが、4 MB もの大きさのメッセージを扱わなくてはならないときには (DataSet を送信するときには、このようなことも起こりえます)、その全体がプログラムのメモリ空間に読み込まれるまで待つことはできないでしょう。 MessageQueue オブジェクトは、大きなメッセージと低速なネットワーク リンクを考慮に入れて、Receive メソッドに加えてメッセージを非同期的に受信する手段を用意しています。メッセージを非同期的に受信するには、上記のように適切なキューへの参照を取得してから、Receive の代わりに BeginReceive を呼び出します (必要ならばタイムアウトを指定します)。これにより受信プロセスが開始され、ただちに次のコード行から実行が続けられます。

                  Private Const QUEUE_NAME As String = "liquidsoap\private$\Orders"
Private WithEvents msgQ As MessageQueue
' キューの MessageQueue オブジェクトを取得する
msgQ = GetQ(QUEUE_NAME)
' メッセージのリッスンを開始する
msgQ.BeginReceive()

メッセージの取り出しに成功すると、MessageQueue オブジェクトの ReceiveCompleted イベントが発火し、イベント ハンドラが呼び出されます。取り出されたメッセージは、パラメータの 1 つとしてイベント ハンドラに送られるので、そのオブジェクトを使って通常どおりに実際のデータを取得することができます。

                  Public Sub msgQ_ReceiveCompleted(ByVal sender As Object, _
        ByVal e As ReceiveCompletedEventArgs) _
        Handles msgQ.ReceiveCompleted
    Dim msgText As String
    Dim eMsg As Message
    'イベント引数 (e) からメッセージを取得する
    eMsg = e.Message
    '期待される型 (String) を指定してフォーマッタをセットアップする
    eMsg.Formatter = _
        New XmlMessageFormatter(New System.Type() {GetType(String)})
    '本体を取り出し、String にキャストする
    msgText = CStr(eMsg.Body)
    Console.WriteLine(msgText)
    'AsyncReceive 内のループを終了させるためのフラグを設定する
    msgReceived = True
End Sub

注   BDAdotNetAsync1.vb サンプル コードの例 10 を参照してください (記事の先頭でダウンロード可能)。

結論

アプリケーション内で非同期処理をインプリメントするための手段の 1 つとして Message Queuing (MSMQ) があります。System.Messaging クラスを使用すると、.NET または COM データ型を含んでいるキュー メッセージの送信と読み込みを行うことができます。非同期処理をインプリメントするためのその他の手段については、今後の「「アーキテクチャに関するトピック」の記事で紹介していく予定です。