セールス: 1-800-867-1380

メッセージの非同期送信

更新日: 2014年3月

.NET マネージ仲介型メッセージング API を使用してメッセージング ソリューションを実装する場合に、クライアント側バッチ処理などの Microsoft Azure の Service Bus の拡張処理機能を利用するには、非同期プログラミング モデルの使用を検討する必要があります。非同期メッセージング パターンを使用すると、メッセージの送受信など I/O バウンド操作のオーバーヘッドを一般的に回避可能なソリューションを構築できます。

API メソッドを非同期的に呼び出すと、制御によってただちにコードが返されてアプリケーションが継続して実行されます。このとき、非同期操作の場合は個別に実行されます。アプリケーションは、非同期操作の監視、または操作完了時のコールバックによる通知の受信のいずれかを実行します。このとき、アプリケーションは結果を取得および処理できます。

たとえば、QueueClient クラス (または Service Bus 仲介型メッセージング API が提供するその他の同期メソッド) の Send または Receive メソッドなどの同期操作を呼び出す場合、API コードはそれぞれのメソッドの非同期バージョンを内部的に通過することに注意してください (ブロックする方法でも)。ただし、これらのメソッドの同期バージョンを使用しても、非同期バージョンを直接呼び出す場合に想定されるパフォーマンスに関連するメリットの全範囲が適用されない場合があります。これは、複数のメッセージを送信または受信する際にそれぞれのメッセージング操作を実行しながら他の処理を実行する場合には特に顕著です。

noteメモ
BrokeredMessage オブジェクトはメッセージを表し、ネットワーク上でデータを送信する目的で提供されます。BrokeredMessage オブジェクトはキューまたはトピックに送信されるとすぐに基礎となるメッセージング スタックに読まれるため、それ以降の操作に再利用することはできません。これは、メッセージ本文が読まれるとメッセージ データを配信するストリームを巻き戻せなくなるためです。メッセージング操作が正常に行われたことが確実になるまでは BrokeredMessage インスタンスの作成に使うソース データを保持しておく必要があります。失敗したメッセージング操作の再試行が必要になった場合、そのソース データを使用して新しい BrokeredMessage インスタンスを作成する必要があります。

次のコード スニペットは、送信メッセージの順序を維持しながら複数のメッセージを非同期的に (そして確実に) 送信する方法を示しています。

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

可能な限り、既定のスケジューリングを使用したメッセージング操作の並列処理は避け、タスク並列ライブラリ (TPL) および Parallel LINQ (PLINQ) によるパーティション分割アルゴリズムで作業してください。TPL Framework の基本は、特に計算バウンドの操作の観点からは、並列処理と同時実行処理をアプリケーションに追加する場合に最も適しています。ネットワーキング コールやメッセージング操作などの I/O バウンド コードのパフォーマンスを向上させる TPL の「現状」使用からは、想定したような改善が見られない場合があります。TPL を活用して非同期操作をサポートする最適な方法は、非同期プログラミング モデルに準拠した高度な TPL パターン を使用することです

この情報は役に立ちましたか。
(残り 1500 文字)
フィードバックをいただき、ありがとうございました
表示:
© 2014 Microsoft