エクスポート (0) 印刷
すべて展開

メッセージの非同期受信

更新日: 2014年3月

メッセージを非同期的に送信するのと同様に、また、実用的な面からも、Microsoft Azure の Service Bus からのメッセージを受信するように非同期プログラミング モデルの使用を拡張することもできます。

Service Bus キューまたはサブスクリプションのいずれかで新しいメッセージを待機する間、多くの場合、ソリューションからポーリング要求が発行されています。このような場合でも、Service Bus では、メッセージがキューに到達するか、指定したタイムアウト期間が経過するかのどちらか先に発生する時点まで、サーバーとの接続を維持するロングポーリング受信操作を用意しています。ロングポーリング受信は同期的に実行され、新しいメッセージを待機する間に CLR スレッド プール スレッドをブロックしますが、この方法は最適とは見なされていません。CLR スレッド プールの容量は一般的に限界があるため、長期的に実行される操作に対しては特に、このスレッド プールの使用は回避する必要があります

Service Bus 仲介型メッセージング API を使用する効果的なメッセージング ソリューションを構築するには、常に受信操作を非同期的に実行する必要があります。ソリューションが 1 度に 1 つのメッセージを受信する場合も、複数のメッセージを取得する場合も、指定したタイムアウトが設定された BeginReceive メソッドを使用して受信操作を開始します。現在の API では、受信操作の最大タイムアウト値は 24 日です。ユーザーの代わりに Service Bus メッセージング クライアントが新しいメッセージを待機する間、ユーザーのソリューションはその他の作業を進めることができます。終了するとコールバック メソッドが通知され、受信したメッセージ (ある場合) を処理することができます。

noteメモ
メッセージがキューまたはサブスクリプションから受信されると、本文は 1 度だけ読むことができます。ネットワーク プロトコルの性質上、シーク操作を常にサポートしているわけではないため、メッセージ データ ストリームは常に「巻き戻し可能」ではありません。GetBody() メソッドを呼び出した後にメッセージ データをオブジェクトに配置しデータを保護してから、そのオブジェクトを必要な期間保持します。仲介型メッセージング API では、GetBody() メソッドの呼び出しは 1 回のみサポートされます。

次に、指定した数のメッセージを非同期的に Service Bus キューから受信するプログラミング メソッドのコード サンプルの例を示します。

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

前のセクションの推奨事項と同様に、非同期メッセージ受信操作を並列化するタスク並列ライブラリによる非同期プログラミング モデルの統合を使用することをお勧めします

表示:
© 2014 Microsoft