导出 (0) 打印
全部展开

异步发送消息

更新时间: 2014年3月

若要利用 Microsoft Azure Service Bus 中先进的性能特性(如客户端批处理),在使用 .NET 托管的中转消息传递 API 实现消息传递解决方案时,你应该始终考虑使用异步编程模型。使用异步消息传递模式,你将能够构建通常可以避免 I/O 绑定的操作(如发送和接收消息)开销的解决方案。

异步调用 API 方法时,控制权将立即返回到你的代码,并且你的应用程序将在异步操作独立执行时继续执行。你的应用程序监视异步操作,或在操作完成时通过回调接收通知。此时,你的应用程序可以获取结果并进行处理。

请务必注意,当你调用同步操作(例如,QueueClient 类中的 SendReceive 方法,或者 Service Bus 中转消息传递 API 所提供的其他同步方法)时,API 代码在内部将按各个方法的异步版本执行(尽管采用的是阻塞方式)。但是,使用这些方法的同步版本可能无法带来你在直接调用异步版本时可以期望的与性能相关的所有好处。如果你在发送或接收多个消息,并且要在执行各个消息传递操作的同时执行其他处理,那么这一点尤为明显。

note备注
一个 BrokeredMessage 对象表示一条消息,提供此对象是为了通过线路传输数据。BrokeredMessage 对象发送到队列或主题后,将立即由基础消息传递堆栈使用,并且不能重新用于进一步的操作。这是由于一个事实:读取消息正文之后,不能将映射消息数据的流倒回。在可以可靠断言消息传递操作已成功之前,你应保留用于构造 BrokeredMessage 实例的源数据。如果失败的消息传递操作需要重试,你应构造一个使用该源数据的新 BrokeredMessage 实例。

以下代码段演示了如何在保持消息发送顺序不变的同时异步(并且可靠地)发送多条消息:

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

请尽可能避免使用任务并行库 (TPL)并行 LINQ (PLINQ) 提供的默认计划和工作分区算法并行执行消息传递操作。从计算绑定操作的角度看,TPL 框架的基本要素通常最适用于提高应用程序并行性和并发性。“按原样”使用 TPL 可改善网络调用等 I/O 绑定代码的性能,但消息传递操作可能不会产生你期望的改进。利用 TPL 支持异步操作的最佳方法是使用遵循异步编程模型的高级 TPL 模式

显示:
© 2014 Microsoft