
Implementing Reliable Message Receive Loops

Updated: March 19, 2014

In several customer projects that use the brokered messaging API, we observed that message receive logic is often implemented as a simple repeated loop with no sound approach to handling potential anomalies. Such logic generally does not account for edge cases such as expired message locks, which makes it error-prone. This section provides specific recommendations for implementing reliable message receive logic.

Reliable Message Receive Loops

First, it is important to note the two distinct modes in which messages can be received from Service Bus. These modes are provided by the brokered messaging API to support message delivery using either “At Most Once” (with ReceiveAndDelete) or “At Least Once” (with PeekLock) semantics.

The first mode is ReceiveAndDelete, which is the simplest model and works best for scenarios in which the application can tolerate a failure in message processing. When using the ReceiveAndDelete mode, the receive action is a single-hop operation during which a message delivered to the client is marked as being consumed and subsequently removed from the respective queue or subscription.

The second mode is PeekLock, which prescribes that a received message is to remain hidden from other consumers until its lock timeout expires. With the PeekLock mode, the receive process becomes a two-stage operation making it possible to support applications that cannot tolerate failed messages. In addition to issuing a request to receive a new message (first stage), the consuming application is required to indicate when it has finished processing the message (second stage). After the application finishes processing the message, or stores (defers) it reliably for future processing, it completes the second stage of the receive process by calling the Complete method on the received message.
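The receive mode is chosen when the client is created. The following is a minimal sketch of that choice, assuming the client is built from a connection string; the class name, method names, and parameters are illustrative and not part of the original guidance.

```csharp
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper class; only the QueueClient and ReceiveMode types come from the SDK.
public static class ReceiveModeExamples
{
    // "At Most Once": a message is removed from the queue as soon as it is delivered.
    public static QueueClient CreateAtMostOnceClient(string connectionString, string queueName)
    {
        return QueueClient.CreateFromConnectionString(
            connectionString, queueName, ReceiveMode.ReceiveAndDelete);
    }

    // "At Least Once": a message stays locked until it is completed, abandoned,
    // or its lock expires.
    public static QueueClient CreateAtLeastOnceClient(string connectionString, string queueName)
    {
        return QueueClient.CreateFromConnectionString(
            connectionString, queueName, ReceiveMode.PeekLock);
    }
}
```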

When you specify PeekLock mode, you should always finalize the successful processing of a message by calling the Complete method, which tells Service Bus to mark the message processing as completed. Failure to call the Complete method on a message received in PeekLock mode will result in the message re-appearing in a queue or subscription after its lock timeout expires. Consequently, you will receive the previously processed message again, and this may result in a duplicate message being processed.

In PeekLock mode, you should also tell Service Bus when a message cannot be processed successfully and must therefore be returned for subsequent redelivery. Whenever possible, your messaging solution should handle this situation by calling the Abandon method instead of waiting until the lock acquired for the message expires. Ideally, you will call the Abandon method from within the catch block of the try/catch construct that wraps the message handling logic.
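The two-stage receive described above can be sketched synchronously as follows. This is an illustration only: the class name, method name, and handler parameter are assumptions, and the client is assumed to have been created in PeekLock mode.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper; assumes 'client' was created with ReceiveMode.PeekLock.
public static class PeekLockReceiver
{
    // Receives at most one message. Returns false if no message arrived in time.
    public static bool ReceiveOne(QueueClient client, Action<BrokeredMessage> handler, TimeSpan waitTimeout)
    {
        // First stage: receive and lock a message (null means the wait timed out).
        BrokeredMessage msg = client.Receive(waitTimeout);
        if (msg == null)
        {
            return false;
        }

        using (msg)
        {
            try
            {
                handler(msg);
            }
            catch (Exception)
            {
                // Processing failed: release the lock now instead of waiting for it to expire.
                msg.Abandon();
                throw;
            }

            // Second stage: processing succeeded, so remove the message from the queue.
            msg.Complete();
            return true;
        }
    }
}
```

Note that Complete and Abandon can themselves throw in this sketch, and an exception from Abandon would replace the original processing exception. The SafeComplete and SafeAbandon extension methods later in this section address that.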

It is important to ensure that message processing happens strictly within the designated lock period. In the brokered messaging pattern, the maximum message lock duration is 5 minutes, and this duration cannot currently be extended at runtime. If a message takes longer to process than the lock duration set on a queue or subscription, its visibility lock will time out and the message will again become available to the consumers of the queue or subscription. If you attempt to complete or abandon such a message, you may receive a MessageLockLostException error that indicates there is no valid lock found for the given message.
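One way to reduce the chance of a late Complete call is to check how much of the lock remains before settling the message. The helper below is a sketch under that assumption; its name and the safety margin are illustrative.

```csharp
using System;

// Hypothetical helper for deciding whether a message lock is still usable.
public static class LockGuard
{
    // Returns true when at least 'safetyMargin' remains before the lock expires.
    public static bool HasTimeLeft(DateTime lockedUntilUtc, DateTime utcNow, TimeSpan safetyMargin)
    {
        return lockedUntilUtc - utcNow >= safetyMargin;
    }
}
```

A caller could pass msg.LockedUntilUtc and DateTime.UtcNow, for example with a margin of a few seconds. The check is advisory only: clock differences between the client and Service Bus mean that MessageLockLostException must still be handled. The lock duration itself is set on the queue or subscription, for example through the LockDuration property of QueueDescription.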

To implement a robust message receive loop, build resilience against all known transient errors as well as any abnormalities that can manifest themselves during or after message processing. This is especially important when receiving messages using PeekLock mode. Because PeekLock mode always involves a second stage, you should never assume that a message successfully processed on the client can be reliably marked as completed in the Service Bus backend. For example, a fault in the underlying network layer may prevent the Complete call from reaching Service Bus. As a result, you may receive the same message more than once, so your processing logic must be idempotent. This behavior is in line with many other messaging solutions that operate in the “At Least Once” message delivery mode.
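As an illustration of handling duplicates, the sketch below remembers which message identifiers have already been processed and skips repeats. It is not part of the original guidance: the class is hypothetical, it assumes that senders assign a unique MessageId to each logical message, and it keeps its state in memory only, so a real solution would likely need a durable store.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory duplicate detector keyed by message identifier.
public sealed class ProcessedMessageLog
{
    private readonly ConcurrentDictionary<string, bool> seen =
        new ConcurrentDictionary<string, bool>();

    // Runs 'handler' only if 'messageId' has not been seen before.
    // Returns true if the handler ran, false if the message was a duplicate.
    public bool ProcessOnce(string messageId, Action handler)
    {
        if (!seen.TryAdd(messageId, true))
        {
            return false;
        }

        try
        {
            handler();
            return true;
        }
        catch
        {
            // Processing failed, so forget the identifier and allow a later redelivery.
            bool ignored;
            seen.TryRemove(messageId, out ignored);
            throw;
        }
    }
}
```

A receive loop could call ProcessOnce(msg.MessageId, () => processMessage(msg)) and then complete the message whether or not the handler ran, since a duplicate has already been handled.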

You can add resilience when calling the Complete and Abandon methods by using extension methods. For example:

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
        // unlock the message and make it available to be received again,
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
        // or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

A similar approach can be extended to shield other messaging operations such as Defer from potential failures. The pattern in which the above extension methods can be used is reflected in the code snippet below. This code fragment demonstrates how to implement a receive loop while taking advantage of the additional resilience provided by the extension methods:

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we were not told to stop while the receive operation was completing.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message.
                                // This will cause Service Bus to unlock the message and make it available
                                // to be received again, either by the same consumer or by another competing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing,
                        // the current message needs to be unlocked and returned to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }

                        // Release any resources allocated by the BrokeredMessage instance.
                        msg.Dispose();
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
    // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
        // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Messages will keep arriving asynchronously
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

The preceding example implements an advanced approach to receiving messages asynchronously in the order in which they appear on a queue. It ensures that any error encountered during processing results in the message being abandoned and returned to the queue so that it can be re-processed. The extra code is justified because it supports graceful cancellation of the message receive loop.
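As noted earlier, the same shielding can be applied to other operations such as Defer. A minimal sketch of such a method follows; the name SafeDefer and its containing class are hypothetical and simply mirror SafeComplete and SafeAbandon.

```csharp
using Microsoft.ServiceBus.Messaging;

// Hypothetical counterpart to SafeComplete and SafeAbandon.
public static class BrokeredMessageDeferExtensions
{
    public static bool SafeDefer(this BrokeredMessage msg)
    {
        try
        {
            // Set the message aside so that it can be retrieved later by its sequence number.
            msg.Defer();

            // Return a result indicating that the message has been deferred successfully.
            return true;
        }
        catch (MessageLockLostException)
        {
            // The lock has already expired, so the message was not deferred.
            // Be prepared to receive the same message again.
        }
        catch (MessagingException)
        {
            // The connection may have been lost, or the entity may have been removed.
            // The only recourse is to receive another message (possibly the same one).
        }

        return false;
    }
}
```

A deferred message can only be received again by its sequence number, so the caller would typically record msg.SequenceNumber before calling SafeDefer.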
