Export (0) Print
Expand All

Receiving Messages Asynchronously

Updated: March 19, 2014

Similar to sending messages asynchronously, and also from a practical point of view, you can also extend the use of the asynchronous programming model to receiving messages from Microsoft Azure Service Bus.

Receiving Messages Asynchronously

While waiting for new messages either on a Service Bus queue or subscription, your solution often issues a polling request. Fortunately, Service Bus offers a long-polling receive operation which maintains a connection to the server until a message arrives on a queue or the specified timeout period has elapsed, whichever occurs first. If a long-polling receive is performed synchronously, it will block the CLR thread pool thread while waiting for a new message, which is not considered optimal. The capacity of the CLR thread pool is generally limited; hence there is good reason to avoid using the thread pool for particularly long-running operations.

To build a truly effective messaging solution using the Service Bus brokered messaging API, you should always perform the receive operation asynchronously. Whether your solution receives one message at a time or fetches multiple messages, you begin the receive operation using the BeginReceive method with the specified timeout. In the current API, the maximum receive timeout value is 24 days. While the Service Bus messaging client is waiting on your behalf for a new message, your solution can proceed with performing any other work. Upon completion, your callback method will be notified and the message that was received (if any) will be available for processing.

noteNote
Once a message is received from a queue or subscription, its body can only be read once. Due to the nature of network protocols, message data streams are not always “rewindable”, because they do not often support a seek operation. You should secure the message data by placing it into an object after calling the GetBody() method, then keep that object for as long as you need it. Attempting to invoke the GetBody() method more than once is not supported by the brokered messaging API.

The code sample below shows an example of a programming method that asynchronously receives the specified number of messages from a Service Bus queue:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

In line with the recommendation in the previous section, it is best to use the asynchronous programming model integration provided by Task Parallel Library for parallelizing the asynchronous message receive operation.

Community Additions

ADD
Show:
© 2014 Microsoft