October 2011

Volume 26 Number 10

Forecast: Cloudy - The Microsoft Azure Service Bus: Topics

By Joseph Fultz | October 2011

This article is based on the Azure CTP June Update. All information is subject to change.

It’s no secret among my colleagues that the Azure Service Bus functionality didn’t really get much support from me. However, with the Azure CTP June Update, Microsoft has finally added enough features to move the Service Bus from what I considered not much more than a placeholder to a truly useful technology. For my purpose here, the essential piece of messaging technology the Azure Service Bus now offers is Topics, a rich publish-and-subscribe capability. I’ll focus on Topics in this article and draw, as I am so often apt to do, from my retail industry experience to look at how the technology can be used to facilitate inter-store inventory checks.

Have you ever gone to buy something and found that the last one has just been sold, or that the item you want is in some way messed up? When this happens, the sales clerk will often go into the POS system and check the inventory at nearby stores. More often than not, that check is against inventory counts that are kept in a central database or enterprise resource planning (ERP) system of some type, and the clerk usually checks by store number using her tribal knowledge of the nearby stores. Often, the data is a bit stale because it’s only refreshed as part of the end-of-day processing when the transaction logs and other data are uploaded and processed by the corporate system.

A more ideal scenario would be that a store could at any time throw a request about product availability into the ether and nearby stores would respond, indicating whether they had it. That’s what I’m going to set up using the Azure Service Bus, as depicted in Figure 1.

Figure 1 An Inventory-Check Message

Topics provide a durable mechanism that lets me push content out to up to 2,000 subscribers per topic. That subscriber limitation is unfortunate, as it potentially forces a solution architecture (like the one I’ll describe) to work around it by somehow creating segments in the Topic. For example, instead of a U.S. Inventory Check Topic that subscribers filter by region, I might have to create a SouthCentral U.S. Inventory Check Topic and then further filter to the specific local branch. With that caveat, I’ll proceed with my single Inventory Check Topic, as I can promise I won’t have more than a handful of franchise locations in my wonderland.

 

 

Getting the Message Out

You can download the Azure CTP June Update from bit.ly/p3DhAU, and find the management portal at manage.windowsazure.com/. I’ll access the management portal to retrieve a few pieces of information I need to use in my code (see Figure 2).

Figure 2 Getting Information from the Azure Management Portal

I need to grab the Service Gateway, the Default Issuer (always “owner” in the CTP) and the Default Key. Because I need these throughout my sample, I’ll create them at object scope:

private string sbNamespace = "jofultz";
private string issuerName = "owner";
private string issuerKey = "25vtsCkUPBtB39RdiBbUtCQglYRPOdHHC4MvX5fvxxxx";

I’ll use these variables in my functions to create the Topic, and then to post messages to the Topic. There’s a REST interface for those who want to use it or who aren’t deploying on a platform that allows use of the client library. Because I’m building a Windows client, I’ll use the library. To do that, I need to add references to the Microsoft.ServiceBus, Microsoft.ServiceBus.Message and System.ServiceModel assemblies, as shown in Figure 3.

Figure 3 Adding References

Once that’s done, a few straightforward lines of code are all that’s needed to get the Topic set up:

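// Build the credential from the issuer name and key shown in the portal.
SharedSecretCredential credential =
  TransportClientCredentialBase.CreateSharedSecretCredential(
  issuerName, issuerKey);

// Build the sb:// address for the namespace and connect to it.
Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
  "sb", sbNamespace, String.Empty);
ServiceBusNamespaceClient sbNSClient =
  new ServiceBusNamespaceClient(sbUri, credential);

// topicName is a string holding the Topic's name, declared elsewhere in the class.
Topic newTopic = sbNSClient.CreateTopic(topicName);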

What will become important as I start throwing messages out there is the ability of the stores to filter the messages and get only the ones that are relevant to the receiving location based on region. The API supports correlation and subscription IDs as first-class citizens, but for the filtering I want to use my knowledge of the data and filter based on the contents of the request. Thus, I want geographically close stores to look for and respond to each other’s requests for inventory based on region. At the same time, I need to make sure that the originating store doesn’t pick up its own requests. Requests will include the SKU of the item in question, the region in which the inquiry is made, and the RequesterID so the originating store can filter out its own requests on the topic:

[Serializable]
class InventoryQueryData
{
  public string Region;
  public string ResponderID;
  public string RequesterID;
  public string Sku;
  public int Count;
}

Note the [Serializable] attribute that I’ve added to the class. You could mark the class with DataContract and its members with DataMember instead, but the point is that whatever type I intend to send to the topic needs to be serializable.
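For readers who prefer the data-contract route, here is a minimal sketch of what that alternative might look like. The type name InventoryQueryContract and the RoundTrip helper are hypothetical names introduced only for illustration, and the round trip uses the standard DataContractSerializer locally rather than the Service Bus itself, so it shows only that the type serializes cleanly.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical data-contract variant of InventoryQueryData (name is an assumption).
[DataContract]
public class InventoryQueryContract
{
    [DataMember] public string Region;
    [DataMember] public string ResponderID;
    [DataMember] public string RequesterID;
    [DataMember] public string Sku;
    [DataMember] public int Count;
}

public static class ContractRoundTrip
{
    // Serializes the object to XML in memory and reads it back,
    // which is a quick local check that the type is serializable.
    public static InventoryQueryContract RoundTrip(InventoryQueryContract source)
    {
        var serializer = new DataContractSerializer(typeof(InventoryQueryContract));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, source);
            stream.Position = 0;
            return (InventoryQueryContract)serializer.ReadObject(stream);
        }
    }
}
```

Only members marked with DataMember are written, so this approach also makes it explicit which fields travel across the bus.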

I’ve created a simple form that allows me to enter an arbitrary string for the SKU and select from a list for the posting store. The code behind my inquiry button looks similar to the code to connect and create the topic and resembles the typical constructs you find when working with messaging APIs, as shown in Figure 4.

Figure 4 Getting the Data Values

// Assign data values.
InventoryQueryData data = new InventoryQueryData();
data.Sku = txtSKU.Text;
// InventoryQueryData has no StoreID member; the posting store goes in RequesterID.
// ComboBox.Text is the displayed item (SelectedText is only the highlighted text).
data.RequesterID = cboStore.Text;
data.Region = cboStore.SelectedValue.ToString();

Uri sbUri = ServiceBusEnvironment.CreateServiceUri("sb", sbNamespace, string.Empty);
SharedSecretCredential credential = TransportClientCredentialBase.CreateSharedSecretCredential(issuerName, issuerKey);

MessagingFactory msgFactory = MessagingFactory.Create(sbUri, credential);
TopicClient topicClient = msgFactory.CreateTopicClient(topicName);
MessageSender MsgSender = topicClient.CreateSender();

BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Add props to message for filtering.
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;

msg.TimeToLive = TimeSpan.FromSeconds(60);

// Send the BrokeredMessage, not the raw data object, so the properties and TTL go with it.
MsgSender.Send(msg);

There are two things to note here. First, I’ve added the name-value pairs I need for filtering to the BrokeredMessage.Properties collection. Second, as a matter of runtime maintenance, I’ve given the TimeToLive (TTL) a value of 60 seconds, which should keep the subscriptions on the topic from getting too backed up. Of course, you’ll generally want a more informed approach for picking the TTL, but I figure if the request doesn’t reach any of the subscribers in that time, it’s probably too long because there’s a customer standing there waiting. Besides, this is just a sample.

Any message that’s sent to the bus is in the form of a BrokeredMessage that has a factory method CreateMessage. This simply wraps the data into an instance of a BrokeredMessage type that contains all of the constructs needed for a fully functional messaging system.

With this I have all I need to get a message out to the subscribers of the Inventory Check Topic, so now I’ll start setting up the subscription clients to fetch the messages and respond.

Tapping the Topic Stream and Responding

With my client in place, I’m ready to send out a steady stream of requests, but right now this will be little more than letting bits loose in the wild ether. I create a Windows Form application and reuse the XML store list and the InventoryQueryData from the first (sender) application. I’ll need to create a unique subscription for each client that’s listening to the topic. This is easily enough accomplished by combining a subscription name with the store number I want to listen for. My little test app allows me to select the store number from a combobox, so I just tack that value onto the base Subscription name in order to create the unique Subscription name. It’s important to ensure that every client has a unique Subscription; two or more using the same subscription would create a race condition where those subscribers would compete for the message and the first one to receive and delete would win:

Topic invTopic = sbNSClient.GetTopic(topicName);
SubscriptionName = "InventoryQuerySubscription" + this.cboStore.Text;
SqlFilterExpression RegionFilter = new SqlFilterExpression("Region = '" +
  cboStore.SelectedValue + "' AND RequesterID <> '" + cboStore.Text + "'");

Subscription sub = invTopic.AddSubscription(SubscriptionName, RegionFilter);
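To make the point about unique subscription names concrete, here is a small in-memory sketch. ToyTopic is a hypothetical stand-in written only for illustration; it is not the Service Bus API and ignores filters, durability and TTL. It models just one behavior: each subscription gets its own copy of a message, and a receive-and-delete read consumes that copy.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a topic (hypothetical; not the Service Bus API).
public class ToyTopic
{
    private readonly Dictionary<string, Queue<string>> subscriptions =
        new Dictionary<string, Queue<string>>();

    // Adding a name that already exists reuses the existing subscription.
    public void AddSubscription(string name)
    {
        if (!subscriptions.ContainsKey(name))
        {
            subscriptions[name] = new Queue<string>();
        }
    }

    // Every subscription receives its own copy of the message.
    public void Send(string message)
    {
        foreach (Queue<string> queue in subscriptions.Values)
        {
            queue.Enqueue(message);
        }
    }

    // Receive-and-delete: the copy is removed as it is read; null means empty.
    public string Receive(string name)
    {
        Queue<string> queue = subscriptions[name];
        return queue.Count > 0 ? queue.Dequeue() : null;
    }
}
```

Two stores with distinct names each see the request. Two clients reading the same name share one copy, so whichever reads first gets it and the other gets nothing.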

As I add the Subscription to the Topic, I can also pass in a filter so that each store receives only the inventory check requests for its own region. Note that you can develop your own FilterExpression type from the base type, but the API includes four types that should cover most scenarios, especially if used together: CorrelationFilterExpression, MatchAllFilterExpression, MatchNoneFilterExpression and SqlFilterExpression. I used SqlFilterExpression, which allowed me to easily and instinctively write this expression to get the messages for the Texas region, for example, and exclude messages that originated from my own store:

"Region = '[Region]' AND RequesterID <> '[StoreID]'"
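The effect of that expression can be sketched locally. The helper below is a hypothetical stand-in, not the broker's evaluator: BuildFilter reproduces the string concatenation used when creating the subscription, and Matches mimics the intended outcome against a message's property bag. It assumes case-sensitive string comparison and that a message missing either property does not match, which is how SQL-style comparisons against missing values usually behave.

```csharp
using System;
using System.Collections.Generic;

public static class RegionFilterSketch
{
    // Builds the same filter text the subscriber code concatenates.
    public static string BuildFilter(string region, string storeId)
    {
        return "Region = '" + region + "' AND RequesterID <> '" + storeId + "'";
    }

    // Local approximation of the filter's outcome (assumption: ordinal
    // comparison, and a missing property means the message does not match).
    public static bool Matches(IDictionary<string, object> properties,
                               string region, string storeId)
    {
        object msgRegion;
        object msgRequester;
        if (!properties.TryGetValue("Region", out msgRegion) ||
            !properties.TryGetValue("RequesterID", out msgRequester))
        {
            return false;
        }

        return string.Equals(msgRegion as string, region, StringComparison.Ordinal)
            && !string.Equals(msgRequester as string, storeId, StringComparison.Ordinal);
    }
}
```

A request from another Texas store matches a Texas subscriber, while the subscriber's own requests and requests from other regions do not.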

I only need to filter the requests coming through, but in some cases I could theoretically “fix up” some data on the way in using a RuleDescription to, say, combine a SqlFilterExpression with a SqlFilterAction. The former identifies the messages that I’m targeting, while the latter defines the action that should be taken. This type of functionality can be useful when the data coming through needs to be massaged into something that works for the recipient and both sides of the bus can’t or won’t be changed.

Once the subscription is set up it will persist even after the client closes down. This is perfect for this scenario; I’ll simply create a SubscriptionClient every time I start monitoring, and it will attach to the existing subscription. However, not everyone will want that behavior; I can imagine situations where you’d want to remove the subscription when the client shuts down. Here’s how I create the client and open a receiver on the subscription:

SubscriptionClient subClient = msgFactory.CreateSubscriptionClient(
  topicName, SubscriptionName);
MessageReceiver msgReceiver = subClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
msgReceiver.Open();

Note in the call to CreateReceiver that I’ve set ReceiveMode to ReceiveAndDelete. You could also use PeekLock for the ReceiveMode. Here, I simply want to grab the message and process it, and I have no need to ensure proper receipt and processing prior to deletion because if the message is lost, it isn’t a big deal. If I needed behavior that was more guaranteed and reliable, I’d likely do two things. First, I wouldn’t set the TTL for the Message, Topic or Subscription, and would instead let the messages live indefinitely; alternatively, I’d give it a very high TTL so that the receiver had sufficient time to process the message or move it to an exception queue, so that only truly undeliverable messages would end up in the dead-letter queue. Second, I’d use PeekLock on the receiver to read and process the data and only remove the message upon completion of the related process. Manually creating this distributed transaction behavior can quickly lead to other problems, such as poison messages, but we’ll address that behavior and problem set another time.
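The difference between the two receive modes is easier to see in a toy model. ToyPeekLockQueue below is a hypothetical in-memory class, not the Service Bus API, and its Complete and Abandon methods are illustrative names for "settle successfully" and "give the message back." It assumes a single in-flight message and no lock timeout.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration of peek-lock semantics (hypothetical; not the Service Bus API).
public class ToyPeekLockQueue
{
    private readonly LinkedList<string> pending = new LinkedList<string>();
    private string locked; // the single in-flight message, or null

    public void Send(string message)
    {
        pending.AddLast(message);
    }

    // Hands out the next message but keeps it until it is settled.
    // Returns null if the queue is empty or a message is already in flight.
    public string Receive()
    {
        if (locked != null || pending.Count == 0)
        {
            return null;
        }
        locked = pending.First.Value;
        pending.RemoveFirst();
        return locked;
    }

    // Processing succeeded: the in-flight message is removed for good.
    public void Complete()
    {
        locked = null;
    }

    // Processing failed: the in-flight message goes back to the front.
    public void Abandon()
    {
        if (locked == null)
        {
            return;
        }
        pending.AddFirst(locked);
        locked = null;
    }

    // Messages still held, including the one in flight.
    public int Count
    {
        get { return pending.Count + (locked != null ? 1 : 0); }
    }
}
```

With receive-and-delete, a crash after Receive loses the message. In this model the message still counts as held until Complete is called, and Abandon makes it available again.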

Once I’ve opened the receiver I enter a loop to check for messages. The API has a direct Receive method that will return the BrokeredMessage. However, I’ll use the TryReceive method, which will return a bool to indicate success (see Figure 5). I’ll pass a relatively short timeout to that method, which should be long enough to check and receive the message. If a message is received, I’ll work with it and immediately check for another message. If no message is received, I’ll sleep the thread for a bit and check again.

Figure 5 Checking for Messages

while (MonitorOn)
{
  BrokeredMessage NewMsg;
  bool recvSuccess = msgReceiver.TryReceive(TimeSpan.FromSeconds(3), out NewMsg);

  if (recvSuccess)
  {
    this.lbMessages.Items.Add(NewMsg.MessageId);
    InventoryQueryData RequestObject =
      NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
    ProcessMessage(RequestObject);
  }
  else
  {
    System.Threading.Thread.Sleep(3000);
  }
}

My request object is contained in the BrokeredMessage, and to retrieve it I’ll call GetBody<T>, passing the object type. The implication here is that object types need to match on both sides of the bus. You can accomplish this using proxy types or, in the case of uncertainty or plain old XML coming across, you could pass the object as a string and handle it as XML.
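As a sketch of the string-as-XML option, the helper below builds and reads a small XML payload with LINQ to XML. The element names (InventoryQuery, Region, RequesterID, Sku) and the XmlBodySketch class are assumptions chosen to mirror the InventoryQueryData fields; the sample doesn't define a wire format, so treat this shape as illustrative only.

```csharp
using System;
using System.Xml.Linq;

public static class XmlBodySketch
{
    // Builds a hypothetical XML payload mirroring the request fields.
    public static string ToXml(string region, string requesterId, string sku)
    {
        var root = new XElement("InventoryQuery",
            new XElement("Region", region),
            new XElement("RequesterID", requesterId),
            new XElement("Sku", sku));
        return root.ToString();
    }

    // Reads one field back without needing a shared CLR type.
    // Returns null if the element is absent.
    public static string ReadField(string xml, string fieldName)
    {
        XElement root = XElement.Parse(xml);
        return (string)root.Element(fieldName);
    }
}
```

The trade-off is that the receiver no longer depends on the sender's assembly, but it takes on the job of validating the payload itself.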

Until Next Time …

At this point, all the work is done to create the message and get it out to all of the interested nodes, and for them to fetch the message from the Topic. I’ve demonstrated the features that allow me to not only broadcast the message, but also filter appropriately for the subscribed receiver. Next month I’ll work through the return trip to demonstrate the complementary use of queues and correlation to finish the scenario and round out your basic understanding of the new features in the Azure Service Bus. I’ll also have the complete code available for you to download.


Joseph Fultz is a software architect at Hewlett-Packard Co., working as part of the HP.com Global IT group. Previously he was a software architect for Microsoft working with its top-tier enterprise and ISV customers defining architecture and designing solutions.

Thanks to the following technical expert for reviewing this article: Jim Keane