銷售: 1-800-867-1380

非同步傳送訊息

更新日期: 2014年3月

為了使用 Microsoft Azure 服務匯流排 中的進階效能功能,例如用戶端批次處理,使用 .NET Managed 代理訊息 API 實作訊息方案時應該一律使用非同步程式設計模型。非同步訊息模式可讓您建立可以避免大部分的 I/O 繫結所帶來的負擔,包括傳送和接收訊息。

當您非同步叫用 API 方法時,控制項會立即傳回您的程式碼,而您的應用程式會在單獨執行非同步作業時持續執行您的應用程式。您的應用程式會監控非同步作業或接收作業完成時收到的回呼通知。在這個時候,您的應用程式可以取得並處理結果。

請注意,當您非同步叫用作業時,例如 QueueClient 類別中的 SendReceive 方法 (或其他由 服務匯流排 代理程式訊息 API 提供的同步方法),內部的 API 程式碼會以封鎖的方式流經相關方法的非同步版本。不過,使用這些方法的同步版本不會將您直接呼叫非同步版本時得到的項目轉譯為效能相關的完整優點。當您傳送或接收多個訊息,且在耗用相關訊息作業時想要執行其他處理時,效果特別明顯。

note附註
BrokeredMessage 物件表示訊息,且提供用於跨線路傳送之用。一旦 BrokeredMessage 物件傳送至佇列或主題,會由基礎訊息堆疊耗用,且無法為其他作業重新使用。這是因為一門讀取了訊息主體,投射訊息資料的資料流無法回復。您應該保留用來建構 BrokeredMessage 執行個體的來源資料,直到您可以放心地判斷提示訊息作業為止。如果失敗的訊息作業需要重試,您應該使用該來源資料建構一個新的 BrokeredMessage 執行個體。

以下程式碼片段會示範如何在維護要傳送訊息的順序時非同步 (而且可靠地) 傳送多個訊息:

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

如果可能,請避免使用預設排程以及由 工作平行程式庫 (TPL) 以及平行 LINQ (PLINQ) 提供的工作分割演算法平行處理訊息作業。TPL Framework 的基礎最適用於新增平行處理原則及並行處理至大部分來自於運算繫結作業觀點的應用程式。「依現狀」使用 TPL 以改善 I/O 繫結程式碼效能 (例如網路通話以及訊息作業) 不會產生您預期的改進。充分利用 TPL 以支援非同步作業的最佳方式,就是透過使用符合非同步程式設計模型的進階 TPL 模式

本文對您有任何幫助嗎?
(剩餘 1500 個字元)
感謝您提供意見
顯示:
© 2014 Microsoft