In one of our previous articles, we discussed the importance of using messaging patterns in the cloud in order to decouple solutions and promote easy-to-scale software architectures. (See “Comparing Windows Azure Queues and Service Bus Queues” at
msdn.microsoft.com/magazine/jj159884.) Queuing is one of these messaging patterns, and the Windows Azure platform offers two main options to implement this approach: Queue storage services and Service Bus Queues, both of which cover scenarios where multiple consumers compete to receive and process each of the messages in a queue. This is the canonical model for supporting variable workloads in the cloud, where receivers can be dynamically added or removed based on the size of the queue, offering a load balancing/failover mechanism for the back end (see Figure 1).
.png)
Figure 1 Queuing Messaging Pattern: Each Message Is Consumed by a Single Receiver
Even though the queuing messaging pattern is a great solution for simple decoupling, there are situations where each receiver requires its own copy of the message, with the option of discarding some messages based on specific rules. A good example of this type of scenario is shown in Figure 2, which illustrates a common challenge that retail companies face when sending information to multiple branches, such as the latest products catalog or an updated price list.
.png)
Figure 2 Publisher/Subscriber Messaging Pattern: Each Message Can Be Consumed More Than Once
For these situations, the publisher/subscriber pattern is a better fit, where the receivers simply express an interest in one or more message categories, connecting to an independent subscription that contains a copy of the message stream. The Windows Azure Service Bus implements the publisher/subscriber messaging pattern through topics and subscriptions, which greatly enhances the ability to control how messages are distributed, based on independent rules and filters. In this article, we’ll explain how to apply these Windows Azure Service Bus capabilities using a simple real-life scenario, assuming the following requirements:
All the code samples for this article were created with Visual Studio 2012, using C# as the programming language. You’ll also need the Windows Azure SDK version 1.8 for .NET developers and access to a Windows Azure subscription.
Before writing any code, you need to define the different entities (topics and subscriptions) that will become part of the messaging workflow. This can be accomplished by accessing the Windows Azure Portal at manage.windowsazure.com. Log in with your credentials and follow these steps:
.png)
Figure 3 Creating a New Service Bus Topic Using the Windows Azure Portal
Once the topics and subscriptions have been created, you can also access them directly in Visual Studio. To do so, open Server Explorer (View | Server Explorer) and expand the Windows Azure Service Bus node (see Figure 4). Right-click on the Windows Azure Service Bus node and select Add New Connection. Enter the Namespace name, Issuer Name (usually “owner”) and Issuer Access Key you recorded when the Windows Azure namespace was created in the portal.
.png)
Figure 4 Creating a Service Bus Topic and Subscriptions Using the Visual Studio Tools
Keep in mind that it’s possible to programmatically create and manage these entities using classes in the Microsoft.ServiceBus.Messaging namespace, including TopicClient and SubscriptionClient, which are used later in this article.
Once the basic structure for the messaging workflow has been created, we’ll simulate traffic using two console applications created in Visual Studio, as shown in Figure 5. The first console application, MSDNSender, will send the products catalog. The second, MSDNReceiver, will receive the information in each of the stores. We’ll analyze the code in the following sections. In the Pub/Sub pattern, the MSDNSender is the publisher and the MSDNReceiver is the subscriber.
.png)
Figure 5 Visual Studio Solution to Simulate the Products Catalog Scenario
As you can see in Figure 2, Headquarters (the publisher) sends messages to a topic. This logic is represented by the code in the main file, Program.cs, a part of the MSDNSender project. Program.cs encapsulates the logic and code to send a list of products as individual messages to the topic. Let’s take a look at the different sections, starting with the Main method. Notice that first we create a client for the topic, as follows:
// Create a topicClient using the
// Service Bus credentials
TopicClient topicClient =
TopicClient.CreateFromConnectionString(
serviceBusConnectionString, topicName);
Once a topicClient is created, the publisher can send messages using it. The list of products to be sent is stored in an XML file called ProductsCatalog.xml, which contains a list of 10 product entities that will be transformed into an array of objects. The products will then get mapped into the Catalog and Product classes stored in the Product.cs file:
// Deserialize XML file with Products, and store them in an object array
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();
Each Product in the catalog array presents the structure shown in Figure 6.
Figure 6 Class Representation for Products in the Catalog
public class Product
{
[System.Xml.Serialization.XmlElement("ProductId")]
public string ProductId { get; set; }
[System.Xml.Serialization.XmlElement("ProductName")]
public string ProductName { get; set; }
[System.Xml.Serialization.XmlElement("Category")]
public string Category { get; set; }
[System.Xml.Serialization.XmlElement("CatalogPage")]
public int CatalogPage { get; set; }
[System.Xml.Serialization.XmlElement("MSRP")]
public double MSRP { get; set; }
[System.Xml.Serialization.XmlElement("Store")]
public string Store { get; set; }
}
Inside the array loop, a call to the CreateMessage method extracts different properties from the Product objects and assigns them to the message to be sent. Two properties require extra attention:
if (isLastProductInArray)
message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;
Sessions are extremely important, because they allow the receiver to determine whether all the messages that belong to a specific logical group have arrived. In this case, by setting the SessionId message property, we’re specifying that the receiver shouldn’t use the catalog information until after all the messages with the same catalogName value have arrived. Also, for the last product in the array, we’re adding a new property: IsLastMessageInSession, which will allow the receivers to determine if the last message in the session has arrived, and the catalog can be fully processed. Figure 7 shows MSDNSender running.
.png)
Figure 7 Execution of the MSDNSender Project
Now that the catalog and products have been sent out to the topic and copied to the different subscriptions, let’s turn our attention to the MSDNReceiver project, where messages are received and processed. Note that in the Main method of Program.cs, the code creates a client for the Subscription based on information provided by the user via a Console.ReadLine command. Users are expected to enter their store number, which reflects the messages they wish to receive. In short, each branch store is concerned only with messages that apply to that store:
Console.WriteLine("Enter Store Number");
string storeNumber = Console.ReadLine();
Console.WriteLine("Selecting Subscription for Store...");
// Create a Subscription Client to the Topic
SubscriptionClient subscriptionClient =
SubscriptionClient.CreateFromConnectionString(
serviceBusConnectionString, topicName,
"Store" + storeNumber.Trim() + "Sub",
ReceiveMode.PeekLock);
Because we’re receiving messages from the subscriptions based on sessions (as explained in the previous section), we need to request the next one using the following line of code:
MessageSession sessionReceiver =
subscriptionClient.AcceptMessageSession(TimeSpan.FromSeconds(5));
Basically, what this means is that the client will check for any to-be-processed messages in the subscription—those whose SessionId property is not null—and if no such messages are encountered within a period of five seconds, the request will time out, terminating the receiver application. On the other hand, if a session is found, the ReceivingSessionMessages method will be called. Before we jump into this piece of code, let’s discuss the concept of session state, which allows the developer to store information that can be used while messages that belong to the same transaction are received. In this case, we’re using session state to “remember” the last catalog page that was received, as well as the messages—products—that arrived out of order.
Based on this, here’s the workflow in code:
Figure 8 The ReceivedSessionMessages Method in Code
static void ReceiveSessionMessages(MessageSession receiver)
{
// Read messages from subscription until subscription is empty
Console.WriteLine("Reading messages from subscription {0}",
receiver.Path);
Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
SequenceState sessionState = GetState(receiver);
BrokeredMessage receivedMessage;
while ((receivedMessage = receiver.Receive()) != null)
{
string sessionId = receiver.SessionId;
ProcessMessage(receivedMessage, ref sessionState, receiver);
while (sessionState.GetNextOutOfSequenceMessage() != -1)
{
// Call back deferred messages
Console.WriteLine("Calling back for deferred message: Category {0},
Message sequence {1}", receiver.SessionId,
sessionState.GetNextSequenceId());
receivedMessage = receiver.Receive(
sessionState.GetNextOutOfSequenceMessage());
ProcessMessage(receivedMessage, ref sessionState, receiver);
}
if (receivedMessage.Properties.ContainsKey(
"IsLastMessageInSession"))
break;
}
SetState(receiver, null);
receiver.Close();
}
Figure 9 The ProcessMessage Method in Code
static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
MessageSession session = null)
{
if (session != null)
{
int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
if (sessionState.GetNextSequenceId() == messageId)
{
OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
sessionState.SetNextSequenceId(messageId + 1);
message.Complete();
SetState(session, sessionState);
}
else
{
Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
session.SessionId, messageId);
sessionState.AddOutOfSequenceMessage(messageId,
message.SequenceNumber);
message.Defer();
SetState(session, sessionState);
}
}
Thread.Sleep(receiverDelay);
}
Keep in mind that for this project, deferred message IDs are stored in the session state, and could be potentially lost. In a production environment, we recommend using some type of persisted storage (Windows Azure Tables is one option) for this purpose. Note that if the message contains the property IsLastMessageSessionInSession (set during the sending process), the session loop is terminated. The console output for the MSDNReceiver project can be seen in Figure 10.
.png)
Figure 10 Execution of the MSDNReceiver project
Windows Azure Service Bus subscriptions give you the ability to create specific rules that filter out messages before they’re consumed. In this case, it would be relatively easy to create a rule that segregates products by category or by store number (which we ignored in this project). Rules can be created programmatically, directly in the Windows Azure portal or through Visual Studio tools.
The Windows Azure Service Bus offers an amazingly robust and flexible implementation of the publish/subscribe pattern. Many different scenarios can be addressed through the use of topics and subscriptions. The ability to support multiple senders broadcasting messages to multiple receivers, combined with the ability to logically group and sort messages, opens up a world of possibilities for the modern developer. Moreover, being able to leverage a persistent session to track state makes it straightforward to logically group messages and control their sequence. In a world where distributed environments are the norm, understanding how to use messaging patterns and the tools around them is crucial for today’s software architects working in the cloud.
Bruno Terkaly is a developer evangelist for Microsoft. His depth of knowledge comes from years of experience in the field, writing code using a multitude of platforms, languages, frameworks, SDKs, libraries and APIs. He spends time writing code, blogging and giving live presentations on building cloud-based applications, specifically using the Windows Azure platform.
Ricardo Villalobos is a seasoned software architect with more than 15 years of experience designing and creating applications for companies in the supply chain management industry. Holding different technical certifications, as well as a master’s degree in business administration from the University of Dallas, he works as a cloud architect in the Windows Azure CSV incubation group for Microsoft.
Thanks to the following technical expert for reviewing this article: Abhishek Lal