Using Service Bus Queues, Topics, and Subscriptions with Java

Updated: October 6, 2014

Azure Service Bus queues, topics, and subscriptions can help you scale out a distributed application to very large numbers of messages and users. Service Bus queues enable asynchronous processing among competing message consumers, so your solution balances load more efficiently across periods of higher and lower demand and across components with higher and lower processing speeds. Service Bus topics and subscriptions offer publish/subscribe messaging. Messages published to a topic are available to many subscribers. A subscription resembles a virtual queue that receives copies of the messages, and you can register rules to filter which messages each subscription receives. With the publish/subscribe pattern, multiple message consumers can subscribe to a topic without each having to receive and process every message. Topics and filtered subscriptions offer flexible one-to-many messaging to large numbers of message consumers.

You must first have an Azure account and create a service namespace to begin using Service Bus Queues, Topics, and Subscriptions.

To create or manage a Service Bus Queue or Topic in your namespace you will require the default management credentials for your namespace. Your application needs the values of the shared secret key and issuer, which you can obtain from the Azure portal. Your application may use Java code similar to the following example to return a ServiceBusContract object that encapsulates the required credentials. All the examples in this guide use the following Service class to get the management credentials and ServiceBusContract object.

public class Service 
{
   private Configuration config;
   public Service()
   {
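      // Placeholder values: replace these with your own namespace name,
      // issuer, and shared secret key from the Azure portal.
      String namespace = "jasper";
      String issuer = "owner";
      String key = "RR9RR9RRRbRRRbRbbRRbR99RR9b+Rb9RbR+tjRRbRR9=";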
      String serviceBusRootUri = ".servicebus.windows.net";
      String wrapRootUri = "-sb.accesscontrol.windows.net/WRAPv0.9";
      this.config =
         ServiceBusConfiguration.configureWithWrapAuthentication(
         namespace, issuer, key, serviceBusRootUri, wrapRootUri);
   }
   public ServiceBusContract getservice()
   {
      ServiceBusContract service = ServiceBusService.create(config);
      return service;
   }
}

Service Bus Queues can help you to scale out your distributed application by enabling asynchronous processing and First In, First Out (FIFO) message delivery to competing message consumers. A message producer (sender) can hand off a message to the queue and does not have to wait for a reply before sending additional messages. Any available message consumer (receiver) can pull a message from the queue and process it. Because message consumers delete the message when they pull it from the queue for processing, messages are typically processed in the same order as they were added to the queue and each message is processed by only one message consumer. With a queue, your solution only needs the capacity to handle the average processing load, rather than the peak load. Message consumers can receive messages as they are ready to process them and the load gets balanced across faster and slower consumers automatically. The implementation of a queue can therefore help you scale out your distributed application by enabling your solution to balance load more efficiently across periods of higher and lower demand and across components capable of higher and lower processing speeds.
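The competing-consumer behavior described above can be illustrated without Service Bus at all. The following minimal sketch uses an in-memory java.util.concurrent.BlockingQueue as a stand-in for the queue. The class name CompetingConsumersSketch and the method processAll are hypothetical names chosen for this illustration; they are not part of the Service Bus API, and the sketch does not model networking, persistence, or ordering guarantees.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory illustration only: a BlockingQueue stands in for a Service Bus queue.
class CompetingConsumersSketch
{
   // Enqueues messageCount messages, then lets consumerCount consumers drain the queue.
   // Returns a map from message body to the name of the consumer that processed it.
   static Map<String, String> processAll(int messageCount, int consumerCount)
   {
      final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
      for (int i = 0; i < messageCount; i++)
      {
         queue.add("TestMessageBodyWorks" + i);   // the producer does not wait for a reply
      }

      final Map<String, String> processedBy = new ConcurrentHashMap<String, String>();
      ExecutorService pool = Executors.newFixedThreadPool(consumerCount);
      for (int c = 0; c < consumerCount; c++)
      {
         final String consumerName = "consumer" + c;
         pool.execute(() ->
         {
            String body;
            // poll() removes the head of the queue, so no two consumers get the same message.
            while ((body = queue.poll()) != null)
            {
               processedBy.put(body, consumerName);
            }
         });
      }

      pool.shutdown();
      try
      {
         pool.awaitTermination(10, TimeUnit.SECONDS);
      }
      catch (InterruptedException ex)
      {
         Thread.currentThread().interrupt();   // preserve the interrupt for the caller
      }
      return processedBy;
   }

   public static void main(String[] args)
   {
      Map<String, String> result = processAll(7, 3);
      System.out.println("Messages processed: " + result.size());
      System.out.println("Processed by: " + result);
   }
}
```

Which consumer handles which message varies from run to run, but every message is handled by exactly one consumer, which is the property that lets faster consumers take on more of the load.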

The following Java code creates a Service Bus Queue named “TestQueue”. QueueInfo has methods that let you specify properties, such as the “time-to-live” for messages in the queue.

public class make_bus_q 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      QueueInfo queueInfo = new QueueInfo("TestQueue");
      service.createQueue(queueInfo);
      System.out.println(queueInfo.getPath());
   }
}

Running the previous code causes a conflict error if a queue named “TestQueue” already exists in the namespace “jasper”. You can use the listQueues method on a ServiceBusContract object to check whether a queue with a specified name already exists within a service namespace. The following example Java code returns a list of the first 100 queues that exist in the namespace. You can scope your list by using the skip and top options. A single call to listQueues returns at most 100 queues, so you need to call listQueues repeatedly to make an inventory of more than 100 queues.

public class List_bus_q 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      ListQueuesOptions options = new ListQueuesOptions();
      options.setTop(100);
      options.setSkip(0);
      ListQueuesResult queuesResult = service.listQueues(options);
      List<QueueInfo> list = queuesResult.getItems();
      System.out.println("List Service Bus Queues from " +
      options.getSkip() + " to " + options.getTop());
      for (QueueInfo info : list)
      {
         System.out.println("Queue name: " + info.getPath());
      }
   }
}
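Because each call returns at most 100 entries, an inventory of a larger namespace needs a loop that advances skip by the page size until a short page comes back. The sketch below shows that loop in isolation. To stay self-contained it pages over an in-memory list of queue names rather than calling listQueues. PagingSketch, fetchPage, and listAll are hypothetical names introduced here; in a real program, fetchPage would be replaced by a call that sets skip and top on ListQueuesOptions as in the previous example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: pages over an in-memory list instead of a Service Bus namespace.
class PagingSketch
{
   static final int PAGE_SIZE = 100;   // maximum page size stated in this guide

   // Stand-in for one listing call: returns at most 'top' names starting at offset 'skip'.
   static List<String> fetchPage(List<String> allNames, int skip, int top)
   {
      int from = Math.min(skip, allNames.size());
      int to = Math.min(from + top, allNames.size());
      return new ArrayList<String>(allNames.subList(from, to));
   }

   // Collects every name by requesting pages until one comes back short.
   static List<String> listAll(List<String> allNames)
   {
      List<String> inventory = new ArrayList<String>();
      int skip = 0;
      while (true)
      {
         List<String> page = fetchPage(allNames, skip, PAGE_SIZE);
         inventory.addAll(page);
         if (page.size() < PAGE_SIZE)
         {
            break;   // a short (or empty) page means there is nothing left
         }
         skip += PAGE_SIZE;
      }
      return inventory;
   }

   public static void main(String[] args)
   {
      List<String> names = new ArrayList<String>();
      for (int i = 0; i < 250; i++)
      {
         names.add("Queue" + i);
      }
      List<String> inventory = listAll(names);
      System.out.println("Found " + inventory.size() + " queues");
      System.out.println("TestQueue exists: " + inventory.contains("TestQueue"));
   }
}
```

Checking the complete inventory for a name before creating a queue is one way to avoid the conflict error described earlier, although another client could still create the queue between the check and the create call.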

The following code deletes an existing “TestQueue” queue from the namespace.

public class Delete_bus_q 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      service.deleteQueue("TestQueue");
      QueueInfo queueInfo = new QueueInfo("TestQueue");
      System.out.println("Deleted the " + queueInfo.getPath() + " queue.");
   }
}

Although there is no limit to the number of messages that can be held in a queue, there are limits on the size of messages and the total size of the queue. The maximum queue size is defined at creation time and has an upper limit of 5 GB. Each message can have a maximum size of 256 KB. The header, which includes the standard and custom application properties, can have a maximum size of 64 KB.
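An application can check these limits on the client before it sends a message. The following sketch is one way to do that, under two assumptions that the text above does not state: that 1 KB means 1024 bytes, and that the 256 KB message limit covers the header and body together. MessageSizeSketch and fitsLimits are hypothetical names, and measuring the serialized header size of a real BrokeredMessage is left out.

```java
// Illustration only: client-side size check before sending a message.
class MessageSizeSketch
{
   static final int KB = 1024;                       // assumption: 1 KB = 1024 bytes
   static final int MAX_MESSAGE_BYTES = 256 * KB;    // per-message limit from this guide
   static final int MAX_HEADER_BYTES = 64 * KB;      // header limit from this guide

   // Returns true when the header fits its own limit and the header plus body
   // fit the message limit (assumes the 256 KB figure covers both together).
   static boolean fitsLimits(int headerBytes, int bodyBytes)
   {
      if (headerBytes < 0 || bodyBytes < 0)
      {
         throw new IllegalArgumentException("sizes must not be negative");
      }
      return headerBytes <= MAX_HEADER_BYTES
         && (long) headerBytes + bodyBytes <= MAX_MESSAGE_BYTES;
   }

   public static void main(String[] args)
   {
      System.out.println("1 KB header, 200 KB body fits: " + fitsLimits(1 * KB, 200 * KB));
      System.out.println("65 KB header, empty body fits: " + fitsLimits(65 * KB, 0));
   }
}
```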

After “TestQueue” exists, you can run Java code similar to the following to send brokered messages to the queue. BrokeredMessage objects have methods such as getLabel, getTimeToLive, setLabel, and setTimeToLive, a dictionary that can hold custom properties, and a message body containing arbitrary application data. An application can set the body of the message by passing any serializable object into the BrokeredMessage constructor, and the appropriate serializer will serialize the object. Alternatively, a java.io.InputStream can be provided to the constructor. In the following example, the string “TestMessageBodyWorks” represents the message body being passed to the constructor.

public class SendSbMessB 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      for (int i=0; i<7; i++)
      {
         BrokeredMessage message = new BrokeredMessage("TestMessageBodyWorks" + i);
         message.setLabel("Day"+i);
         message.setProperty("CustomProperty", i);
         service.sendQueueMessage("TestQueue", message);
         System.out.println("send message");
         System.out.println(message.toString());
      }
   }
}

For the purpose of example, this guide uses the following Converter class to convert the input stream representing the brokered message body back into a string that can be displayed on the console.

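import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class Converter
{
   public Converter(){}
   // Reads the stream line by line and concatenates the lines into one string.
   // Line terminators are not preserved.
   public static String fromStream(InputStream in) throws IOException
   {
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      StringBuilder out = new StringBuilder();
      String line;
      while ((line = reader.readLine()) != null)
      {
         out.append(line);
      }
      return out.toString();
   }
}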
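The Converter class reads the body line by line, so line terminators are dropped, and it decodes with the platform default character set. If a message body must round-trip exactly, one alternative is to copy the raw bytes and decode them once. The sketch below does this under the assumption that the sender encoded the body as UTF-8, which this guide does not state. ExactConverterSketch and roundTrip are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: converts a stream to a string without dropping line breaks.
class ExactConverterSketch
{
   // Copies every byte from the stream, then decodes once as UTF-8 (assumed encoding).
   static String fromStream(InputStream in) throws IOException
   {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      int read;
      while ((read = in.read(chunk)) != -1)
      {
         buffer.write(chunk, 0, read);
      }
      return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
   }

   // Encodes a string to a stream and converts it back, as a self-check.
   static String roundTrip(String body)
   {
      try
      {
         InputStream in = new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
         return fromStream(in);
      }
      catch (IOException ex)
      {
         throw new UncheckedIOException(ex);
      }
   }

   public static void main(String[] args)
   {
      String body = "TestMessageBodyWorks0\nsecond line";
      System.out.println("Round trip preserved body: " + body.equals(roundTrip(body)));
   }
}
```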

Service Bus queues are First In, First Out (FIFO): each time a message consumer retrieves a message, it receives the earliest message still remaining in the queue. The following Java code retrieves the earliest message remaining in “TestQueue”. Note that this example sets the receive mode to the optional PeekLock mode; the default is ReceiveAndDelete mode.

public class GetSbMess
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
      opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
      ReceiveQueueMessageResult resultQM = service.receiveQueueMessage("TestQueue",opts);
      BrokeredMessage message = resultQM.getValue();
      if (message != null && message.getMessageId() != null)
      {
         try
         {
            System.out.println("Label: " + message.getLabel());
            System.out.println("MessageID: " + message.getMessageId());
            System.out.println("Custom Property: " + message.getProperty("CustomProperty"));
            System.out.println("String representation of message object: " + message.toString());
            System.out.println("Body converted to string: " + Converter.fromStream(message.getBody()));
            service.deleteMessage(message);
            System.out.println("Deleted the " + message.getLabel() + " message.");
         }
         catch (Exception ex)
         {
            // Indicate a problem, unlock message in queue
            System.out.println("Inner exception encountered!");
            service.unlockMessage(message);
         }
      }
      else
      {
         System.out.println("There are no more messages.");
         // Added to handle no more messages in the queue.
         // Could instead wait for more messages to be added.
      }
   }   
}

When using the default ReceiveAndDelete mode, Service Bus returns the next message in the queue to the message consumer and removes it from the queue. Because the message consumer may fail to process the message, the ReceiveAndDelete mode works best with applications that can tolerate not processing a message in the event of a failure.

When using the optional PeekLock mode, as in the previous example, Service Bus returns the next message in the queue to the message consumer and locks the message to prevent any other consumer from receiving it. After the consumer processes the message, it must call deleteMessage on the message to remove it from the queue. If the consumer fails to process the message, it can call unlockMessage to unlock the message in the queue for another consumer to process. A timeout is also associated with a locked message: if the application fails to process the message before the lock timeout expires, Service Bus unlocks the message automatically and makes it available to be received again. The PeekLock mode therefore works best with applications that cannot tolerate missing any message in the queue.
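The difference between the two modes is easiest to see when a consumer fails. The following in-memory sketch models a queue that supports both modes and reports how many messages remain after a simulated processing failure. It is a simplified model, not the Service Bus client API: ReceiveModeSketch and all of its methods are hypothetical names, only one message can be locked at a time, and the lock timeout is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory illustration of the two receive modes; this is not the Service Bus client API.
class ReceiveModeSketch
{
   private final Deque<String> available = new ArrayDeque<String>();
   private String locked;   // at most one locked message in this simplified model

   void send(String body)
   {
      available.addLast(body);
   }

   // ReceiveAndDelete: the message leaves the queue as soon as it is handed out.
   String receiveAndDelete()
   {
      return available.pollFirst();
   }

   // PeekLock: the message is handed out but kept, locked, until deleted or unlocked.
   String peekLock()
   {
      if (locked != null)
      {
         throw new IllegalStateException("this sketch supports one locked message at a time");
      }
      locked = available.pollFirst();
      return locked;
   }

   // Called after successful processing: the locked message is removed for good.
   void deleteLocked()
   {
      locked = null;
   }

   // Called after failed processing: the locked message becomes available again.
   void unlock()
   {
      if (locked != null)
      {
         available.addFirst(locked);
         locked = null;
      }
   }

   int remaining()
   {
      return available.size() + (locked != null ? 1 : 0);
   }

   // Sends one message, simulates a consumer failure, and returns how many messages remain.
   static int remainingAfterFailure(boolean usePeekLock)
   {
      ReceiveModeSketch queue = new ReceiveModeSketch();
      queue.send("Day0");
      if (usePeekLock)
      {
         queue.peekLock();
         queue.unlock();             // the consumer failed, so the message returns to the queue
      }
      else
      {
         queue.receiveAndDelete();   // the consumer failed after receiving; nothing can be undone
      }
      return queue.remaining();
   }

   public static void main(String[] args)
   {
      System.out.println("Remaining after failure, ReceiveAndDelete: " + remainingAfterFailure(false));
      System.out.println("Remaining after failure, PeekLock: " + remainingAfterFailure(true));
   }
}
```

In this model the failed ReceiveAndDelete consumer leaves nothing behind, while the failed PeekLock consumer leaves the message available for another attempt.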

Service Bus Topics and Subscriptions enable a one-to-many pattern of communication that can help you scale out your distributed application to very large numbers of messages sent to very large numbers of users. A message producer can hand off messages to a topic and does not have to wait for a reply before sending additional messages. You can register multiple subscriptions to a topic. Messages that have been published to a topic are available to each subscription to that topic. A topic subscription resembles a virtual queue that receives copies of the messages that have been published to the topic. You can also control which messages published to a topic can be received by a subscription by registering optional filter rules with the subscription. Topics enable a publish/subscribe communication pattern that can help you scale out your distributed application to very large numbers of messages and consumers, because more than one message consumer can subscribe to a topic and because each message consumer does not have to process every message.

The following Java code creates a topic. There are methods on TopicInfo that enable you to specify properties of the topic. For example, you can set the default “time-to-live” value for messages in the topic.

public class Make_bus_topic 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      TopicInfo topicInfo = new TopicInfo("TestTopic");
      service.createTopic(topicInfo);
      System.out.println(topicInfo.getPath());
   }
}

Running this code results in a conflict error if a topic named “TestTopic” already exists in the namespace “jasper”. You can use the listTopics method on a ServiceBusContract object to check whether a topic with a specified name already exists within a service namespace. The following example Java code returns a list of the first 100 topics that exist in the namespace. You can scope your list by using the skip and top options.

public class List_bus_topics 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      ListTopicsOptions options = new ListTopicsOptions();
      options.setTop(100);
      options.setSkip(0);
      ListTopicsResult topicsResult = service.listTopics(options);
      List<TopicInfo> list = topicsResult.getItems();
      System.out.println("List Service Bus Topics from " + options.getSkip() + " to " + options.getTop());
      for (TopicInfo info : list)
      {
         System.out.println("Topic name: " + info.getPath());
      }
   }
}

The following code deletes an existing “TestTopic” topic from the namespace.

public class Delete_bus_topic 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      service.deleteTopic("TestTopic");
      TopicInfo topicInfo = new TopicInfo("TestTopic");
      System.out.println("Deleted the " + topicInfo.getPath() + " topic.");
   }
}

The following Java code creates a subscription to “TestTopic” named “AllMessages”. This example uses the default MatchAll filter. All the messages that are published to the topic are placed in the “AllMessages” subscription’s virtual queue.

public class Make_bus_sub 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      SubscriptionInfo subInfo = new SubscriptionInfo("AllMessages");
      service.createSubscription("TestTopic", subInfo);
      System.out.println(subInfo.getName());
   }
}

You can register filters for a subscription that specify which of the messages published to a topic are added to the subscription’s virtual queue. The SqlFilter filter implements a subset of the SQL-92 revision of the SQL database language. The following example uses the SqlFilter filter.

This example adds two filtered subscriptions to the “TestTopic” topic. The subscription named “HighMessages” registers a filter that only passes messages that have a custom MessageNumber property greater than 3. The subscription named “LowMessages” registers a filter that only passes messages that have a MessageNumber property less than or equal to 3. A message producer can publish any number of messages to “TestTopic”. However, the virtual queue of the “LowMessages” subscription only receives messages with a MessageNumber less than or equal to 3, and the virtual queue of the “HighMessages” subscription only receives messages with a MessageNumber greater than 3. The virtual queue of the “AllMessages” subscription still receives all messages.

public class Make_sub_rule
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      
      SubscriptionInfo subInfo =
         new SubscriptionInfo("LowMessages");
      CreateSubscriptionResult result =
         service.createSubscription("TestTopic", subInfo);
      RuleInfo ruleInfo =
         new RuleInfo("RULENAME");
      ruleInfo = ruleInfo.withSqlExpressionFilter("MessageNumber <= 3");
      CreateRuleResult ruleResult =
         service.createRule("TestTopic", "LowMessages", ruleInfo);
      service.deleteRule("TestTopic", "LowMessages", "$Default");
      
      SubscriptionInfo subInfo2 =
         new SubscriptionInfo("HighMessages");
      CreateSubscriptionResult result2 =
         service.createSubscription("TestTopic", subInfo2);
      RuleInfo ruleInfo2 =
         new RuleInfo("RULENAME2");
      ruleInfo2 = ruleInfo2.withSqlExpressionFilter("MessageNumber > 3");
      CreateRuleResult ruleResult2 =
         service.createRule("TestTopic", "HighMessages", ruleInfo2);
      service.deleteRule("TestTopic", "HighMessages", "$Default");
      
      System.out.println(subInfo.getName());
      System.out.println(result.toString());
      System.out.println(ruleInfo.getName());
      System.out.println(ruleResult.toString());
      System.out.println(subInfo2.getName());
      System.out.println(result2.toString());
      System.out.println(ruleInfo2.getName());
      System.out.println(ruleResult2.toString());
   }
}

There are now three subscriptions to “TestTopic”: “AllMessages”, “LowMessages”, and “HighMessages”. The following Java code generates several messages, incrementing the value of the “MessageNumber” property for each one, and publishes them to the “TestTopic” topic.

public class SendSbMsTopicB
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      for (int i=0; i<7; i++)
      {
         BrokeredMessage message =
            new BrokeredMessage("Test message" + i);
         message.setLabel("Day" + i);
         message.setProperty("MessageNumber", i);
         message.setProperty("CustomProperty", "CustomTestValue" + i);
         service.sendTopicMessage("TestTopic", message);
         System.out.println("send message to topic");
         System.out.println(message.toString());
      }
   }
}
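To preview how the seven messages published above are distributed, the following sketch applies the same three filter conditions in memory. It does not call Service Bus or parse SQL expressions: the filters are written as plain Java predicates that mirror “MessageNumber <= 3” and “MessageNumber > 3”. TopicFilterSketch and route are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

// In-memory illustration only: predicates stand in for subscription filter rules.
class TopicFilterSketch
{
   // Publishes MessageNumber values 0..count-1 and copies each one to every
   // subscription whose filter accepts it. Returns the resulting virtual queues.
   static Map<String, List<Integer>> route(int count)
   {
      Map<String, IntPredicate> filters = new LinkedHashMap<String, IntPredicate>();
      filters.put("AllMessages", n -> true);      // default MatchAll filter
      filters.put("LowMessages", n -> n <= 3);    // mirrors "MessageNumber <= 3"
      filters.put("HighMessages", n -> n > 3);    // mirrors "MessageNumber > 3"

      Map<String, List<Integer>> virtualQueues = new LinkedHashMap<String, List<Integer>>();
      for (String name : filters.keySet())
      {
         virtualQueues.put(name, new ArrayList<Integer>());
      }

      for (int n = 0; n < count; n++)
      {
         for (Map.Entry<String, IntPredicate> entry : filters.entrySet())
         {
            if (entry.getValue().test(n))
            {
               virtualQueues.get(entry.getKey()).add(n);   // each match gets its own copy
            }
         }
      }
      return virtualQueues;
   }

   public static void main(String[] args)
   {
      for (Map.Entry<String, List<Integer>> entry : route(7).entrySet())
      {
         System.out.println(entry.getKey() + ": " + entry.getValue());
      }
   }
}
```

With seven messages numbered 0 through 6, the sketch places 0 to 3 in “LowMessages” and 4 to 6 in “HighMessages”, while “AllMessages” holds all seven.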

The virtual queue for the “AllMessages” subscription receives all of the messages published to “TestTopic”. The virtual queue for the “LowMessages” subscription receives only messages with a MessageNumber value less than or equal to 3. The virtual queue for the “HighMessages” subscription receives only messages with a MessageNumber value greater than 3. You can demonstrate this by running the following Java code in turn for a subscriptionName of “AllMessages”, “LowMessages”, and “HighMessages”.

The following example processes the next message in the virtual queue of the specified subscription. After the message is processed, it is removed from the virtual queue.

public class GetSbMessSub 
{
   public static void main(String[] args) throws ServiceException
   {
      Service creds = new Service();
      ServiceBusContract service = creds.getservice();
      
      String subscriptionName = "AllMessages";
      ReceiveMessageOptions opts =
         ReceiveMessageOptions.DEFAULT;
      opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
      ReceiveSubscriptionMessageResult resultQM =
         service.receiveSubscriptionMessage("TestTopic", subscriptionName, opts);
      BrokeredMessage message = resultQM.getValue();
      
      if (message != null && message.getMessageId() != null)
      {
         try
         {
            System.out.println("Subscription: " + subscriptionName);
            System.out.println("Label: " + message.getLabel());
            System.out.println("MessageID: " + message.getMessageId());
            System.out.println("MessageNumber: " + message.getProperty("MessageNumber"));
            System.out.println("Custom Property: " + message.getProperty("CustomProperty"));
            System.out.println("String representation of message object: " + message.toString());
            System.out.println("Body converted to string: " + Converter.fromStream(message.getBody()));
            service.deleteMessage(message);
            System.out.println("Deleted the " + message.getLabel() +
               " message from the " + subscriptionName + " subscription.");
         }
         catch (Exception ex)
         {
            // Indicate a problem, unlock message in the subscription
            System.out.println("Inner exception encountered!");
            service.unlockMessage(message);
         }
      }
      else
      {
         System.out.println("There are no more messages.");
         // Added to handle no more messages in the subscription.
         // Could instead wait for more messages to be added.
      }
   }   
}
