匯出 (0) 列印
全部展開

Creating Applications that Use Service Bus Queues

更新日期: 2014年10月

本文說明佇列的實用性,並示範如何撰寫使用 Microsoft Azure 服務匯流排 的簡易佇列架構應用程式。

假設有下列案例:在零售產業中,業者必須將個別銷售點 (POS) 終端機中的銷售資料路由傳送至庫存管理系統,供其判斷是否必須補充存貨。下列解決方案使用 服務匯流排 訊息作為終端機與庫存管理系統之間的通訊工具,如下所示:

Service-Bus-Queues-Img1

每個 POS 終端機分別將訊息傳送至 DataCollectionQueue,以報告其銷售資料。在庫存管理系統擷取這些訊息之前,訊息會存留在此佇列中。此模式常被稱為非同步訊息,因為 POS 終端機無須等候庫存管理系統的回覆即可繼續處理。

在我們討論設定此應用程式所需的程式碼之前,請考量在此案例中使用佇列、而不將 POS 終端機直接 (同步) 與庫存管理系統聯繫的好處。

使用非同步訊息模式時,產生者和消費者不需要同時連線。此訊息基礎結構會可靠地儲存訊息,直到取用方準備接收訊息為止。如此可讓分散式應用程式的元件中斷連接,不論是主動中斷連接 (例如進行維護) 或由於元件損毀而中斷連接,都不會影響整體系統。再者,消費性應用程式只需在一天的特定時間內連線即可。例如,在此零售案例中,庫存管理系統只需在營業日結束後上線即可。

在許多應用程式中,系統負載會隨著時間改變,但每一單位的工作所需的處理時間通常不變。利用佇列來調解訊息產生者和消費者,表示只需要佈建消費性應用程式 (背景工作) 即可提供平均負載,而非高峰負載。佇列的深度會一直增加,且合約會隨著連入負載而改變。這可直接節省為應用程式負載提供服務所需的基礎結構數量相關費用。

Service-Bus-Queues-Img2

隨著負載增加,可以增加更多背景工作處理程序,以便從背景工作佇列讀取。每則訊息都是由其中一個背景工作處理程序所處理。此外,即使背景工作電腦的處理能力各有不同 (因為它們會以各自的最大速率提取訊息),此種提取式負載平衡還是能充分利用背景工作電腦。此模式通常稱為競爭消費者模式。

Service-Bus-Queues-Img3

使用訊息佇列來調解訊息產生者和消費者,可提供元件之間的內建鬆散結合。因為產生者和消費者並未注意到彼此,所以消費者可以在不對產生者造成影響的情況下升級。此外,訊息拓撲可在不影響現有端點的情況下進化。關於這點,我們將在討論發佈/訂閱時詳加說明。

下節將說明如何使用 服務匯流排 建置此應用程式。

您必須要有 的帳戶,才能開始使用 服務匯流排。如果您還沒有此帳戶,您可以在這裡註冊免費試用版。您必須使用 Windows Live ID (Microsoft 帳戶) 登入,此帳戶將與您的 服務匯流排 帳戶相關聯。完成此動作後,您即可建立新的 服務匯流排 訂閱。往後當您以 Windows Live ID (Microsoft 帳戶) 登入時,您將可存取所有與您的帳戶相關聯的 服務匯流排 訂閱。

在擁有訂閱後,您即可建立新的 服務命名空間。您必須為新的 服務命名空間 指定在所有 服務匯流排 帳戶間皆具有唯一性的名稱。每個 服務命名空間 分別會作為一組 服務匯流排 實體的容器。如需詳細資訊,請參閱 作法:建立或修改 Service Bus Service 命名空間.

若要使用 服務匯流排 服務命名空間,應用程式必須參考 服務匯流排 組件 (尤其是 Microsoft.ServiceBus.dll)。您可以在 Azure SDK 下載頁面 中找到並下載這個屬於 Microsoft Azure SDK 一部分的組件。但要取得 服務匯流排 API 並以所有的 服務匯流排 相依性設定您的應用程式,服務匯流排 NuGet 封裝會是最簡單的途徑。如需使用 NuGet 和 服務匯流排 封裝的詳細資訊,請參閱使用 NuGet 服務匯流排封裝

服務匯流排 訊息實體 (佇列和發佈/訂閱主題) 的管理作業可透過 NamespaceManager 類別執行。服務匯流排 會使用透過 Microsoft Azure Active Directory 存取控制服務 (也稱為「存取控制服務」或 ACS) 實作的宣告型安全性模型。若要為特定 服務命名空間 建立 NamespaceManager 執行個體,必須要有適當的認證。TokenProvider 類別代表會以內建的 Factory 方法傳回某些已知權杖提供者的安全性權杖提供者。我們將使用 SharedSecretTokenProvider 類別來存放共用密碼認證,以及處理從 ACS 取得適當權杖的作業。接著,將會以 服務匯流排 服務命名空間 的基底位址和權杖提供者建構 NamespaceManager 執行個體。

NamespaceManager 類別可提供建立、列舉及刪除訊息實體的方法。此處顯示的程式碼片段可說明 NamespaceManager 執行個體如何建立,以及用來建立 DataCollectionQueue 佇列。

Uri uri = ServiceBusEnvironment.CreateServiceUri("sb""ingham-blog"string.Empty);
string name = "owner";
string key = "abcdefghijklmopqrstuvwxyz";
 
