銷售: 1-800-867-1380

非同步接收訊息

更新日期: 2014年3月

與非同步傳送訊息相似,同時也從實際的觀點來看,您也可以展開使用非同步程式設計模型,以從 Microsoft Azure 服務匯流排 接收訊息。

在等待 服務匯流排 佇列或訂閱中的新訊息時,您的方案通常會發出輪詢要求。不過,服務匯流排 提供長時間輪論接收作業,其會維持伺服器的連線,直到訊息抵達佇列或經過指定的逾時期間為止 (先何者先達成)。如果同步執行長時間輪詢接收,會在等待新序息時封鎖 CLR 執行緒集區執行緒,這不是最佳效能的表現。CLR 執行緒集區的容量通常有一定限制,因此可以避免使用特定長時間執行作業的執行緒輪詢

要使用 服務匯流排 代理訊息 API 建立實際有效的訊息方案,您應該一律執行非同步作業。無論您的方案是一次收到一則訊息,或提取多個訊息,您都會開始使用具備特定逾時的 BeginReceive 方法接收作業。在目前的 API 中,最大接收逾時值為 24 天。當 服務匯流排 訊息用戶端正在代您等待新訊息時,您的方案可以處理任何其他工作。完成之後,會通知您的回呼方法且收到的訊息 (如果有) 就可以進行處理。

note附註
一旦從佇列或訂閱收到訊息,其主體就只能讀取一次。由於網路通訊協定的本質,訊息資料串流並非永遠是「可重新扭轉」的,因為它們通常無法支援搜尋作業。您應該在呼叫 GetBody() 方法之後將其放到物件中以保護訊息資料,再保留該物件以便您需要時可以使用。代理訊息 API 不支援嘗試多次叫用 GetBody() 方法。

以下的程式碼範例會顯示從 服務匯流排 佇列非同步接收指定訊息數量的程式設計方法範例:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

與先前章節中的建議相同,您最好使用由工作平行程式庫提供的非同步程式設計模型整合以平行處理非同步訊息接收作業

本文對您有任何幫助嗎?
(剩餘 1500 個字元)
感謝您提供意見
顯示:
© 2014 Microsoft