セールス: 1-800-867-1380

信頼できるメッセージ受信ループの実装

更新日: 2014年3月

仲介型メッセージング API を利用した複数の顧客プロジェクトの結果から、メッセージの受信は、潜在異常を処理する健全なアプローチを持たない、受信ロジックの反復実装に依存しやすいことが判明しました。一般的にそのようなロジックは、期限切れのメッセージ ロックなどのエッジ ケースは許可しません。この種類のアプローチは堅牢な方法で実装しない限りエラーが発生しやすくなります。このセクションでは、信頼性の高いメッセージ受信ロジックの実装に関する具体的な推奨事項について説明します。

まず、Service Bus からメッセージを受信できるモードが 2 種類あることに注意してください。これらのモードは仲介型メッセージング API によって実現され、「At-Most-Once」(ReceiveAndDelete を使用) または「At Least Once」(PeekLock を使用) のいずれかのセマンティクスを使用してメッセージ配信をサポートします。

最初のモード ReceiveAndDelete は最も簡単なモデルで、メッセージング処理でのエラーをアプリケーションが許容できるシナリオに適しています。ReceiveAndDelete モードを使用する場合、受信操作は single-hop 操作となり、その間にクライアントに配信されるメッセージは読まれたとしてマークされ、それぞれのキューまたはサブスクリプションからその後に削除されます。

2 つ目のモード PeekLock では、受信されたメッセージは指定のロック タイムアウトが期限切れになるまで、他のコンシューマーには表示されないように設定されています。PeekLock モードでは、受信プロセスは 2 ステージになり、エラーが発生したメッセージを許容できないアプリケーションをサポートできるようになります。新しいメッセージ (第 1 ステージ) の受信要求を発行できるほか、処理を行うアプリケーションはメッセージの処理が終了したことを示す (第 2 ステージ) 必要があります。アプリケーションは、メッセージの処理が終了、または後で処理するために確実に保管 (遅延) すると、受信したメッセージで Complete メソッドを呼び出すことにより、受信プロセスの第 2 ステージを完了します。

PeekLock モードを指定する場合、Service Bus にメッセージ処理が完了したことをマークするよう指示する Complete メソッドを呼び出すことによって、常に正常なメッセージの処理を確定する必要がありますPeekLock モードで受信したメッセージで Complete メソッドの呼び出しに失敗すると、メッセージは、指定のロック タイムアウトが時間切れになった後にキューまたはサブスクリプションに再表示されます。その結果、前に処理されたメッセージを再び受信することになり、重複したメッセージが処理されることになります。

さらに、PeekLock モードでは メッセージを正常に処理できない場合、その後の再配信のためにメッセージを返す必要があると Service Bus に通知する必要があります。。可能な限り、メッセージング ソリューションは、メッセージに指定されたロック期限が切れるまで待つのではなく、Abandon メソッドを呼び出すことによってこの状況を処理する必要があります。メッセージング処理コンテキストに使用できる try/catch 例外処理 コンストラクトに属する catch ブロック内から Abandon メソッドを呼び出すことをお勧めします。

メッセージ処理が指定されたロック期間中に確実に発生するようにする必要があります。仲介型メッセージング パターンでは、メッセージの最大ロック期間は 5 分です。現時点では、この期間を実行時に延長することはできません。メッセージの処理がキューまたはサブスクリプションに設定したロック期間を超過した場合、可視化のロックがタイムアウトし、メッセージは再びキューまたはサブスクリプションのコンシューマーに表示されるようになります。このようなメッセージを終了または破棄しようとすると、指定のメッセージに対する有効なロックが見つからないことを示す MessageLockLostException エラーが表示されます。

堅牢なメッセージ受信ループを実装するために、すべての既知の一時的なエラーとメッセージ受信前後に発生する可能性のある異常に対し、復元力を構築することをお勧めします。これは、PeekLock モードを使用してメッセージを受信する場合には特に重要です。PeekLock モードには常に第 2 ステージが存在するため、クライアント上で正常に処理されたメッセージが Service Bus バックエンドでも確実に終了済みとしてマークできると想定すべきではありません。たとえば、基礎となるネットワーク層に障害があると、メッセージ処理は正常に終了できません。このような状況では、同じメッセージを 2 度以上受け取る可能性があるため、冪等エッジケースを処理する必要があります。この動作は、「At Least Once」メッセージ配信モードで動作する他の多くのメッセージング ソリューションと同様です。

拡張メソッドを使用して Complete および Abandon メソッドを呼び出すと、さらに復元力を強化できます。たとえば、次のような記事があります。

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
       // unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
       //  or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

同様のアプローチを拡張し、Defer などの他のメッセージング操作を潜在的な障害から保護できます。上記の拡張メソッドのパターンを使用して、下記のコード スニペットに反映させることができます。このコードのフラグメントは、拡張メソッドによって強化された復元力を利用し、受信ループを実装する方法を示しています。

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, 
                        // the current message needs to be unlocked and return back to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
   // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
       // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously 
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

前の例では、キューに表示するためにメッセージを非同期的に受信する高度なアプローチを実装します。これによって、処理中に発生するエラーはすべて、メッセージを取り消してキューに返し、メッセージを再度処理できるようにします。余分なコードはメッセージ処理ループの正常な取り消しをサポートすることによって揃えられます。

この情報は役に立ちましたか。
(残り 1500 文字)
フィードバックをいただき、ありがとうございました
表示:
© 2014 Microsoft