
Sending Messages Asynchronously

Updated: March 19, 2014

To take advantage of advanced performance features in Windows Azure Service Bus, such as client-side batching, you should consider using the asynchronous programming model whenever you implement a messaging solution with the .NET managed brokered messaging API. The asynchronous messaging pattern enables you to build solutions that avoid blocking on I/O-bound operations such as sending and receiving messages.


When you invoke an API method asynchronously, control returns immediately to your code and your application continues to execute while the asynchronous operation is being executed independently. Your application either monitors the asynchronous operation or receives notification by way of a callback when the operation is complete. At this time, your application can obtain and process the results.

It is important to note that when you invoke synchronous operations, such as the Send or Receive methods of the QueueClient class (or other synchronous methods in the Service Bus brokered messaging API), the API internally flows through the asynchronous versions of those methods, albeit in a blocking fashion. However, the synchronous versions may not deliver the full range of performance benefits that you gain by calling the asynchronous versions directly. This is particularly apparent when you send or receive multiple messages and want to perform other processing while those messaging operations execute.
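To illustrate the difference, the following minimal sketch calls the BeginSend/EndSend pair directly instead of the blocking Send method. It assumes an initialized QueueClient named queueClient; error handling and retries are omitted here for brevity.

```csharp
// Minimal sketch of an APM-style asynchronous send (assumes queueClient is initialized).
BrokeredMessage message = new BrokeredMessage("sample payload");

queueClient.BeginSend(
    message,
    ar =>
    {
        try
        {
            // EndSend completes the operation and rethrows any failure that occurred.
            queueClient.EndSend(ar);
        }
        finally
        {
            // Release the resources held by the brokered message.
            message.Dispose();
        }
    },
    null);

// Control returns here immediately; the send continues in the background,
// so the application can perform other work in the meantime.
```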

Note
A BrokeredMessage object represents a message and is provided for the purpose of transmitting data across the wire. As soon as a BrokeredMessage object is sent to a queue or topic, it is consumed by the underlying messaging stack and cannot be reused for further operations. This is because once the message body is read, the stream that projects the message data cannot be rewound. You should retain the source data used to construct a BrokeredMessage instance until you can reliably assert the success of the messaging operation. If a failed messaging operation requires a retry, construct a new BrokeredMessage instance from that source data.
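A short sketch of this guidance, using a simple synchronous retry loop for clarity (the payload value and the retry count are hypothetical; assumes an initialized queueClient):

```csharp
// Retain the source payload across retries, not the BrokeredMessage itself.
string payload = "sample payload"; // hypothetical source data

for (int attempt = 0; attempt < 3; attempt++)
{
    // Construct a fresh BrokeredMessage for every attempt; a previously sent
    // (or failed) instance cannot be rewound and reused.
    using (BrokeredMessage msg = new BrokeredMessage(payload))
    {
        try
        {
            queueClient.Send(msg);
            break; // Send succeeded; stop retrying.
        }
        catch (MessagingException ex)
        {
            if (!ex.IsTransient)
            {
                throw; // Permanent failure; do not retry.
            }
            // Transient failure: the loop builds a new message from the payload.
        }
    }
}
```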

The following code snippet demonstrates how to send multiple messages asynchronously and reliably, while preserving the order in which the messages are sent:

// This sample assumes that a queue client (queueClient) and a retry policy
// (retryPolicy) are declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null && state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

Whenever possible, avoid parallelizing messaging operations with the default scheduling and work-partitioning algorithms provided by the Task Parallel Library (TPL) and Parallel LINQ (PLINQ). The TPL is primarily designed to add parallelism and concurrency to compute-bound workloads. Using the TPL "as is" to improve the performance of I/O-bound code, such as networking calls and messaging operations, may not produce the improvements you expect. The best way to leverage the TPL for asynchronous operations is through the advanced TPL patterns that conform to the asynchronous programming model.
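One such pattern is Task.Factory.FromAsync, which wraps an APM Begin/End pair into a Task so the operation composes with continuations instead of consuming a thread-pool thread while waiting. A minimal sketch (assumes an initialized queueClient; no retry logic is shown here):

```csharp
// Wrap the BeginSend/EndSend pair into a Task.
BrokeredMessage msg = new BrokeredMessage("sample payload");

Task sendTask = Task.Factory.FromAsync<BrokeredMessage>(
    queueClient.BeginSend,
    queueClient.EndSend,
    msg,
    null);

// Attach a continuation that runs when the send completes (success or failure).
sendTask.ContinueWith(t =>
{
    // Release the resources held by the brokered message.
    msg.Dispose();

    if (t.IsFaulted)
    {
        // Always log exceptions.
        Trace.TraceError(t.Exception.InnerException.Message);
    }
});
```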