TokenProvider tokenProvider = 
    TokenProvider.CreateSharedSecretTokenProvider(name, key);
NamespaceManager namespaceManager = 
    new NamespaceManager(uri, tokenProvider);
namespaceManager.CreateQueue("DataCollectionQueue");

請注意,CreateQueue 方法有多載功能可調整佇列的屬性。例如,您可以為傳送至佇列的訊息設定預設存留時間 (TTL) 值。

適用於 服務匯流排 實體的執行階段作業,以傳送和接收訊息為例,應用程式必須先建立 MessagingFactory 物件。與 NamespaceManager 類別相似,MessagingFactory 執行個體會從 服務命名空間 的基底位址和權杖提供者建立。

 MessagingFactory factory = MessagingFactory.Create(uri, tokenProvider);

傳送至和接收自 服務匯流排 佇列的訊息,是 BrokeredMessage 類別的執行個體。此類別包含一組標準屬性 (例如 LabelTimeToLive)、用以存放應用程式屬性的字典,和任意應用程式資料的主體。應用程式可傳入任何可序列化的物件 (下列範例會傳入 SalesData 物件,代表來自 POS 終端機的銷售資料) 以設定主體,而主體將使用 DataContractSerializer 來序列化物件。或者,也可以提供 Stream

 BrokeredMessage bm = new BrokeredMessage(salesData);
 bm.Label = "SalesReport";
 bm.Properties["StoreName"] = "Redmond";
 bm.Properties["MachineID"] = "POS_1";

要將訊息傳送至指定的佇列 (在我們的範例中為 DataCollectionQueue),最簡單的方式是使用 CreateMessageSender 直接從 MessagingFactory 執行個體建立 MessageSender 物件。

MessageSender sender = factory.CreateMessageSender("DataCollectionQueue");
sender.Send(bm);

要從佇列接收訊息,最簡單的方式是使用您可以透過 CreateMessageReceiver 直接從 MessagingFactory 建立的 MessageReceiver 物件。訊息接收端可在兩種不同的模式中工作:ReceiveAndDeletePeekLockReceiveMode 會在訊息接收端建立時設定,作為 CreateMessageReceiver 呼叫的參數。

在使用 ReceiveAndDelete 模式時,接收作業是單發式作業,也就是說,當 服務匯流排 接收要求時,它會將訊息標示為正在使用並傳回到應用程式。ReceiveAndDelete 模式是最簡單的模式,最適合用於應用程式容許在發生失敗時不處理訊息的案例。若要了解此種模式,請考慮消費者發出接收要求,而後再處理前當機的案例。因為 服務匯流排 已將訊息標示為正在使用,所以當應用程式重新啟動並再次開始使用訊息時,將會遺失在當機前使用的訊息。

PeekLock 模式中,接收會變成兩階段作業,因而有可能支援不容許遺失訊息的應用程式。當 服務匯流排 接收要求時,會發現將要使用的下一個訊息,予以鎖定,以免其他消費者接收該訊息,然後再將它傳回到應用程式。在應用程式處理好訊息 (或可靠地儲存以供未來處理) 之後,於接收的訊息上呼叫 Complete,便完成接收處理程序的第二個階段。當 服務匯流排 看見 Complete 時,它會將訊息標示為正在使用。

另外還有兩種可能的結果。第一,如果應用程式基於某個原因而無法處理訊息,則可於接收的訊息上呼叫 Abandon (而非 Complete)。這會導致 服務匯流排 將訊息解除鎖定,並使其可供相同的消費者或其他競爭消費者再度接收。其次,鎖定有關聯的逾時設定,如果應用程式無法在鎖定逾時到期前處理訊息 (例如,應用程式當機),則 服務匯流排 會將訊息解除鎖定,並使其可供再度接收。

在此請注意,如果應用程式在處理訊息之後,在 Complete 要求發出之前發生當機,則系統會在應用程式重新啟動時重新傳遞訊息。我們常稱之為「至少一次」(At Least Once) 處理。這表示,每則訊息至少會處理一次,但在某些情況下,會重新傳遞相同的訊息。在無法容許重複處理的情況下,應用程式需要有附加邏輯才能偵測重複項目。此作業可根據訊息的 MessageId 屬性來達成。此屬性的值在每次嘗試傳遞時都會維持不變。這稱為「正好一次」(Exactly Once) 處理。

此處顯示的程式碼說明如何使用 PeekLock 模式接收及處理訊息;若未明確提供 ReceiveMode 值,則會以此模式作為預設值。

MessageReceiver receiver = factory.CreateMessageReceiver("DataCollectionQueue");
BrokeredMessage receivedMessage = receiver.Receive();
try
{
    ProcessMessage(receivedMessage);
    receivedMessage.Complete();
}
catch (Exception e)
{
    receivedMessage.Abandon();
}

在本節先前的範例中,我們直接從 MessagingFactory 建立了 MessageSenderMessageReceiver 物件,分別用來傳送及接收來自佇列的訊息。替代方法之一是使用 QueueClient 類別;此類別除了工作階段等進階功能以外,也同時支援傳送和接收作業。

QueueClient queueClient = 
     factory.CreateQueueClient("DataCollectionQueue");
queueClient.Send(bm);
            
BrokeredMessage message = queueClient.Receive();
try
{
    ProcessMessage(message);
    message.Complete();
}
catch (Exception e)
{
    message.Abandon();
} 

顯示:
© 2014 Microsoft