How to integrate a WCF Workflow Service with Service Bus Queues and Topics

Updated: March 13, 2014

Author: Paolo Salvatori

Reviewers: Ralph Squillace, Sidney Higa

This document provides an introduction to Service Bus Brokered Messaging and guidance on how to integrate a WCF workflow service with Service Bus Queues and Topics. The document walks you through building a WCF workflow service that uses Queues and Topics to exchange messages with a client application. As a companion to this guide, I strongly recommend the excellent article Using Azure Service Bus Messaging by Roman Kiss, which provides a more sophisticated solution to the same problem and a set of reusable activities that you can use in a WCF workflow service to interact with Service Bus queues and topics.

For more information on the Azure Service Bus, see the following resources:

Queues

Queues provide messaging capabilities that enable a large and heterogeneous class of applications running on premises or in the cloud to exchange messages in a flexible, secure and reliable fashion across network and trust boundaries.

ServiceBus-Queues-Topics-WCF-Workflow-Service1

Queues are hosted in Azure by a replicated, durable store infrastructure. The maximum size of a queue is 5GB. The maximum message size is 256KB, but you can use sessions to create unlimited-size sequences of related messages. Queues are accessed through the following APIs:

Queue entities provide the following capabilities:

  • Session-based correlation, meaning that you can build multiplexed request/reply paths. In a competing consumer scenario where multiple worker processes receive messages from the same session-enabled queue or subscription, messages sharing the same SessionId are guaranteed to be received by the same consumer.

  • The ability to specify a time at which the message will be added to the queue.

  • Reliable delivery patterns using the PeekLock receive mode (a message stays on the queue until it is definitively processed).

  • Support for transactions, to ensure that batches of messaging operations are committed atomically. This feature allows an application to send multiple messages in the context of the same transaction and hence to commit or abort the entire operation as a single unit of work.

  • Detection of inbound message duplicates, allowing clients to send the same message multiple times without adverse consequences.

  • A dead-letter facility for messages that cannot be processed or expire before being received.

  • Deferral of messages for later processing. (This functionality is particularly handy when messages are received out of the expected sequence and need to be safely put on the side while the process waits for a particular message to permit further progress or when messages need to be processed based on a set of properties that define their priority during a traffic spike.)

The two most important types in the .NET API for brokered messages (the messages used in Service Bus queues, topics, and their subscriptions) are the BrokeredMessage class, which exposes properties such as MessageId, SessionId, and CorrelationId that enable automatic duplicate detection and session-enabled communications, among other things, and the QueueDescription class, which you can use to control the behavior of the queue being created. The QueueDescription class has the following important properties:

Note
Since metadata cannot be changed once a messaging entity is created, modifying the duplicate detection behavior requires deleting and recreating the queue. The same principle applies to any other metadata.

Architecturally, using queues is an important part of distributed applications because it permits you to flatten highly-variable traffic into a predictable stream of work and then distribute the load across a set of worker processes the size of which can vary dynamically to accommodate the incoming message volume. In a Competing Consumers scenario, when a publisher writes a message to a queue, multiple consumers compete with each other to receive the message, but only one of them will receive and process the message in question. In other words, a queue can have a single consumer that receives all messages, or a set of competing consumers that fetch messages on a first-come-first-served basis. For this reason, queues are an excellent messaging solution to distribute the work load across multiple sets of competing worker processes.
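The Competing Consumers behavior described above can be sketched without any Service Bus dependency. The following is a minimal in-memory illustration, not the Service Bus API: a BlockingCollection stands in for the queue, and the CompetingConsumersSketch class and its Run method are hypothetical names introduced only for this example. It shows that each message is processed by exactly one of the competing workers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// In-memory illustration only: BlockingCollection stands in for a Service Bus queue.
public static class CompetingConsumersSketch
{
    // Publishes messageCount messages and lets consumerCount workers compete for them.
    // Returns, for each message number, the index of the worker that processed it.
    public static IDictionary<int, int> Run(int messageCount, int consumerCount)
    {
        var processedBy = new ConcurrentDictionary<int, int>();

        using (var queue = new BlockingCollection<int>())
        {
            // Each worker takes the next available message. A message taken by
            // one worker is never seen by the others.
            var workers = Enumerable.Range(0, consumerCount)
                .Select(workerId => Task.Run(() =>
                {
                    foreach (var message in queue.GetConsumingEnumerable())
                    {
                        if (!processedBy.TryAdd(message, workerId))
                        {
                            throw new InvalidOperationException(
                                "Message delivered twice: " + message);
                        }
                    }
                }))
                .ToArray();

            // The publisher writes the messages, then signals that no more will arrive.
            for (var i = 0; i < messageCount; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            Task.WaitAll(workers);
        }

        return processedBy;
    }
}
```

Calling CompetingConsumersSketch.Run(100, 4) returns one entry per message. How the messages are split among the four workers varies from run to run, which is the first-come-first-served behavior described above.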

In service-oriented or service bus architectures composed of multiple, heterogeneous systems, interactions between autonomous systems are asynchronous and loosely-coupled. In this context, people often think of SOAP or REST services to keep components loosely coupled, but the use of Service Bus messaging entities like Queues and Topics (see the next section) increases the agility, scalability and flexibility of the overall architecture and helps reduce the coupling between individual systems.

For more information on Queues, see the following articles:

Topics

Topics extend the messaging features provided by Queues with the addition of Publish-Subscribe capabilities.

ServiceBus-Queues-Topics-WCF-Workflow-Service2

A Topic entity consists of a sequential message store like a queue, but it supports up to 2000 concurrent and durable subscriptions which relay copies of the message to a set of worker processes (this number is subject to change in the future). As depicted in the following figure, each Subscription can define one or multiple Rule entities.

ServiceBus-Queues-Topics-WCF-Workflow-Service3

Each Rule specifies a filter expression that is used to filter messages that pass through the subscription and a filter action that can modify message properties. In particular, the SqlFilter class allows you to define a SQL92-like condition on message properties:

  • OrderTotal > 5000 OR ClientPriority > 2

  • ShipDestinationCountry = 'USA' AND ShipDestinationState = 'WA'

For more information on this topic, see the documentation for the SqlExpression property.

Conversely, the SqlRuleAction class can be used to modify, add or remove properties on a BrokeredMessage object using syntax similar to that of the SET clause of a SQL UPDATE command.

  • SET AuditRequired = 1

  • SET Priority = 'High', Severity = 1

Note
Each matching rule that explicitly defines an action generates a separate copy of the published message, so any subscription can potentially generate more copies of the same message, one for each matching rule.
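To make the interplay between filters, actions and message copies more concrete, here is a small in-memory model. It is only a sketch under stated assumptions: RuleSketch and SubscriptionSketch are hypothetical types that are not part of the Service Bus API, filters and actions are plain delegates rather than SQL expressions, and the handling of matching rules without an action (a single shared, unmodified copy) is an assumption of this sketch rather than something stated in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory model only; these types are hypothetical, not the Service Bus API.
public sealed class RuleSketch
{
    // Plays the role of the filter expression (for example, OrderTotal > 5000).
    public Func<IDictionary<string, object>, bool> Matches { get; set; }

    // Plays the role of the rule action (for example, SET AuditRequired = 1).
    // Null means that the rule defines no action.
    public Action<IDictionary<string, object>> Apply { get; set; }
}

public static class SubscriptionSketch
{
    // Returns the property bags of the message copies that a subscription with
    // the given rules would receive for one published message.
    public static List<IDictionary<string, object>> Deliver(
        IDictionary<string, object> properties, IEnumerable<RuleSketch> rules)
    {
        var matching = rules.Where(r => r.Matches(properties)).ToList();
        var copies = new List<IDictionary<string, object>>();

        // One separate copy for each matching rule that defines an action.
        foreach (var rule in matching.Where(r => r.Apply != null))
        {
            var copy = new Dictionary<string, object>(properties);
            rule.Apply(copy);
            copies.Add(copy);
        }

        // Assumption of this sketch: matching rules without an action
        // share a single, unmodified copy.
        if (matching.Any(r => r.Apply == null))
        {
            copies.Add(new Dictionary<string, object>(properties));
        }

        return copies;
    }
}
```

For example, a message with OrderTotal = 6000 evaluated against two matching rules that each define an action yields two copies, each carrying only the properties set by its own rule. The original property bag is left untouched.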

Like Queues, Topics also support a Competing Consumers scenario. In this context, a subscription can have a single consumer that receives all messages, or a set of competing consumers that fetch messages on a first-come-first-served basis. Topics are an excellent messaging solution to broadcast messages to many consumer applications or to distribute the work load across multiple sets of competing worker processes.

For more information on Topics, see the following articles:

The BrokeredMessage Class

The BrokeredMessage class models the messages exchanged by applications that communicate using Queues and Topics. The class provides 4 different public constructors:

The class exposes an interesting set of methods that allow executing a wide range of actions at the message level:

  • When using the PeekLock receive mode, the Abandon method releases the lock on a peek-locked message, whereas the Complete method commits the receive operation of a message and indicates that the message should be marked as processed and deleted or archived.

  • The Defer method indicates that the receiver wants to defer the processing of this message. As mentioned before, deferring messages is a convenient way to handle those situations where messages are received out of the expected sequence and need to be safely parked while the application waits for a particular message before proceeding with the processing of the message flow.

  • The DeadLetter and DeadLetter(String, String) methods allow an application to explicitly move a message to the dead-letter queue of a queue or a subscription. Take into account that when you create a Queue entity using the management API or the Azure Management Portal, you can configure it to automatically move expired messages to the dead-letter queue. In the same way, you can configure a Subscription to move expired messages and messages that fail filter evaluation to its dead-letter queue.
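The semantics of Complete, Abandon and DeadLetter in the PeekLock receive mode can be illustrated with a small in-memory model. This is a sketch, not the Service Bus implementation: PeekLockQueueSketch and PeekLockMessageSketch are hypothetical types, locks never time out, and an abandoned message simply goes to the back of the in-memory queue, which is a simplification.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration only; these types are hypothetical, not the Service Bus API.
public sealed class PeekLockMessageSketch
{
    public string Body { get; set; }

    // Number of times the message has been handed to a receiver.
    public int DeliveryCount { get; internal set; }
}

public sealed class PeekLockQueueSketch
{
    private readonly Queue<PeekLockMessageSketch> available = new Queue<PeekLockMessageSketch>();
    private readonly HashSet<PeekLockMessageSketch> locked = new HashSet<PeekLockMessageSketch>();
    private readonly List<PeekLockMessageSketch> deadLettered = new List<PeekLockMessageSketch>();

    public IList<PeekLockMessageSketch> DeadLetterQueue
    {
        get { return deadLettered; }
    }

    public void Send(string body)
    {
        available.Enqueue(new PeekLockMessageSketch { Body = body });
    }

    // Locks and returns the next message, or null when no message is available.
    // A locked message stays in the queue but is invisible to other receivers.
    public PeekLockMessageSketch Receive()
    {
        if (available.Count == 0)
        {
            return null;
        }

        var message = available.Dequeue();
        message.DeliveryCount++;
        locked.Add(message);
        return message;
    }

    // Complete: the receive operation is committed and the message is removed.
    public void Complete(PeekLockMessageSketch message)
    {
        Unlock(message);
    }

    // Abandon: the lock is released and the message becomes available again.
    public void Abandon(PeekLockMessageSketch message)
    {
        Unlock(message);
        available.Enqueue(message);
    }

    // DeadLetter: the message is explicitly moved to the dead-letter queue.
    public void DeadLetter(PeekLockMessageSketch message)
    {
        Unlock(message);
        deadLettered.Add(message);
    }

    private void Unlock(PeekLockMessageSketch message)
    {
        if (!locked.Remove(message))
        {
            throw new InvalidOperationException("The message is not locked by a receiver.");
        }
    }
}
```

In this model, a message that is received, abandoned and received again comes back with a DeliveryCount of 2, and it disappears for good only after Complete or DeadLetter is called.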

The BrokeredMessage class exposes a wide range of properties:

  • The ContentType property allows you to specify the type of the content.

  • The MessageId is the identifier of the message.

  • The CorrelationId property can be used to implement a request-reply message exchange pattern where the client application uses the MessageId property of an outgoing request message and the CorrelationId property of an incoming response message to correlate the two messages. (We’ll see an implementation of this technique later in this article.)

  • The SessionId property gets or sets the identifier of the session for a message. In a competing consumer scenario where multiple worker processes receive messages from the same session-enabled queue or subscription, messages sharing the same SessionId are guaranteed to be received by the same consumer. In this context, when a client application A sends a flow of request messages to a server application B via a session-enabled queue or topic and waits for the correlated response messages on a separate session-enabled queue or a subscription, the client application A can assign the id of the receive session to the ReplyToSessionId property of outgoing messages to indicate to the application B the value to assign to the SessionId property of response messages.

  • The ReplyTo property gets or sets the address of the queue to reply to. In an asynchronous request-reply scenario where a client application A sends a request message to a server application B via a Service Bus queue or topic and waits for a response message, by convention the client application A can use the ReplyTo property of the request message to indicate to the server application B the address of the queue or topic where to send the response. (We’ll see an application of this technique later in this article.)

  • The Label property gets or sets an application-specific label for custom needs.

  • The SequenceNumber property returns the unique number assigned to a message by the Service Bus. This property can be used to retrieve a deferred message from a queue or a subscription.

  • The TimeToLive property gets or sets the message’s time-to-live value. The Service Bus does not enforce a maximum lifetime for messages waiting to be processed in a queue or a subscription. Nevertheless, you can define a default time to live for messages when you create a queue, a topic or a subscription, or you can explicitly define an expiration timeout at the message level using the TimeToLive property.

  • The DeliveryCount returns the number of message deliveries.

  • The Properties collection allows defining application specific message properties. This is probably the most important feature of a BrokeredMessage entity as user-defined properties can be used for the following:

    • Carry the payload of a message. In this context, the body of the message could be empty.

    • Define application specific properties that can be used by a worker process to decide how to process the current message.

    • Specify filter and action expressions that can be used to define routing and data enrichment rules at a subscription level.

If you are familiar with context properties on a BizTalk message, it is helpful to think of the user-defined properties contained in the BrokeredMessage Properties collection as the context properties of a BizTalk message. Another example of a property bag used to transmit context information is provided by WCF, which offers a special set of context-enabled bindings, like the BasicHttpContextBinding, NetTcpContextBinding or WSHttpContextBinding, that allow a client to send extra parameters to the service and exchange context by using HTTP cookies or a SOAP header. In fact, the BrokeredMessage Properties collection can be used to carry a piece of information or even the entire message payload and, when using Topics and Subscriptions, the properties can be used to route the message to the proper destination. Hence, in a scenario where a third party system exchanges messages with a BizTalk application via Service Bus, it’s of great importance to translate the application specific properties carried by a BrokeredMessage object into context properties of a BizTalk message and vice versa. In the article and in the companion code I’ll show you how to achieve this result.

Note
As indicated in Azure AppFabric Service Bus Quotas, the maximum size for each property is 32K. Cumulative size of all properties cannot exceed 64K. This applies to the entire header of the BrokeredMessage, which has both user properties as well as system properties (such as SequenceNumber, Label, MessageId, and so on). The space occupied by properties counts towards the overall size of the message and its maximum size of 256K. If an application exceeds any of the limits mentioned above, a SerializationException exception is generated, so you should expect to handle this error condition.
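The request-reply correlation technique based on the MessageId and CorrelationId properties, described in the property list above, can be sketched in a few lines. This is a minimal in-memory illustration under stated assumptions: MessageSketch is a hypothetical stand-in for the handful of BrokeredMessage properties involved, and RequestReplyCorrelatorSketch is a hypothetical helper, not a type from the Service Bus API or from the companion code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the few BrokeredMessage properties used in this sketch.
public sealed class MessageSketch
{
    public string MessageId { get; set; }
    public string CorrelationId { get; set; }
    public string ReplyTo { get; set; }
    public string Body { get; set; }
}

// Hypothetical helper that pairs responses with the requests that caused them.
public sealed class RequestReplyCorrelatorSketch
{
    // Requests that are still waiting for a response, keyed by MessageId.
    private readonly Dictionary<string, MessageSketch> pending =
        new Dictionary<string, MessageSketch>();

    // Client side: stamp the request with a new MessageId and remember it.
    public MessageSketch CreateRequest(string body, string replyTo)
    {
        var request = new MessageSketch
        {
            MessageId = Guid.NewGuid().ToString(),
            ReplyTo = replyTo,
            Body = body
        };
        pending.Add(request.MessageId, request);
        return request;
    }

    // Server side: copy the MessageId of the request into the CorrelationId of the response.
    public static MessageSketch CreateResponse(MessageSketch request, string body)
    {
        return new MessageSketch
        {
            MessageId = Guid.NewGuid().ToString(),
            CorrelationId = request.MessageId,
            Body = body
        };
    }

    // Client side: find the pending request that the response answers.
    // Returns null when the response does not match any pending request.
    public MessageSketch Match(MessageSketch response)
    {
        MessageSketch request;
        if (response.CorrelationId != null
            && pending.TryGetValue(response.CorrelationId, out request))
        {
            pending.Remove(response.CorrelationId);
            return request;
        }

        return null;
    }
}
```

The server never needs to know how the client tracks its requests. It only has to echo the MessageId of the request in the CorrelationId of the response and send the response to the address found in ReplyTo.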

NetMessagingBinding

The Service Bus Brokered Messaging supports the WCF programming model and in particular provides a new binding called NetMessagingBinding that can be used by WCF-enabled applications to send and receive messages via queues, topics and subscriptions over the Service Bus Messaging Protocol (SBMP). NetMessagingBinding is the new name for the binding for Queues and Topics that is providing full integration with WCF. From a functional perspective, the NetMessagingBinding is similar to the NetMsmqBinding which provides support for queuing by using Message Queuing (MSMQ) as a transport and enables support for loosely-coupled applications. On the service-side, the NetMessagingBinding provides an automatic message-pump that pulls messages off a Queue or Subscription and it’s integrated with WCF’s ReceiveContext mechanism.

The new binding supports the IInputChannel, IOutputChannel and IInputSessionChannel standard interfaces. When an application uses WCF and the NetMessagingBinding to send a message to a queue or a topic, the message is wrapped in a SOAP envelope and encoded. To set BrokeredMessage specific properties, you need to create a BrokeredMessageProperty object, set the properties on it and add it to the Properties collection of the WCF Message, as shown in the following code. When using the NetMessagingBinding to write a message to a queue or a topic, the ServiceBusOutputChannel internal class looks for the BrokeredMessageProperty property in the Properties collection of the WCF message and copies all its properties to the BrokeredMessage object it creates. Then it copies the payload from the WCF message to the BrokeredMessage object and finally publishes the resulting message to the target queue or topic.

static void Main(string[] args)
{
    try
    {
        // Create the channel factory and the client channel
        var channelFactory = new ChannelFactory<IOrderService>("orderEndpoint");
        var clientChannel = channelFactory.CreateChannel();

        // Create an order object
        var order = new Order()
                        {
                            ItemId = "001",
                            Quantity = 10
                        };

        // Use the OperationContextScope to create a block within which to access the current OperationContext
        using (var scope = new OperationContextScope((IContextChannel)clientChannel))
        {
            // Create a new BrokeredMessageProperty object
            var property = new BrokeredMessageProperty();

            // Use the BrokeredMessageProperty object to set the BrokeredMessage properties
            property.Label = "OrderItem";
            property.MessageId = Guid.NewGuid().ToString();
            property.ReplyTo = "sb://acme.servicebus.windows.net/invoicequeue";

            // Use the BrokeredMessageProperty object to define application-specific properties
            property.Properties.Add("ShipCountry", "Italy");
            property.Properties.Add("ShipCity", "Milan");

            // Add BrokeredMessageProperty to the OutgoingMessageProperties bag provided 
            // by the current Operation Context 
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, property);
            clientChannel.SendOrder(order);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Likewise, when using a NetMessagingBinding-based service endpoint to receive messages from a queue or a topic, an application can retrieve the BrokeredMessageProperty object from the Properties collection of the incoming WCF Message, as shown in the following code. In particular, upon message receive, the ServiceBusInputChannel and ServiceBusInputSessionChannel internal classes (the latter is used to receive messages from sessionful queues and subscriptions) create a new WCF Message and copy the payload from the body of the inbound BrokeredMessage to the body of the newly created WCF message. Then, they copy the properties from the inbound BrokeredMessage to a new instance of the BrokeredMessageProperty class and finally add the latter to the Properties collection of the incoming WCF Message.

[ServiceBehavior]
public class OrderService : IOrderService
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...
    }
}

Because the Service Bus does not support IOutputSessionChannel, all applications sending messages to session-enabled queues must use a service contract whose SessionMode property is different from SessionMode.Required. However, the Service Bus WCF runtime supports IInputSessionChannel and so in order to receive messages from a sessionful queue or subscription using WCF and NetMessagingBinding, an application needs to implement a session-aware service contract. The following code snippet provides an example of a WCF service receiving messages from a sessionful queue/subscription.

// ServiceBus does not support IOutputSessionChannel.
// All senders sending messages to sessionful queue must use a contract which does not enforce SessionMode.Required.
// Sessionful messages are sent by setting the SessionId property of the BrokeredMessageProperty object.
[ServiceContract]
public interface IOrderService
{
    [OperationContract(IsOneWay = true)]
    [ReceiveContextEnabled(ManualControl = true)]
    void ReceiveOrder(Order order);
}

// ServiceBus supports both IInputChannel and IInputSessionChannel. 
// A sessionful service listening to a sessionful queue must have SessionMode.Required in its contract.
[ServiceContract(SessionMode = SessionMode.Required)]
public interface IOrderServiceSessionful : IOrderService
{
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession, ConcurrencyMode = ConcurrencyMode.Single)]
public class OrderService : IOrderServiceSessionful
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...

        //Complete the Message
        ReceiveContext receiveContext;
        if (ReceiveContext.TryGet(incomingProperties, out receiveContext))
        {
            receiveContext.Complete(TimeSpan.FromSeconds(10.0d));
            ...
        }
        else
        {
            throw new InvalidOperationException("...");
        }
    }
}

Note that the ManualControl property of the ReceiveContextEnabled operation attribute is set to true. This requires the service to explicitly invoke the ReceiveContext.Complete method to commit the receive operation. In fact, when the ManualControl property is set to true, the message received from the channel is delivered to the service operation with a lock on the message. It is the responsibility of the service implementation to call either Complete(TimeSpan) or Abandon(TimeSpan) to signal the receive completion of the message. Failure to call either of these results in the lock being held on the message until the lock timeout interval elapses. Once the lock is released (either by calling Abandon(TimeSpan) or through a lock timeout), the message is re-dispatched from the channel to the service. Calling Complete(TimeSpan) marks the message as successfully received.

Note also that the OrderService class has the ServiceBehavior.InstanceContextMode property set to InstanceContextMode.PerSession and the ConcurrencyMode property set to ConcurrencyMode.Single. This way the ServiceHost will create a new service instance every time a new session is available in the referenced queue/subscription and will use a single thread to receive messages from it in a sequential order. The life-time of the service instance is controlled by setting the SessionIdleTimeout property of the NetMessagingBinding.

Architecture

The following figure shows the high-level architecture of the demo:

ServiceBus-Queues-Topics-WCF-Workflow-Service4

Message Flow:

  1. A client application uses a WCF proxy and the NetMessagingBinding to send a request message to the requestqueue or to the requesttopic.

  2. The WCF workflow service running in a console application or IIS receives the request message from the requestqueue or the ItalyMilan subscription defined on the requesttopic.

    ServiceBus-Queues-Topics-WCF-Workflow-Service5
  3. The WCF workflow service, shown in the figure above, performs the following actions:

    • The custom BrokeredMessagePropertyActivity (identified by the Get BrokeredMessage display name) reads the BrokeredMessageProperty from the inbound message and assigns its value to a workflow variable defined in the outermost Sequential activity.

    • The Receive activity retrieves the message from the requestqueue or the ItalyMilan subscription of the requesttopic.

    • The custom CalculatorActivity receives the inbound message and BrokeredMessageProperty as input arguments, processes the request message and generates a response message and an outbound BrokeredMessageProperty. When running in a console application, the activity traces the properties of the inbound and outbound BrokeredMessageProperty in the standard output.

    • The If activity reads the address contained in the ReplyTo property of the inbound BrokeredMessageProperty.

      • If the string contains the word “topic”, the response is sent to the responsetopic.

      • Otherwise, the response is sent to the responsequeue.

    • In both cases, an instance of the BrokeredMessagePropertyActivity (identified by the Set BrokeredMessage display name) is used to wrap the Send activity and assign the outbound BrokeredMessageProperty to the properties collection of the WCF response message.

  4. The WCF workflow service writes the reply message to the responsequeue or responsetopic.

  5. The client application uses a WCF service with two distinct endpoints to retrieve the reply message from the responsequeue or from the responsetopic. In an environment with multiple client applications, each of them should use a separate queue or subscription to receive response messages from the WCF workflow service. More on this later on in the article.
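The routing decision made by the If activity in step 3 boils down to a simple string test on the ReplyTo address. The following sketch expresses it as plain C#. The ReplyRoutingSketch class is a hypothetical name used only for illustration, the entity names are the ones used by the demo, and the case-insensitive comparison is an assumption, since the article does not say how the workflow compares the string.

```csharp
using System;

// Hypothetical helper that mirrors the decision taken by the If activity.
public static class ReplyRoutingSketch
{
    public const string ResponseQueue = "responsequeue";
    public const string ResponseTopic = "responsetopic";

    // Returns the name of the entity that should receive the response:
    // the responsetopic when the ReplyTo address contains the word "topic",
    // the responsequeue otherwise (including when ReplyTo is missing).
    public static string SelectResponseEntity(string replyTo)
    {
        // Assumption: the comparison ignores case.
        var isTopic = replyTo != null
            && replyTo.IndexOf("topic", StringComparison.OrdinalIgnoreCase) >= 0;

        return isTopic ? ResponseTopic : ResponseQueue;
    }
}
```

For instance, a ReplyTo value of sb://acme.servicebus.windows.net/responsetopic selects the responsetopic, whereas sb://acme.servicebus.windows.net/responsequeue selects the responsequeue.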

Solution

Now that we have described the overall architecture of the demo, we can analyze in detail the main components of the solution.

Queues, Topics and Subscriptions

The first step in configuring the environment is to create the messaging entities used by the demo. Start by provisioning a new Service Bus namespace or modifying an existing namespace to include the Service Bus. You can accomplish this task from the Azure Management Portal by clicking the New button or the Modify button, respectively.

The next step is to create the queue, topic and subscription entities required by the demo. As I mentioned in the introduction, you have many options to accomplish this task. The easiest way is to use the buttons in the Manage Entities command bar of the Azure Management Portal.

You can use the navigation tree-view shown at point 2 to select an existing entity and display its properties in the vertical bar highlighted at point 3. To remove the selected entity, you press the Delete button in the Manage Entities command bar.

Note
The Azure Management Portal is a handy and convenient way to manage the messaging entities in a given Service Bus namespace. However, at least at the moment, the set of operations that a developer or a system administrator can perform using its user interface is quite limited. For example, the Azure Management Portal currently allows a user to create queues, topics, and subscriptions and define their properties, but not to create or display rules for an existing subscription. At the moment, you can accomplish this task only by using the .NET Messaging API. In particular, to add a new rule to an existing subscription you can use the AddRule(String, Filter) or the AddRule(RuleDescription) methods exposed by the SubscriptionClient class, while to enumerate the rules of an existing subscription, you can use the GetRules method of the NamespaceManager class. In addition, the Azure Management Portal currently does not provide the ability to perform the following operations:

  1. Properly visualize entities in a hierarchical manner. Currently, the Azure Management Portal displays queues, topics and subscriptions in a flat treeview. However, you can organize messaging entities in a hierarchical structure simply by specifying their name as an absolute path composed of multiple path segments, for example crm/prod/queues/orderqueue.

  2. Export the messaging entities contained in a given Service Bus namespace to an XML binding file (à la BizTalk Server). Instead, the Service Bus Explorer tool provides the ability to select and export:

    1. Individual entities.

    2. Entities by type (queues or topics).

    3. Entities contained in a certain path (e.g. crm/prod/queues).

    4. All the entities in a given namespace.

  3. Import queues, topics and subscriptions from an XML file to an existing namespace. The Service Bus Explorer supports the ability to export entities to an XML file and reimport them on the same or another Service Bus namespace. This feature comes in handy to perform a backup and restore of a namespace or simply to transfer queues and topics from a testing to a production namespace.

The Service Bus Explorer allows a user to create, delete and test queues, topics, subscriptions, and rules and represents the perfect companion for the official Azure Management Portal.

For your convenience, I created a console application called Provisioning that uses the functionality provided by the NamespaceManager class to create the queues, topics, and subscriptions required by the solution. When it starts, the console application prompts for the service namespace credentials. These are used to authenticate with the Access Control service and acquire an access token that proves to the Service Bus infrastructure that the application is authorized to provision new messaging entities. Then the application prompts for the values to assign to the properties of the entities to create, such as EnableBatchedOperations and EnableDeadLetteringOnMessageExpiration for queues. The Provisioning application creates the following entities in the specified Service Bus namespace:

  1. A queue called requestqueue used by the client application to send request messages to the WCF workflow service.

  2. A queue called responsequeue used by the WCF workflow service to send response messages to the client application.

  3. A topic called requesttopic used by the client application to send request messages to the WCF workflow service.

  4. A topic called responsetopic used by the WCF workflow service to send response messages to the client application.

  5. A subscription called ItalyMilan for the requesttopic. The latter is used by the WCF workflow service to receive request messages from the requesttopic. The subscription in question has a single rule defined as follows:

    1. Filter: Country='Italy' and City='Milan'

    2. Action: Set Area='Western Europe'

  6. A subscription called ItalyMilan for the responsetopic. The latter is used by the client application to receive response messages from the responsetopic. The subscription in question has a single rule defined as follows:

    1. Filter: Country='Italy' and City='Milan'

    2. Action: Set Area='Western Europe'

The following figure displays the output of the Provisioning console application.

ServiceBus-Queues-Topics-WCF-Workflow-Service8

The following listing contains the code of the Provisioning application:

#region Using Directives
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Provisioning
{
    static class Program
    {
        #region Private Constants
        //***************************
        // Constants
        //***************************

        private const string RequestQueue = "requestqueue";
        private const string ResponseQueue = "responsequeue";
        private const string RequestTopic = "requesttopic";
        private const string ResponseTopic = "responsetopic";
        private const string RequestSubscription = "ItalyMilan";
        private const string ResponseSubscription = "ItalyMilan";
        #endregion
        static void Main()
        {
            var defaultColor = Console.ForegroundColor;

            try
            {
                // Set Window Size
                Console.WindowWidth = 100;
                Console.BufferWidth = 100;
                Console.WindowHeight = 48;
                Console.BufferHeight = 48;

                // Print Header            
                Console.WriteLine("Read Credentials:");
                Console.WriteLine("-----------------");

                // Read Service Bus Namespace
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Namespace: ");
                Console.ForegroundColor = defaultColor;
                var serviceNamespace = Console.ReadLine();

                // Read Service Bus Issuer Name
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Name: ");
                Console.ForegroundColor = defaultColor;
                var issuerName = Console.ReadLine();

                // Read Service Bus Issuer Secret
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Secret: ");
                Console.ForegroundColor = defaultColor;
                var issuerSecret = Console.ReadLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Queues Properties:");
                Console.WriteLine("------------------------");

                // Read Queue EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                var key = Console.ReadKey().KeyChar;
                var queueEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Topic Properties:");
                Console.WriteLine("-----------------------");

                // Read Topic EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Topic RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Subscriptions Properties: ");
                Console.WriteLine("-------------------------------");

                // Read Subscription EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnFilterEvaluationExceptions
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnFilterEvaluationExceptions ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnFilterEvaluationExceptions = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Get ServiceBusNamespaceClient for management operations
                var managementUri = 
                    ServiceBusEnvironment.CreateServiceUri("https", serviceNamespace, string.Empty);
                var tokenProvider = 
                    TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
                var namespaceManager = new NamespaceManager(managementUri, tokenProvider);

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Queues:");
                Console.WriteLine("--------------");

                // Create RequestQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(RequestQueue))
                {
                    namespaceManager.DeleteQueue(RequestQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(RequestQueue)
                                {
                                    EnableBatchedOperations = queueEnabledBatchedOperations,
                                    EnableDeadLetteringOnMessageExpiration =
                                        queueEnableDeadLetteringOnMessageExpiration,
                                    RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                    RequiresSession = queueRequiresSession
                                });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Create ResponseQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(ResponseQueue))
                {
                    namespaceManager.DeleteQueue(ResponseQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(ResponseQueue)
                            {
                                EnableBatchedOperations = queueEnabledBatchedOperations,
                                EnableDeadLetteringOnMessageExpiration = 
                                    queueEnableDeadLetteringOnMessageExpiration,
                                RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                RequiresSession = queueRequiresSession
                            });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Topics:");
                Console.WriteLine("--------------");

                // Create RequestTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(RequestTopic))
                {
                    namespaceManager.DeleteTopic(RequestTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(RequestTopic)
                                        {
                                            EnableBatchedOperations = topicEnabledBatchedOperations,
                                            RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                        });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create ResponseTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(ResponseTopic))
                {
                    namespaceManager.DeleteTopic(ResponseTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(ResponseTopic)
                                    {
                                        EnableBatchedOperations = topicEnabledBatchedOperations,
                                        RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                    });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Subscriptions:");
                Console.WriteLine("---------------------");

                // Create Request Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                var ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                          {
                                              Name = "$Default",
                                              Action = new SqlRuleAction("Set Area='Western Europe'")
                                          };
                var subscriptionDescription = new SubscriptionDescription(RequestTopic, RequestSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create Response Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                      {
                                          Action = new SqlRuleAction("Set Area='Western Europe'")
                                      };
                subscriptionDescription = new SubscriptionDescription(ResponseTopic, ResponseSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");
                Console.WriteLine();

                // Close the application
                Console.WriteLine("Press any key to continue ...");
                Console.ReadKey();
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("Exception: ");
                Console.ForegroundColor = defaultColor;
                Console.Write(ex.Message);
            }
        }
    }
}

Note
I didn't test all the possible combinations of properties for queues and topics so the demo may not work as expected with all the configurations.

Data Contracts

I started by defining the Data Contracts for the request and response messages. Data Contracts provide a mechanism to map between .NET CLR types defined in code and XML Schemas (XSD) as defined by the W3C organization (www.w3c.org). Data contracts are published in the service’s metadata, allowing clients to convert the neutral, technology-agnostic representation of the data types to their native representations. For more information on Data Contracts, you can read the following articles:

In my solution, I created a project called DataContracts to define the classes that represent the request and response messages, respectively. Their code is included below for your convenience.

CalculatorRequest Class

[Serializable]
[XmlType(TypeName = "CalculatorRequest", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorRequest", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
         IsNullable = false)]
[DataContract(Name = "CalculatorRequest", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorRequest
{
    #region Private Fields
    private OperationList operationList;
    #endregion

    #region Public Constructors
    public CalculatorRequest()
    {
        operationList = new OperationList();
    }

    public CalculatorRequest(OperationList operationList)
    {
        this.operationList = operationList;
    }
    #endregion

    #region Public Properties
    [XmlArrayItem("Operation", Type=typeof(Operation), IsNullable = false)]
    [DataMember(Order = 1)]
    public OperationList Operations
    {
        get
        {
            return operationList;
        }
        set
        {
            operationList = value;
        }
    } 
    #endregion
}

[CollectionDataContract(Name = "OperationList", 
                        Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                        ItemName = "Operation")]
public class OperationList : List<Operation>
{
}

[Serializable]
[XmlType(TypeName = "Operation", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Operation", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Operation
{
    #region Private Fields
    private string op;
    private double operand1;
    private double operand2;
    #endregion

    #region Public Constructors
    public Operation()
    {
    }

    public Operation(string op,
                        double operand1,
                        double operand2)
    {
        this.op = op;
        this.operand1 = operand1;
        this.operand2 = operand2;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Operator
    {
        get
        {
            return op;
        }
        set
        {
            op = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public double Operand1
    {
        get
        {
            return operand1;
        }
        set
        {
            operand1 = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 3)]
    public double Operand2
    {
        get
        {
            return operand2;
        }
        set
        {
            operand2 = value;
        }
    } 
    #endregion
}

CalculatorResponse Class

[Serializable]
[XmlType(TypeName = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
         IsNullable = false)]
[DataContract(Name = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorResponse
{
    #region Private Fields
    private string status;
    private ResultList resultList;
    #endregion

    #region Public Constructors
    public CalculatorResponse()
    {
        status = default(string);
        resultList = new ResultList();
    }

    public CalculatorResponse(string status)
    {
        this.status = status;
        resultList = new ResultList();
    }

    public CalculatorResponse(string status, ResultList resultList)
    {
        this.status = status;
        this.resultList = resultList;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Status 
    {
        get 
        {
            return status;
        }
        set 
        {
            status = value;
        }
    }

    [XmlArrayItem("Result", Type=typeof(Result), IsNullable=false)]
    [DataMember(Order = 2)]
    public ResultList Results 
    {
        get 
        {
            return resultList;
        }
        set 
        {
            resultList = value;
        }
    }
    #endregion
}

[CollectionDataContract(Name = "ResultList", 
                        Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                        ItemName = "Result")]
public class ResultList : List<Result>
{
}

[Serializable]
[XmlType(TypeName = "Result", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Result", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Result
{
    #region Private Fields
    private double value;
    private string error; 
    #endregion

    #region Public Constructors
    public Result()
    {
        value = default(double);
        error = default(string);
    }

    public Result(double value, string error)
    {
        this.value = value;
        this.error = error;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public double Value
    {
        get
        {
            return value;
        }
        set
        {
            this.value = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public string Error
    {
        get
        {
            return error;
        }
        set
        {
            error = value;
        }
    } 
    #endregion
}
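
To see how data contracts like these look once serialized, the following minimal sketch round-trips a request through the DataContractSerializer. It uses trimmed stand-ins for the request classes above (auto-properties and data contract attributes only). The Ns and Program classes and the Serialize and Deserialize methods are hypothetical names introduced for illustration; they are not part of the sample solution.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Text;

// Hypothetical helper that holds the XML namespace shared by the contracts.
public static class Ns
{
    public const string Value = "http://windowsazure.cat.microsoft.com/samples/servicebus";
}

// Trimmed stand-in for the Operation class shown above.
[DataContract(Name = "Operation", Namespace = Ns.Value)]
public class Operation
{
    [DataMember(Order = 1)] public string Operator { get; set; }
    [DataMember(Order = 2)] public double Operand1 { get; set; }
    [DataMember(Order = 3)] public double Operand2 { get; set; }
}

// Trimmed stand-in for the OperationList collection shown above.
[CollectionDataContract(Name = "OperationList", Namespace = Ns.Value, ItemName = "Operation")]
public class OperationList : List<Operation>
{
}

// Trimmed stand-in for the CalculatorRequest class shown above.
[DataContract(Name = "CalculatorRequest", Namespace = Ns.Value)]
public class CalculatorRequest
{
    [DataMember(Order = 1)] public OperationList Operations { get; set; }
}

public static class Program
{
    // Serializes a request to an XML string using the data contract serializer.
    public static string Serialize(CalculatorRequest request)
    {
        var serializer = new DataContractSerializer(typeof(CalculatorRequest));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, request);
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }

    // Rebuilds a request from the XML produced by Serialize.
    public static CalculatorRequest Deserialize(string xml)
    {
        var serializer = new DataContractSerializer(typeof(CalculatorRequest));
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(xml)))
        {
            return (CalculatorRequest)serializer.ReadObject(stream);
        }
    }

    public static void Main()
    {
        var request = new CalculatorRequest
        {
            Operations = new OperationList
            {
                new Operation { Operator = "+", Operand1 = 82, Operand2 = 18 }
            }
        };

        // Print the XML representation, then read it back into a new object.
        var xml = Serialize(request);
        Console.WriteLine(xml);

        var copy = Deserialize(xml);
        Console.WriteLine("Operations after round trip: {0}", copy.Operations.Count);
    }
}
```

The Order values on the DataMember attributes determine the order in which the elements appear in the serialized XML, which is why the stand-ins keep the same values as the original classes.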

Service Contracts

The next step was to define the Service Contracts used by the client application to exchange messages with the Service Bus. For this purpose, I created a new project in my solution called ServiceContracts and defined two service contract interfaces that the client application uses to send messages to and receive messages from the Service Bus messaging entities, respectively. In fact, I created two different versions of the service contract used to receive response messages:

  • The ICalculatorResponse interface is meant to be used to receive response messages from a non-sessionful queue or subscription.

  • The ICalculatorResponseSessionful interface inherits from the ICalculatorResponse service contract and is marked with the [ServiceContract(SessionMode = SessionMode.Required)] attribute. This service contract is meant to be used to receive response messages from a sessionful queue or subscription.

Note that the methods defined by all the contracts are one-way. This is a mandatory requirement.

ICalculatorRequest, ICalculatorResponse, ICalculatorResponseSessionful Interfaces

#region Using Directives
using System.ServiceModel;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBus.MessageContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.ServiceContracts
{
    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus",
                     ConfigurationName = "ICalculatorRequest", 
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorRequest
    {
        [OperationContract(Action = "SendRequest", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void SendRequest(CalculatorRequestMessage calculatorRequestMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse",
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorResponse
    {
        [OperationContract(Action = "ReceiveResponse", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void ReceiveResponse(CalculatorResponseMessage calculatorResponseMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse",
                     SessionMode = SessionMode.Required)]
    public interface ICalculatorResponseSessionful : ICalculatorResponse
    {
    }
}

We are now ready to look at the code of the client application.

Client Application

Since the Windows Forms application exchanges messages with the underlying WCF workflow service asynchronously via Service Bus messaging entities, it acts as a client and a service application at the same time. The Windows Forms application uses WCF and the NetMessagingBinding to perform the following actions:

  1. Send request messages to the requestqueue.

  2. Send request messages to the requesttopic.

  3. Receive response messages from the responsequeue.

  4. Receive response messages from the ItalyMilan subscription of the responsetopic.
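
The four message paths above correspond to four Service Bus addresses. The following minimal sketch prints them for a given namespace, assuming the sb://NAMESPACE.servicebus.windows.net address format and the Subscriptions/ItalyMilan path that appear in the client configuration file. The EntityAddresses class and its For method are hypothetical helpers introduced for illustration, and NAMESPACE is a placeholder to replace with your own service namespace.

```csharp
using System;
using System.Collections.Generic;

public static class EntityAddresses
{
    // Builds the sb:// address of a messaging entity inside a service namespace.
    public static Uri For(string serviceNamespace, string entityPath)
    {
        return new Uri(string.Format("sb://{0}.servicebus.windows.net/{1}",
                                     serviceNamespace,
                                     entityPath));
    }

    public static void Main()
    {
        // Placeholder: replace with the name of your service namespace.
        const string serviceNamespace = "NAMESPACE";

        // Entity paths used by the client, keyed by the direction of the exchange.
        var entityPaths = new List<KeyValuePair<string, string>>
        {
            new KeyValuePair<string, string>("Send requests to", "requestqueue"),
            new KeyValuePair<string, string>("Send requests to", "requesttopic"),
            new KeyValuePair<string, string>("Receive responses from", "responsequeue"),
            new KeyValuePair<string, string>("Receive responses from",
                                             "responsetopic/Subscriptions/ItalyMilan")
        };

        foreach (var entry in entityPaths)
        {
            Console.WriteLine("{0}: {1}", entry.Key, For(serviceNamespace, entry.Value));
        }
    }
}
```

Note that messages are sent to a topic but received from one of its subscriptions, which is why only the last address includes a subscription path.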

Let’s start by reviewing the configuration file of the client application, which plays a central role in defining the WCF client and service endpoints used to communicate with the Service Bus.

Client Application App.Config


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging"  
                     switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" 
                      name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\ServiceBusWFClient.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, 
  19:                   Version=2.0.0.0, Culture=neutral,
                        PublicKeyToken=b77a5c561934e089"
  20:             name="ServiceModelMessageLoggingListener"
  21:             traceOutputOptions="Timestamp">
  22:          <filter type="" />
  23:        </add>
  24:      </sharedListeners>
  25:      <trace autoflush="true" indentsize="4">
  26:        <listeners>
  27:          <clear/>
  28:          <add name="LogTraceListener"
  29:               type="Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Client.LogTraceListener, Client"
  30:               initializeData="" />
  31:        </listeners>
  32:      </trace>
  33:    </system.diagnostics>
  34:    <startup>
  35:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  36:    </startup>
  37:    <system.serviceModel>
  38:      <diagnostics>
  39:        <messageLogging logEntireMessage="true"
  40:                        logMalformedMessages="false"
  41:                        logMessagesAtServiceLevel="true"
  42:                        logMessagesAtTransportLevel="false" />
  43:      </diagnostics>
  44:      <behaviors>
  45:        <endpointBehaviors>
  46:          <behavior name="securityBehavior">
  47:            <transportClientEndpointBehavior>
  48:              <tokenProvider>
  49:                <sharedSecret issuerName="owner"
  50:                              issuerSecret="SHARED-SECRET" />
  51:              </tokenProvider>
  52:            </transportClientEndpointBehavior>
  53:          </behavior>
  54:        </endpointBehaviors>
  55:      </behaviors>
  56:      <bindings>
  57:        <netMessagingBinding>
  58:          <binding name="netMessagingBinding"
  59:                   sendTimeout="00:03:00"
  60:                   receiveTimeout="00:03:00"
  61:                   openTimeout="00:03:00"
  62:                   closeTimeout="00:03:00"
  63:                   sessionIdleTimeout="00:01:00"
  64:                   prefetchCount="-1">
  65:            <transportSettings batchFlushInterval="00:00:01" />
  66:          </binding>
  67:        </netMessagingBinding>
  68:      </bindings>
  69:      <client>
  70:        <!-- Invoke WF Service via Service Bus Queue -->
  71:        <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue"
  72:                  behaviorConfiguration="securityBehavior" 
  73:                  binding="netMessagingBinding"
  74:                  bindingConfiguration="netMessagingBinding" 
  75:                  contract="ICalculatorRequest"
  76:                  name="requestQueueClientEndpoint" />
  77:        <!-- Invoke WF Service via Service Bus Topic -->
  78:        <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  79:                  behaviorConfiguration="securityBehavior"
  80:                  binding="netMessagingBinding"
  81:                  bindingConfiguration="netMessagingBinding"
  82:                  contract="ICalculatorRequest"
  83:                  name="requestTopicClientEndpoint" />
  84:      </client>
  85:      <services>
  86:        <service name="ResponseHandlerService">
  87:          <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue"
  88:                    behaviorConfiguration="securityBehavior"
  89:                    binding="netMessagingBinding"
  90:                    bindingConfiguration="netMessagingBinding"
  91:                    name="responseQueueServiceEndpoint"
  92:                    contract="ICalculatorResponse" />
  93:          <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
  94:                    listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
  95:                    behaviorConfiguration="securityBehavior"
  96:                    binding="netMessagingBinding"
  97:                    bindingConfiguration="netMessagingBinding"
  98:                    name="responseSubscriptionServiceEndpoint"
  99:                    contract="ICalculatorResponse" />
 100:        </service>
 101:      </services>
 102:    </system.serviceModel>
 103:  </configuration>

Please find below a brief description of the main elements and sections of the configuration file:

  • Lines [3-32] define a custom trace listener called LogTraceListener, which the ResponseHandlerService uses to write response messages to the log control of the Windows Forms application.

  • Lines [34-36] contain the startup section, which specifies the versions of the common language runtime that the application supports.

  • Lines [46-53] contain the definition of the securityBehavior used by the client and service endpoints to authenticate with the Access Control Service. In particular, the TransportClientEndpointBehavior is used to define shared secret credentials. For more information on how to retrieve credentials from the Azure Management Portal, see the box below.

  • Lines [57-67] contain the configuration of the NetMessagingBinding used by client and service endpoints to exchange messages with the Service Bus.

  • Lines [71-76] contain the definition of the requestQueueClientEndpoint used by the application to send request messages to the requestqueue. The address of the client endpoint is given by the concatenation of the URL of the service namespace and the name of the queue.

  • Lines [78-83] contain the definition of the requestTopicClientEndpoint used by the application to send request messages to the requesttopic. The address of the client endpoint is given by the concatenation of the URL of the service namespace and the name of the topic.

  • Lines [87-92] contain the definition of the responseQueueServiceEndpoint used by the application to receive response messages from the responsequeue. The address of the service endpoint is given by the concatenation of the URL of the service namespace and the name of the queue.

  • Lines [93-99] contain the definition of the responseSubscriptionServiceEndpoint used by the application to receive response messages from the ItalyMilan subscription for the responsetopic. When you define a WCF service endpoint that uses the NetMessagingBinding to receive messages from a subscription, you have to proceed as follows (for more information on this, see the box below):

    • As the value of the address attribute, you have to specify the URL of the topic that the subscription belongs to. The URL of the topic is given by the concatenation of the URL of the service namespace and the name of the topic.

    • As value of the listenUri attribute, you have to specify the URL of the subscription. The URL of the subscription is defined by the concatenation of the topic URL, the string /Subscriptions/ and the name of the subscription.

    • Assign the value Explicit to the listenUriMode attribute. The default value for the listenUriMode is Explicit, so this setting is optional.
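
The address rules described in the list above amount to plain string concatenation. As a minimal sketch, the queue, topic and subscription URLs used in this configuration could be built as shown below. The ServiceBusAddress class and its method names are hypothetical: they are not part of the sample or of the Service Bus SDK, and are introduced here for illustration only.

```csharp
// Hypothetical helper (not part of the sample or of the Service Bus SDK)
// that mirrors the concatenation rules described above.
public static class ServiceBusAddress
{
    // URL of the service namespace, e.g. sb://NAMESPACE.servicebus.windows.net/
    public static string Namespace(string serviceNamespace)
    {
        return string.Format("sb://{0}.servicebus.windows.net/", serviceNamespace);
    }

    // Queue or topic URL = namespace URL + entity name
    public static string Entity(string serviceNamespace, string entityName)
    {
        return Namespace(serviceNamespace) + entityName;
    }

    // Subscription URL = topic URL + "/Subscriptions/" + subscription name
    public static string Subscription(string serviceNamespace,
                                      string topicName,
                                      string subscriptionName)
    {
        return Entity(serviceNamespace, topicName) + "/Subscriptions/" + subscriptionName;
    }
}
```

With these assumptions, ServiceBusAddress.Entity("NAMESPACE", "responsetopic") gives the value of the address attribute of the responseSubscriptionServiceEndpoint, and ServiceBusAddress.Subscription("NAMESPACE", "responsetopic", "ItalyMilan") gives the value of its listenUri attribute.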

Note
When you configure a WCF service endpoint to consume messages from a sessionful queue or subscription, the service contract needs to support sessions. Therefore, in our sample, when you configure the responseQueueServiceEndpoint or responseSubscriptionServiceEndpoint endpoints to receive, respectively, from a sessionful queue and subscription, you have to replace the ICalculatorResponse service contract with the sessionful ICalculatorResponseSessionful contract interface. For more information, see the Service Contracts section later in the article.

Note
The Service Bus supports three different types of credential schemes: SAML, Shared Secret, and Simple Web Token, but this version of the Service Bus Explorer supports only Shared Secret credentials. However, you can easily extend my code to support other credential schemes. You can retrieve the issuer secret key from the Azure Management Portal by selecting a namespace in the Service Bus section and then clicking the View button.

This opens a modal dialog where you can retrieve the key by clicking the Copy to Clipboard button.

Note
By convention, the name of the Default Issuer is always owner.

Note
When you define a WCF service endpoint that uses the NetMessagingBinding to receive messages from a subscription, if you mistakenly assign the URL of the subscription to the address attribute of the service endpoint (as in the Wrong Configuration shown below), a FaultException like the following occurs at run time:

The message with To 'sb://NAMESPACE.servicebus.windows.net/responsetopic' cannot be processed at 
the receiver, due to an AddressFilter mismatch at the EndpointDispatcher. 
Check that the sender and receiver's EndpointAddresses agree.

Wrong Configuration


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="responseSubscriptionServiceEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

The error occurs because the WS-Addressing To header of the message contains the address of the topic and not the address of the subscription:


<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing">
  <s:Header>
    <a:Action s:mustUnderstand="1">ReceiveResponse</a:Action>
    <a:MessageID>urn:uuid:64cb0b06-1622-4920-a035-c27b610cfcaf</a:MessageID>
    <a:To s:mustUnderstand="1">sb://NAMESPACE.servicebus.windows.net/responsetopic</a:To>
  </s:Header>
  <s:Body>... stream ...</s:Body>
</s:Envelope>

To correctly configure the service endpoint to receive messages from a subscription, you have to proceed as follows:

  • As the value of the address attribute, you have to specify the URL of the topic that the subscription belongs to. The URL of the topic is given by the concatenation of the URL of the service namespace and the name of the topic.

  • As value of the listenUri attribute, you have to specify the URL of the subscription. The URL of the subscription is defined by the concatenation of the topic URL, the string /Subscriptions/ and the name of the subscription.

  • Assign the value Explicit to the listenUriMode attribute. The default value for the listenUriMode is Explicit, so this setting is optional.

See the following page on MSDN for a description of the address, listenUri and listenUriMode attributes.

Correct Configuration


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
                  listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="subscriptionEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

To accomplish the same task via API, you have to properly set the value of the Address, ListenUri and ListenUriMode properties of your ServiceEndpoint instance as indicated in this note.
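
As a minimal sketch of the API route, the code below adds the subscription endpoint to a ServiceHost in code. It assumes that the endpoint is not also declared in the configuration file, that the Microsoft.ServiceBus assembly is referenced, and that the caller passes the service type, the contract type (ResponseHandlerService and ICalculatorResponse in this sample) and the issuer secret. The SubscriptionEndpointSketch class and its CreateHost method are hypothetical names used only for illustration.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper: not part of the sample.
public static class SubscriptionEndpointSketch
{
    public static ServiceHost CreateHost(Type serviceType, Type contractType, string issuerSecret)
    {
        // Address = URL of the topic; ListenUri = URL of the subscription
        const string topicUrl = "sb://NAMESPACE.servicebus.windows.net/responsetopic";
        const string subscriptionUrl = topicUrl + "/Subscriptions/ItalyMilan";

        var host = new ServiceHost(serviceType);

        // This overload sets both the Address and the ListenUri of the endpoint
        ServiceEndpoint endpoint = host.AddServiceEndpoint(contractType,
                                                           new NetMessagingBinding(),
                                                           topicUrl,
                                                           new Uri(subscriptionUrl));

        // Explicit is the default, so this assignment is optional
        endpoint.ListenUriMode = ListenUriMode.Explicit;

        // Shared secret credentials; "owner" is the default issuer name
        endpoint.Behaviors.Add(new TransportClientEndpointBehavior
        {
            TokenProvider = TokenProvider.CreateSharedSecretTokenProvider("owner", issuerSecret)
        });

        return host;
    }
}
```

The caller would then invoke Open on the returned host, as the StartServiceHost method does for the configuration-based host.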

The following table shows the code used by the client application to start the ResponseHandlerService used to read response messages from the responsequeue and the ItalyMilan subscription of the responsetopic. We’ll examine the code of the service in the next section.

StartServiceHost Method


private void StartServiceHost()
{
    try
    {
        // Creating the service host object as defined in config
        var serviceHost = new ServiceHost(typeof(ResponseHandlerService));
                
        // Add ErrorServiceBehavior to handle errors encountered by the service host during execution.
        serviceHost.Description.Behaviors.Add(new ErrorServiceBehavior());


        foreach (var serviceEndpoint in serviceHost.Description.Endpoints)
        {
            if (serviceEndpoint.Name == "responseQueueServiceEndpoint")
            {
                responseQueueUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnQueue,
                                        serviceEndpoint.Address.Uri.AbsoluteUri));
            }
            if (serviceEndpoint.Name == "responseSubscriptionServiceEndpoint")
            {
                responseTopicUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnSubscription,
                                            serviceEndpoint.ListenUri.AbsoluteUri));
            }
        }

        // Start the service host
        serviceHost.Open();
        WriteToLog(ServiceHostSuccessfullyOpened);
    }
    catch (Exception ex)
    {
        mainForm.HandleException(ex);
    }
}

The following picture shows the user interface of the client application:

ServiceBus-Queues-Topics-WCF-Workflow-Service11

The radio buttons in the Request Method group let you choose whether to send the request message to the requestqueue or the requesttopic, whereas the radio buttons in the Response Method group let you select whether to receive the response from the responsequeue or the ItalyMilan subscription of the responsetopic. To communicate the selection to the underlying WCF workflow service, the application uses a BrokeredMessageProperty object to assign the value of the responseQueueUri or responseTopicUri private field to the ReplyTo property. The following table contains the code of the method used by the client application to send messages to the Service Bus. For your convenience, comments have been added to the code to make it easier to understand.

SendRequestMessageUsingWCF Method


private void SendRequestMessageUsingWCF(string endpointConfigurationName)
{
    try
    {
        if (string.IsNullOrEmpty(endpointConfigurationName))
        {
            WriteToLog(EndpointConfigurationNameCannotBeNull);
            return;
        }

        // Set the wait cursor
        Cursor = Cursors.WaitCursor;

        // Make sure that the request message contains at least an operation
        if (operationList == null ||
            operationList.Count == 0)
        {
            WriteToLog(OperationListCannotBeNull);
            return;
        }

        // Create warning collection
        var warningCollection = new List<string>();

        // Create request message
        var calculatorRequest = new CalculatorRequest(operationList);
        var calculatorRequestMessage = new CalculatorRequestMessage(calculatorRequest);

        // Create the channel factory for the current client endpoint
        // and cache it in the channelFactoryDictionary
        if (!channelFactoryDictionary.ContainsKey(endpointConfigurationName))
        {
            channelFactoryDictionary[endpointConfigurationName] = 
                new ChannelFactory<ICalculatorRequest>(endpointConfigurationName);
        }

        // Create the channel for the current client endpoint
        // and cache it in the channelDictionary
        if (!channelDictionary.ContainsKey(endpointConfigurationName))
        {
            channelDictionary[endpointConfigurationName] = 
                channelFactoryDictionary[endpointConfigurationName].CreateChannel();
        }

        // Use the OperationContextScope to create a block within which to access the current OperationContext
        using (new OperationContextScope((IContextChannel)channelDictionary[endpointConfigurationName]))
        {
            // Create a new BrokeredMessageProperty object
            var brokeredMessageProperty = new BrokeredMessageProperty();

            // Read the user defined properties and add them to the  
            // Properties collection of the BrokeredMessageProperty object
            foreach (var e in propertiesBindingSource.Cast<PropertyInfo>())
            {
                try
                {
                    e.Key = e.Key.Trim();
                    if (e.Type != StringType && e.Value == null)
                    {
                        warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                                                            PropertyValueCannotBeNull, e.Key));
                    }
                    else
                    {
                        if (brokeredMessageProperty.Properties.ContainsKey(e.Key))
                        {
                            brokeredMessageProperty.Properties[e.Key] = 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value);
                        }
                        else
                        {
                            brokeredMessageProperty.Properties.Add(e.Key, 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value));
                        }
                    }
                }
                catch (Exception ex)
                {
                    warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                        PropertyConversionError, e.Key, ex.Message));
                }
            }

            // If the warning collection contains one or more items,
            // write them to the log and return immediately
            StringBuilder builder;
            if (warningCollection.Count > 0)
            {
                builder = new StringBuilder(WarningHeader);
                var warnings = warningCollection.ToArray<string>();
                for (var i = 0; i < warningCollection.Count; i++)
                {
                    builder.AppendFormat(WarningFormat, warnings[i]);
                }
                mainForm.WriteToLog(builder.ToString());
                return;
            }

            // Set the BrokeredMessageProperty properties
            brokeredMessageProperty.Label = txtLabel.Text;
            brokeredMessageProperty.MessageId = Guid.NewGuid().ToString();
            brokeredMessageProperty.SessionId = sessionId;
            brokeredMessageProperty.ReplyToSessionId = sessionId;
            brokeredMessageProperty.ReplyTo = responseQueueRadioButton.Checked
                                                ? responseQueueUri
                                                : responseTopicUri;
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, 
                                                                    brokeredMessageProperty);
                    
            // Send the request message to the requestqueue or requesttopic
            var stopwatch = new Stopwatch();
            try
            {
                stopwatch.Start();
                channelDictionary[endpointConfigurationName].SendRequest(calculatorRequestMessage);
            }
            catch (Exception ex)
            {
                // The send failed (CommunicationException or any other error):
                // discard the cached factory and channel so that they are
                // recreated on the next call
                if (channelFactoryDictionary[endpointConfigurationName] != null)
                {
                    channelFactoryDictionary[endpointConfigurationName].Abort();
                    channelFactoryDictionary.Remove(endpointConfigurationName);
                    channelDictionary.Remove(endpointConfigurationName);
                }
                HandleException(ex);
                // Nothing was sent, so skip the success log below, which would
                // otherwise look up the factory that has just been removed
                return;
            }
            finally
            {
                stopwatch.Stop();
            }
            // Log the request message and its properties
            builder = new StringBuilder();
            builder.AppendLine(string.Format(CultureInfo.CurrentCulture,
                    MessageSuccessfullySent,
                    channelFactoryDictionary[endpointConfigurationName].Endpoint.Address.Uri.AbsoluteUri,
                    brokeredMessageProperty.MessageId,
                    brokeredMessageProperty.SessionId,
                    brokeredMessageProperty.Label,
                    stopwatch.ElapsedMilliseconds));
            builder.AppendLine(PayloadFormat);
            for (var i = 0; i < calculatorRequest.Operations.Count; i++)
            {
                builder.AppendLine(string.Format(RequestFormat,
                                                    i + 1,
                                                    calculatorRequest.Operations[i].Operand1,
                                                    calculatorRequest.Operations[i].Operator,
                                                    calculatorRequest.Operations[i].Operand2));
            }
            builder.AppendLine(SentMessagePropertiesHeader);
            foreach (var p in brokeredMessageProperty.Properties)
            {
                builder.AppendLine(string.Format(MessagePropertyFormat,
                                                    p.Key,
                                                    p.Value));
            }
            var traceMessage = builder.ToString();
            WriteToLog(traceMessage.Substring(0, traceMessage.Length - 1));
        }
    }
    catch (Exception ex)
    {
        // Handle the exception
        HandleException(ex);
    }
    finally
    {
        // Restore the default cursor
        Cursor = Cursors.Default;
    }
}

Response Handler Service

The following table contains the code of the WCF service used by the client application to retrieve and log response messages from the responsequeue and the ItalyMilan subscription of the responsetopic. To accomplish this, the service exposes two different endpoints, each of which uses the NetMessagingBinding and receives messages from one of the two entities. Indeed, a subscription can be seen as a virtual queue that receives copies of the messages published to the topic it belongs to. The table below shows the code of the ResponseHandlerService class. As you can see, the service retrieves the BrokeredMessageProperty from the Properties collection of the incoming WCF message and uses this object to access the properties of the response message.

Since the ReceiveResponse method in the ICalculatorResponse service contract is decorated with the [ReceiveContextEnabled(ManualControl = true)] attribute, the service method must explicitly acknowledge the receive operation. This requires the service to invoke the ReceiveContext.Complete method to commit the receive operation. In fact, as we said at the beginning of the article, when the ManualControl property is set to true, the message received from the channel is delivered to the service operation with a lock on the message. It is the responsibility of the service implementation to call either Complete(TimeSpan) or Abandon(TimeSpan) to signal that the receive operation has finished. If neither is called, the lock is held on the message until the lock timeout interval elapses. Once the lock is released (either by calling Abandon(TimeSpan) or by lock timeout), the message is re-dispatched from the channel to the service. Calling Complete(TimeSpan) marks the message as successfully received.

ResponseHandlerService Class


[ServiceBehavior(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                 ConfigurationName = "ResponseHandlerService")]
public class ResponseHandlerService : ICalculatorResponseSessionful
{
    #region Private Constants
    //***************************
    // Formats
    //***************************
    private const string MessageSuccessfullyReceived = "Response Message Received:\n - EndpointUrl:[{0}]\n - CorrelationId=[{1}]\n - SessionId=[{2}]\n - Label=[{3}]";
    private const string ReceivedMessagePropertiesHeader = "Properties:";
    private const string PayloadFormat = "Payload:";
    private const string StatusFormat = " - Status=[{0}]";
    private const string ResultFormat = " - Result[{0}]: Value=[{1}] Error=[{2}]";
    private const string MessagePropertyFormat = " - Key=[{0}] Value=[{1}]";

    //***************************
    // Constants
    //***************************
    private const string Empty = "EMPTY";
    #endregion

    #region Public Operations
    [OperationBehavior]
    public void ReceiveResponse(CalculatorResponse calculatorResponse)
    {
        try
        {
            // Get the message properties
            var incomingProperties = OperationContext.Current.IncomingMessageProperties;
            if (calculatorResponse != null)
            {
                var brokeredMessageProperty = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

                // Trace the response message
                var builder = new StringBuilder();
                if (brokeredMessageProperty != null)
                    builder.AppendLine(string.Format(MessageSuccessfullyReceived, 
                                                        OperationContext.Current.Channel.LocalAddress.Uri.AbsoluteUri,
                                                        brokeredMessageProperty.CorrelationId ?? Empty,
                                                        brokeredMessageProperty.SessionId ?? Empty,
                                                        brokeredMessageProperty.Label ?? Empty));
                builder.AppendLine(PayloadFormat);
                builder.AppendLine(string.Format(StatusFormat,
                                                    calculatorResponse.Status));
                if (calculatorResponse.Results != null && 
                    calculatorResponse.Results.Count > 0)
                {
                    for (int i = 0; i < calculatorResponse.Results.Count; i++)
                    {
                        builder.AppendLine(string.Format(ResultFormat, 
                                                            i + 1, 
                                                            calculatorResponse.Results[i].Value,
                                                            calculatorResponse.Results[i].Error));
                    }
                }
                builder.AppendLine(ReceivedMessagePropertiesHeader);
                if (brokeredMessageProperty != null)
                {
                    foreach (var property in brokeredMessageProperty.Properties)
                    {
                        builder.AppendLine(string.Format(MessagePropertyFormat,
                                                            property.Key,
                                                            property.Value));
                    }
                }
                var traceMessage = builder.ToString();
                Trace.WriteLine(traceMessage.Substring(0, traceMessage.Length - 1));
            }
            // Complete the message
            ReceiveContext receiveContext;
            if (ReceiveContext.TryGet(incomingProperties, out receiveContext))
            {
                receiveContext.Complete(TimeSpan.FromSeconds(10.0d));
            }
            else
            {
                throw new InvalidOperationException("Receiver is in peek lock mode but receive context is not available!");
            }
        }
        catch (Exception ex)
        {
            Trace.WriteLine(ex.Message);
        }
    } 
    #endregion
}
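
The service above only calls Complete. If an exception is thrown before that point, the message stays locked until the lock timeout elapses. As a minimal sketch of the alternative described earlier, the helper below completes the message when processing succeeds and abandons it when processing fails, so that the lock is released immediately. The ReceiveContextHelper class and its ProcessWithPeekLock method are hypothetical names, not part of the sample, and the helper assumes that it is called from inside a service operation whose contract method is marked with [ReceiveContextEnabled(ManualControl = true)].

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;

// Hypothetical helper: not part of the sample.
public static class ReceiveContextHelper
{
    public static void ProcessWithPeekLock(Action processMessage, TimeSpan timeout)
    {
        // The ReceiveContext travels with the properties of the incoming message
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;

        ReceiveContext receiveContext;
        if (!ReceiveContext.TryGet(incomingProperties, out receiveContext))
        {
            throw new InvalidOperationException("Receive context is not available.");
        }

        try
        {
            processMessage();

            // Success: commit the receive operation
            receiveContext.Complete(timeout);
        }
        catch
        {
            // Failure: release the lock so that the message is dispatched again
            receiveContext.Abandon(timeout);
            throw;
        }
    }
}
```

Whether to abandon or to let the lock expire is a design choice: abandoning makes the message available again immediately, whereas letting the lock expire delays the redelivery by the remaining lock duration.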

BrokeredMessagePropertyActivity

WCF workflow services provide a productive environment for authoring long-running, durable operations or services. Workflow services are implemented using WF activities that can make use of WCF for sending and receiving data. Explaining in detail how to build a WCF workflow service is out of the scope of the present article. For more information on WCF workflow services, see the following articles:

WF 4.0 introduced Messaging Activities that allow developers to expose or consume WCF services in an easy and flexible way. In particular, Messaging Activities enable workflows to send data out to other systems (Send, SendReply), and to receive data from other systems (Receive, ReceiveReply) using WCF. However, these activities hide a lot of the WCF plumbing. In particular, Messaging Activities do not provide access to the current OperationContext, which can be used to perform the following operations:

  • On the send side, the OperationContext can be used to include additional message headers in the SOAP envelope or add message properties to the outgoing message.

  • On the receive side, the OperationContext can be used to retrieve message properties and security information from the incoming message.

As we have seen in the first part of the article, when an application uses WCF and the NetMessagingBinding to send a message to a queue or a topic, the message is wrapped in a SOAP envelope and encoded. To set BrokeredMessage-specific properties, you need to create a BrokeredMessageProperty object, set the properties on it and add it to the Properties collection of the WCF Message. Therefore, to retrieve the BrokeredMessageProperty from an inbound message or to add a BrokeredMessageProperty to the Properties collection of an outgoing WCF Message, we need to extend the functionality provided out of the box by Messaging Activities. Fortunately, WF 4.0 lets you extend the runtime behavior of Messaging Activities using the IReceiveMessageCallback and ISendMessageCallback interfaces. In particular:

  • The IReceiveMessageCallback interface defines a callback that is executed when a service message is received by the Receive activity.

  • The ISendMessageCallback interface defines a callback that is called just before a message is sent on the wire by the Send activity.

In my demo, I exploited these extensibility points to create a custom NativeActivity called BrokeredMessagePropertyActivity that allows you to:

  • Get the BrokeredMessageProperty from the properties of an inbound WCF message.

  • Set the BrokeredMessageProperty for an outbound WCF message.

In his article, Roman Kiss followed the same approach and created a more sophisticated activity that exposes a property for each property of the BrokeredMessageProperty class. I strongly suggest that you read his article, as it explains an alternative way to implement the technique that I describe in the present article.

For your convenience, I included the code of the BrokeredMessagePropertyActivity in the table below:

BrokeredMessagePropertyActivity Class


[Designer(typeof(BrokeredMessagePropertyActivityDesigner))]
public class BrokeredMessagePropertyActivity : NativeActivity
{
    #region Public Properties
    [Browsable(false)]
    public Activity Body { get; set; }
    public InOutArgument<BrokeredMessageProperty> BrokeredMessageProperty { get; set; }
    #endregion

    #region NativeActivity Overridden Methods
    protected override void CacheMetadata(NativeActivityMetadata metadata)
    {
        metadata.AddChild(Body);
        base.CacheMetadata(metadata);
    }

    protected override void Execute(NativeActivityContext context)
    {
        // Add the BrokeredMessagePropertyCallback implementation as an execution property 
        var value = context.GetValue(BrokeredMessageProperty);
        context.Properties.Add(typeof(BrokeredMessagePropertyCallback).Name,
                                new BrokeredMessagePropertyCallback(value));
        context.ScheduleActivity(Body, OnBodyCompleted);
    }

    private void OnBodyCompleted(NativeActivityContext context, ActivityInstance instance)
    {
        // Sets the value of the BrokeredMessageProperty argument
        var callback = context.Properties.Find(typeof(BrokeredMessagePropertyCallback).Name) as BrokeredMessagePropertyCallback;
        if (callback != null)
        {
            context.SetValue(BrokeredMessageProperty, callback.BrokeredMessageProperty);
        }
    } 
    #endregion  
}

BrokeredMessagePropertyCallback Class


[DataContract]
public class BrokeredMessagePropertyCallback : IReceiveMessageCallback, ISendMessageCallback
{
    #region Public Properties
    [DataMember]
    public BrokeredMessageProperty BrokeredMessageProperty { get; set; } 
    #endregion

    #region Public Constructors
    public BrokeredMessagePropertyCallback(BrokeredMessageProperty property)
    {
        BrokeredMessageProperty = property;
    }
    #endregion

    #region IReceiveMessageCallback Methods
    public void OnReceiveMessage(OperationContext operationContext, ExecutionProperties activityExecutionProperties)
    {
        try
        {
            // Get the BrokeredMessageProperty from an inbound message
            var incomingMessageProperties = operationContext.IncomingMessageProperties;
            BrokeredMessageProperty = incomingMessageProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    } 
    #endregion

    #region ISendMessageCallback Methods
    public void OnSendMessage(OperationContext operationContext)
    {
        // Set the BrokeredMessageProperty for an outbound message 
        if (BrokeredMessageProperty != null)
        {
            operationContext.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, BrokeredMessageProperty);
        }
    } 
    #endregion
}

The BrokeredMessagePropertyActivity can be used to wrap Receive or Send activities. This automatically provides access to the BrokeredMessageProperty that can be used to read and write explicit and user-defined properties of a BrokeredMessage.
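
The sample composes these activities in the workflow designer. As a rough sketch of the same idea in code, the method below wraps a Receive activity in a BrokeredMessagePropertyActivity and binds the activity's argument to a workflow variable. The WorkflowSketch class, the variable names and the generic TRequest parameter are assumptions made for illustration; the operation and contract names are taken from the client code shown earlier, and the contract namespace is omitted. The sketch requires a reference to the assembly that contains the BrokeredMessagePropertyActivity class.

```csharp
using System.Activities;
using System.Activities.Statements;
using System.ServiceModel.Activities;
using Microsoft.ServiceBus.Messaging;

// Hypothetical composition: not part of the sample.
public static class WorkflowSketch
{
    // TRequest stands for the request message type received by the workflow
    public static Activity ReceiveWithBrokeredMessageProperty<TRequest>()
    {
        // Workflow variables that hold the inbound property and the request
        var inboundProperty = new Variable<BrokeredMessageProperty>("inboundBrokeredMessageProperty");
        var request = new Variable<TRequest>("request");

        return new Sequence
        {
            Variables = { inboundProperty, request },
            Activities =
            {
                new BrokeredMessagePropertyActivity
                {
                    // After the Body completes, the variable holds the
                    // BrokeredMessageProperty of the inbound message
                    BrokeredMessageProperty =
                        new InOutArgument<BrokeredMessageProperty>(inboundProperty),
                    Body = new Receive
                    {
                        OperationName = "SendRequest",
                        ServiceContractName = "ICalculatorRequest",
                        CanCreateInstance = true,
                        Content = ReceiveContent.Create(new OutArgument<TRequest>(request))
                    }
                }
                // Activities added here can read inboundProperty and request
            }
        };
    }
}
```

Wrapping a Send activity works the same way in the opposite direction: the variable bound to the argument is assigned before the wrapper executes, and the callback adds it to the outgoing message.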

Calculator Activity

To process the incoming request and generate a response, I created a custom code activity called CalculatorActivity whose code is shown in the following table.

CalculatorActivity Class


#region Using Directives
using System;
using System.Activities;
using System.ComponentModel;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.DataContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowActivities
{
    /// <summary>
    /// This class can be used to process a calculator request.
    /// </summary>
    [Designer(typeof(CalculatorActivityDesigner))]
    public sealed class CalculatorActivity : CodeActivity
    {
        #region Private Constants
        private const string Empty = "Empty";
        private const string MessageId = "MessageId";
        private const string SessionId = "SessionId";
        private const string CorrelationId = "CorrelationId";
        private const string Label = "Label";
        private const string ReplyTo = "ReplyTo";
        private const string Source = "Source";
        private const string CalculatoService = "CalculatoService";
        private const string Ok = "ok";
        private const string Failed = "Failed";
        private const string OperationsHeader = "Operations:";
        private const string OperationFormat = "{0} {1} {2} = {3}";
        private const string RequestMessagePropertiesHeader = "Request Message Properties:";
        private const string ResponseMessagePropertiesHeader = "Response Message Properties:";
        private const string RequestMessageUserDefinedPropertiesHeader = "Request Message User-Defined Properties:";
        private const string ResponseMessageUserDefinedPropertiesHeader = "Response Message User-Defined Properties:";
        private const string PropertyFormat = "Key=[{0}] Value=[{1}]";
        private const string OperationUnknownErrorMessageFormat = "The operation failed because the operator {0} is unknown.";

        #endregion

        #region Activity Arguments
        [DefaultValue(null)]
        public InArgument<CalculatorRequest> CalculatorRequest { get; set; }
        [DefaultValue(null)]
        public InArgument<BrokeredMessageProperty> InboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<BrokeredMessageProperty> OutboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<CalculatorResponse> CalculatorResponse { get; set; }
        #endregion

        #region Private Fields
        private readonly string line = new string('-', 79);
        #endregion

        #region Protected Methods
        /// <summary>
        /// Processes a calculator request.
        /// </summary>
        /// <param name="context">The execution context under which the activity executes.</param>
        protected override void Execute(CodeActivityContext context)
        {
            // Obtain the runtime value of the CalculatorRequest input argument
            var calculatorRequest = context.GetValue(CalculatorRequest);
            var calculatorResponse = new CalculatorResponse();
            if (calculatorRequest == null)
            {
                context.SetValue(CalculatorResponse, calculatorResponse);
                return;
            }
            // Print the properties of the inbound BrokeredMessageProperty
            var brokeredMessageProperty = context.GetValue(InboundBrokeredMessageProperty);
            if (brokeredMessageProperty != null)
            {
                Console.WriteLine(RequestMessagePropertiesHeader);
                Console.WriteLine(line);
                Console.WriteLine(string.Format(PropertyFormat, 
                                                MessageId, 
                                      brokeredMessageProperty.MessageId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                SessionId, 
                                      brokeredMessageProperty.SessionId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                ReplyTo, 
                                      brokeredMessageProperty.ReplyTo ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                Label, 
                                      brokeredMessageProperty.Label ?? Empty));
                Console.WriteLine(line);
                if (brokeredMessageProperty.Properties.Count > 0)
                {
                    Console.WriteLine(RequestMessageUserDefinedPropertiesHeader);
                    Console.WriteLine(line);
                    foreach (var property in brokeredMessageProperty.Properties)
                    {
                        Console.WriteLine(string.Format(PropertyFormat, 
                                                        property.Key, 
                                                        property.Value));
                    }
                    Console.WriteLine(line);
                }
            }

            // Process the request message and create a response message
            string error = null;
            calculatorResponse.Status = Ok;
            if (calculatorRequest.Operations.Count > 0)
            {
                Console.WriteLine(OperationsHeader);
                Console.WriteLine(line);
                foreach (var operation in calculatorRequest.Operations)
                {
                    double value = 0;
                    var succeeded = true;
                    // Reset the error so a failed operation does not leak into later results
                    error = null;
                    switch (operation.Operator)
                    {
                        case "+":
                            value = operation.Operand1 + operation.Operand2;
                            break;
                        case "-":
                            value = operation.Operand1 - operation.Operand2;
                            break;
                        case "*":
                        case "x":
                            value = operation.Operand1 * operation.Operand2;
                            break;
                        case "/":
                        case "\\":
                        case ":":
                            value = operation.Operand1 / operation.Operand2;
                            break;
                        default:
                            error = string.Format(OperationUnknownErrorMessageFormat,
                                                  operation.Operator);
                            succeeded = false;
                            calculatorResponse.Status = Failed;
                            break;
                    }
                    Console.WriteLine(succeeded
                                          ? string.Format(OperationFormat, operation.Operand1, operation.Operator,
                                                          operation.Operand2, value)
                                          : error);
                    calculatorResponse.Results.Add(new Result(value, error));
                }
                Console.WriteLine(line);
            }
            context.SetValue(CalculatorResponse, calculatorResponse);

            // Create a new BrokeredMessageProperty for the reply message
            var replyBrokeredMessageProperty = new BrokeredMessageProperty
            {
               MessageId = Guid.NewGuid().ToString(),
               CorrelationId = brokeredMessageProperty != null ? 
                               brokeredMessageProperty.MessageId :
                               null,
               SessionId = brokeredMessageProperty != null ?
                           brokeredMessageProperty.ReplyToSessionId :
                           null,
               Label = brokeredMessageProperty != null ?
                       brokeredMessageProperty.Label :
                       null
            };
            if (brokeredMessageProperty != null)
            {
                foreach (var property in brokeredMessageProperty.Properties)
                {
                    replyBrokeredMessageProperty.Properties.Add(property);
                }
            }

            // Add the Source user-defined property, then print the properties
            // of the outbound BrokeredMessageProperty
            replyBrokeredMessageProperty.Properties.Add(Source, CalculatoService);
            Console.WriteLine(ResponseMessagePropertiesHeader);
            Console.WriteLine(line);
            Console.WriteLine(string.Format(PropertyFormat, 
                                            MessageId, 
                               replyBrokeredMessageProperty.MessageId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            CorrelationId, 
                               replyBrokeredMessageProperty.CorrelationId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            SessionId, 
                               replyBrokeredMessageProperty.SessionId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            ReplyTo, 
                               replyBrokeredMessageProperty.ReplyTo ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            Label, 
                               replyBrokeredMessageProperty.Label ?? Empty));
            Console.WriteLine(line);
            if (replyBrokeredMessageProperty.Properties.Count > 0)
            {
                Console.WriteLine(ResponseMessageUserDefinedPropertiesHeader);
                Console.WriteLine(line);
                foreach (var property in replyBrokeredMessageProperty.Properties)
                {
                    Console.WriteLine(string.Format(PropertyFormat, 
                                                    property.Key, 
                                                    property.Value));
                }
                Console.WriteLine(line);
            }

            // Set the outbound context property
            context.SetValue(OutboundBrokeredMessageProperty, 
                             replyBrokeredMessageProperty);
        }
        #endregion
    }
}

This activity exposes two input arguments and two output arguments:

  • CalculatorRequest: this InArgument of type CalculatorRequest allows the workflow to pass the request to the activity.

  • InboundBrokeredMessageProperty: this InArgument of type BrokeredMessageProperty allows the workflow to pass in the BrokeredMessageProperty extracted from the WCF request message.

  • CalculatorResponse: this OutArgument of type CalculatorResponse is used by the code activity to return the response as an output parameter to the workflow.

  • OutboundBrokeredMessageProperty: this OutArgument of type BrokeredMessageProperty is used by the code activity to return the outbound BrokeredMessageProperty to the workflow, which uses an instance of the BrokeredMessagePropertyActivity to assign the value of this output parameter to the BrokeredMessageProperty of the WCF response message.

In a nutshell, the CalculatorActivity receives the inbound message and BrokeredMessageProperty as input arguments, processes the request message, and generates a response message and an outbound BrokeredMessageProperty. When running in a console application, the activity traces the properties of the inbound and outbound BrokeredMessageProperty to the standard output.
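The way the activity derives the outbound properties from the inbound ones can be sketched in isolation. The snippet below is only an illustration, not the sample's code: MessageProps is a hypothetical stand-in for BrokeredMessageProperty that carries just the members used here, so that the example compiles without the Service Bus assembly, and CreateReply is a name introduced for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for BrokeredMessageProperty (illustration only).
public sealed class MessageProps
{
    private readonly IDictionary<string, object> properties =
        new Dictionary<string, object>();

    public string MessageId { get; set; }
    public string CorrelationId { get; set; }
    public string SessionId { get; set; }
    public string ReplyToSessionId { get; set; }
    public string Label { get; set; }
    public IDictionary<string, object> Properties { get { return properties; } }
}

public static class ReplyProperties
{
    // Builds the properties of the reply from those of the request,
    // following the same mapping used by CalculatorActivity.
    public static MessageProps CreateReply(MessageProps inbound, string source)
    {
        var reply = new MessageProps { MessageId = Guid.NewGuid().ToString() };
        if (inbound != null)
        {
            // The reply is correlated to the request through its MessageId
            reply.CorrelationId = inbound.MessageId;
            // The reply goes to the session the client asked for
            reply.SessionId = inbound.ReplyToSessionId;
            reply.Label = inbound.Label;
            // Copy all user-defined properties from the request
            foreach (var pair in inbound.Properties)
            {
                reply.Properties[pair.Key] = pair.Value;
            }
        }
        // Tag the reply with its origin (assumption: the indexer is used here,
        // so an existing Source property is overwritten instead of throwing)
        reply.Properties["Source"] = source;
        return reply;
    }
}
```

Calling ReplyProperties.CreateReply(inbound, "CalculatorService") on a request whose MessageId is "request-1" yields a reply whose CorrelationId is "request-1", which is the relationship a client can use to match responses to requests.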

WCF Workflow Service

In this section I focus on how the WCF workflow service communicates with the client application using Service Bus queues and topics. In my demo, the WCF workflow service is hosted in a console application, but you can easily change the solution to run it in an IIS-hosted application on-premises or in an Azure role in the cloud. The following table contains the code used by the console application to initialize and open a WorkflowServiceHost object. In particular, the local HTTP endpoint specified in the object constructor can be used to retrieve the WSDL exposed by the WCF workflow service.

Program Class


using System;
using System.Xaml;
using System.ServiceModel.Activities;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Service;

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowConsoleApplication
{

    class Program
    {
        static void Main(string[] args)
        {
            var settings = new XamlXmlReaderSettings()
                               {
                                   LocalAssembly = typeof(Program).Assembly
                               };
            var reader = new XamlXmlReader(@"..\..\CalculatorService.xamlx", settings); 
            var service = (WorkflowService) XamlServices.Load(reader);
            using (var host = new WorkflowServiceHost(service, new Uri("http://localhost:7571")))
            {
                host.Description.Behaviors.Add(new ErrorServiceBehavior());
                host.Open();
                Console.WriteLine("Press [ENTER] to exit");
                Console.ReadLine();
                host.Close();
            }
        }
    }
}

The following table contains the configuration file of the console application that plays a key role in the definition of the WCF client and service endpoints used by the WCF workflow service to exchange request and response messages with the client application via Service Bus queues and topics.

Console Application App.Config


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\WorkflowConsoleApplication.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
  19:             name="ServiceModelMessageLoggingListener"
  20:             traceOutputOptions="Timestamp">
  21:          <filter type="" />
  22:        </add>
  23:      </sharedListeners>
  24:    </system.diagnostics>
  25:    <startup>
  26:      <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  27:    </startup>
  28:    <system.serviceModel>
  29:      <diagnostics>
  30:        <messageLogging logEntireMessage="true"
  31:                        logMalformedMessages="true"
  32:                        logMessagesAtServiceLevel="true"
  33:                        logMessagesAtTransportLevel="true" />
  34:        </diagnostics>
  35:      <behaviors>
  36:        <serviceBehaviors>
  37:          <behavior>
  38:            <serviceMetadata httpGetEnabled="true" />
  39:            <serviceDebug includeExceptionDetailInFaults="true" />
  40:            <useRequestHeadersForMetadataAddress />
  41:            <workflowUnhandledException action="AbandonAndSuspend" />
  42:          </behavior>
  43:        </serviceBehaviors>
  44:        <endpointBehaviors>
  45:          <behavior name="securityBehavior">
  46:            <transportClientEndpointBehavior>
  47:              <tokenProvider>
  48:                <sharedSecret issuerName="owner" 
  49:                              issuerSecret="ISSUER_SECRET"/>
  50:              </tokenProvider>
  51:            </transportClientEndpointBehavior>
  52:          </behavior>
  53:        </endpointBehaviors>
  54:      </behaviors>
  55:      <bindings>
  56:        <netMessagingBinding>
  57:          <binding name="netMessagingBinding" 
  58:                   sendTimeout="00:03:00" 
  59:                   receiveTimeout="00:03:00" 
  60:                   openTimeout="00:03:00" 
  61:                   closeTimeout="00:03:00" 
  62:                   sessionIdleTimeout="00:01:00" 
  63:                   prefetchCount="-1">
  64:            <transportSettings batchFlushInterval="00:00:01"/>
  65:          </binding>
  66:        </netMessagingBinding>
  67:      </bindings>
  68:      <client>
  69: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue" 
  70:                  behaviorConfiguration="securityBehavior" 
  71:                  binding="netMessagingBinding" 
  72:                  bindingConfiguration="netMessagingBinding" 
  73:                  contract="ICalculatorResponse" 
  74:                  name="ResponseQueueClientEndpoint"/>
  75: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic" 
  76:                  behaviorConfiguration="securityBehavior" 
  77:                  binding="netMessagingBinding" 
  78:                  bindingConfiguration="netMessagingBinding" 
  79:                  contract="ICalculatorResponse" 
  80:                  name="ResponseTopicClientEndpoint"/>
  81:      </client>
  82:      <services>
  83:        <service name="CalculatorService">
  84: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue" 
  85:                    behaviorConfiguration="securityBehavior" 
  86:                    binding="netMessagingBinding" 
  87:                    bindingConfiguration="netMessagingBinding" 
  88:                    name="RequestQueueServiceEndpoint" 
  89:                    contract="ICalculatorRequest"/>
  90: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  91:           listenUri=
"sb://NAMESPACE.servicebus.windows.net/requesttopic/Subscriptions/ItalyMilan"
  92:                    behaviorConfiguration="securityBehavior"
  93:                    binding="netMessagingBinding"
  94:                    bindingConfiguration="netMessagingBinding"
  95:                    name="RequestTopicServiceEndpoint"
  96:                    contract="ICalculatorRequest" />
  97:        </service>
  98:      </services>
  99:    </system.serviceModel>
 100:    <startup>
 101:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
 102:    </startup>
 103:  </configuration>

Below you can find a brief description of the main elements and sections of the configuration file. Note that the client and service endpoints defined here mirror those used by the client application: the workflow service receives from the entities the client sends to, and sends to the entities the client receives from.

  • Lines [3-24] and [29-34] enable tracing, configure trace sources to emit traces, and set the trace level. In particular, the diagnostics section is configured to trace messages at the service and transport levels to a log file that can be examined using the Service Trace Viewer Tool (SvcTraceViewer.exe).

  • Lines [36-43] specify the default service behavior used by the WCF workflow service.

  • Lines [44-53] contain the definition of the securityBehavior used by client and service endpoints to authenticate with the Access Control Service. In particular, the TransportClientEndpointBehavior is used to define shared secret credentials.

  • Lines [56-66] contain the configuration of the NetMessagingBinding used by client and service endpoints to exchange messages with the Service Bus.

  • Lines [69-74] contain the definition of the ResponseQueueClientEndpoint used by the WCF workflow service to send response messages to the responsequeue. The address of the client endpoint is given by the concatenation of the URL of the service namespace and the name of the queue.

  • Lines [75-80] contain the definition of the ResponseTopicClientEndpoint used by the WCF workflow service to send response messages to the responsetopic. The address of the client endpoint is given by the concatenation of the URL of the service namespace and the name of the topic.

  • Lines [84-89] contain the definition of the RequestQueueServiceEndpoint used by the WCF workflow service to receive request messages from the requestqueue. The address of the service endpoint is given by the concatenation of the URL of the service namespace and the name of the queue.

  • Lines [90-96] contain the definition of the RequestTopicServiceEndpoint used by the application to receive request messages from the ItalyMilan subscription for the requesttopic. When you define a WCF service endpoint that uses the NetMessagingBinding to receive messages from a subscription, you have to proceed as follows:

    • As the value of the address attribute, you have to specify the URL of the topic to which the subscription belongs. The URL of the topic is given by the concatenation of the URL of the service namespace and the name of the topic.

    • As value of the listenUri attribute, you have to specify the URL of the subscription. The URL of the subscription is defined by the concatenation of the topic URL, the string /Subscriptions/ and the name of the subscription.

    • Assign the value Explicit to the listenUriMode attribute. The default value for the listenUriMode is Explicit, so this setting is optional.

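  • Lines [25-27] and [100-102] specify which version of the common language runtime is supported by the application. The startup section is repeated in this listing; a configuration file should declare it only once.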
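The address rules described above for queues, topics and subscriptions boil down to simple string concatenation. The following is a minimal sketch of those rules; the ServiceBusAddresses class and its method names are hypothetical helpers introduced only for illustration and are not part of the Service Bus SDK or of the companion code.

```csharp
using System;

// Hypothetical helper (illustration only): composes the URLs used in the
// address and listenUri attributes of the endpoints in the configuration file.
public static class ServiceBusAddresses
{
    // URL of a queue or topic: namespace URL + entity name
    public static string EntityUrl(string serviceNamespace, string entityName)
    {
        return string.Format("sb://{0}.servicebus.windows.net/{1}",
                             serviceNamespace,
                             entityName);
    }

    // URL of a subscription: topic URL + "/Subscriptions/" + subscription name
    public static string SubscriptionUrl(string topicUrl, string subscriptionName)
    {
        return topicUrl.TrimEnd('/') + "/Subscriptions/" + subscriptionName;
    }
}
```

With the placeholder namespace NAMESPACE, EntityUrl("NAMESPACE", "requesttopic") gives the value of the address attribute of the RequestTopicServiceEndpoint, and passing that result to SubscriptionUrl together with "ItalyMilan" gives the value of its listenUri attribute.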

Now let’s examine the steps necessary to create a WCF workflow service that receives a request and sends a reply via Service Bus queues and topics. When I first created the WCF Workflow Service, the workflow just contained a Sequence activity with a Receive activity followed by a SendReply activity as shown in the following illustration.

ServiceBus-Queues-Topics-WCF-Workflow-Service12

As a first step, I clicked the surface of the workflow and I assigned the string CalculatorService as a value to both the ConfigurationName and Name properties of the WorkflowService, as shown in the picture below. In particular, the ConfigurationName property indicates the configuration name of the workflow service and its value needs to be equal to the value of the name attribute of the service element in the configuration file.

ServiceBus-Queues-Topics-WCF-Workflow-Service13

Then I selected the Sequence activity, I clicked the Variables button to display the corresponding editor and I created the following variables:

  • calculatorRequest: this variable is of type CalculatorRequest and, as the name suggests, contains the body of the request message. Its value is set by the Receive activity used to receive the request message from the requestqueue or the ItalyMilan subscription of the requesttopic.

  • calculatorResponse: this variable is of type CalculatorResponse and contains the body of the response message returned to the client application. Its value is set by a custom activity that processes the request and produces a response.

  • inboundBrokeredMessageProperty: this variable contains the BrokeredMessageProperty of the request message. Its value is set by an instance of the BrokeredMessagePropertyActivity that wraps the Receive activity and reads the BrokeredMessageProperty from the properties of the WCF request message.

  • outboundBrokeredMessageProperty: this variable contains the BrokeredMessageProperty of the response message. Its value is set by the custom activity that processes the request and produces a response. A BrokeredMessagePropertyActivity is used to wrap the Send activity and assign the value of this variable to the BrokeredMessageProperty of the WCF response message.

ServiceBus-Queues-Topics-WCF-Workflow-Service14

Then I added a TryCatch activity to the workflow and I wrapped the Receive activity with an instance of the BrokeredMessagePropertyActivity as shown in the following picture.

ServiceBus-Queues-Topics-WCF-Workflow-Service15

Next I clicked the BrokeredMessagePropertyActivity and I assigned the inboundBrokeredMessageProperty variable to its BrokeredMessageProperty property, as shown in the picture below.

ServiceBus-Queues-Topics-WCF-Workflow-Service16

Then I selected the Receive activity and I configured its properties to receive request messages from the requestqueue and ItalyMilan subscription of the requesttopic using, respectively, the RequestQueueServiceEndpoint and RequestTopicServiceEndpoint service endpoints defined in the configuration file.

ServiceBus-Queues-Topics-WCF-Workflow-Service17

In particular, I used the ServiceContractName property of the Receive activity to specify the target namespace and contract name for the service endpoint and I used the Action property to specify the action header of the request message as specified in the ICalculatorRequest service contract.

Then I added an instance of the CalculatorActivity to the WCF workflow service below the BrokeredMessagePropertyActivity and I configured its properties as shown in the following picture.

ServiceBus-Queues-Topics-WCF-Workflow-Service18

Next, I added an If activity to the workflow below the CalculatorActivity and I configured its Condition property as follows:

Not String.IsNullOrEmpty(inboundBrokeredMessageProperty.ReplyTo) AndAlso 
inboundBrokeredMessageProperty.ReplyTo.ToLower().Contains("topic")

Then I created an instance of the BrokeredMessagePropertyActivity in both branches of the If activity and I added a Send activity to each of them, as shown in the following picture.

ServiceBus-Queues-Topics-WCF-Workflow-Service19

This way, if the reply address specified in the ReplyTo property of the inbound BrokeredMessageProperty contains the string “topic”, the response is sent to the responsetopic; otherwise, it is sent to the responsequeue.
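The routing decision taken by the If activity can also be expressed as a small C# function, which makes its edge cases easier to see. This is only a sketch under stated assumptions: the ReplyRouting class is a hypothetical helper, and it swaps the ToLower().Contains("topic") test of the workflow condition for an ordinal, case-insensitive IndexOf so that the result does not depend on the current culture. The two endpoint names are the ones defined in the configuration file.

```csharp
using System;

// Hypothetical helper (illustration only): picks the client endpoint
// that the workflow uses to send the response message.
public static class ReplyRouting
{
    public const string ResponseQueueClientEndpoint = "ResponseQueueClientEndpoint";
    public const string ResponseTopicClientEndpoint = "ResponseTopicClientEndpoint";

    public static string SelectEndpoint(string replyTo)
    {
        // A missing ReplyTo falls through to the queue, as in the Else branch
        if (!string.IsNullOrEmpty(replyTo) &&
            replyTo.IndexOf("topic", StringComparison.OrdinalIgnoreCase) >= 0)
        {
            return ResponseTopicClientEndpoint;
        }
        return ResponseQueueClientEndpoint;
    }
}
```

Note that the test is a plain substring match, so any reply address containing "topic" (for example a queue whose name includes that word) is routed to the responsetopic. This is acceptable for the demo, where the entity names are fixed.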

In both cases, an instance of the BrokeredMessagePropertyActivity (identified by the Set BrokeredMessage display name) is used to wrap the Send activity and assign the outbound BrokeredMessageProperty to the properties collection of the WCF response message. To perform this operation, I assigned the value of the outboundBrokeredMessageProperty variable to the BrokeredMessageProperty property of both instances of the BrokeredMessagePropertyActivity, as shown in the picture below.

ServiceBus-Queues-Topics-WCF-Workflow-Service20

Then, I selected the Send activity in the Then branch and I configured its properties as follows to send the reply message to the responsetopic using the ResponseTopicClientEndpoint defined in the configuration file.

ServiceBus-Queues-Topics-WCF-Workflow-Service21

In particular, I used the ServiceContractName property of the Send activity to specify the target namespace and contract name for the client endpoint, the Action property to specify the action header of the response message as specified in the ICalculatorResponse service contract, and the EndpointConfigurationName to indicate the name of the client endpoint defined in the configuration file.

Likewise, I configured the Send activity in the Else branch, as shown in the following figure.

ServiceBus-Queues-Topics-WCF-Workflow-Service22

The following picture shows the entire workflow:

ServiceBus-Queues-Topics-WCF-Workflow-Service23

Testing the Solution

Assuming that you have properly configured the solution, you can proceed as follows to test it.

  • To send a request message to the WCF workflow service via the requestqueue, select the Queue radio button in the Request Methods group.

  • To send a request message to the WCF workflow service via the requesttopic, select the Topic radio button in the Request Methods group.

  • To ask the WCF workflow service to send the response message to the responsequeue, select the Queue radio button in the Response Methods group.

  • To ask the WCF workflow service to send the response message to the responsetopic, select the Topic radio button in the Response Methods group.

The figure below shows the most interesting combination:

  • The client sends a request message to the requesttopic.

  • The WCF workflow service reads the request from the ItalyMilan subscription for the requesttopic and sends the response to the responsetopic.

  • The client application receives the response message from the ItalyMilan subscription defined on the responsetopic.

The following picture shows the information logged by the WCF workflow service on the standard output of the host console application:

ServiceBus-Queues-Topics-WCF-Workflow-Service24

The following picture shows the information logged by the client application during the call:

ServiceBus-Queues-Topics-WCF-Workflow-Service25

In particular, you can note the following:

  1. The request message was sent to the requesttopic.

  2. The response message was received from the responsetopic.

  3. The CorrelationId of the response message is equal to the MessageId of the request message.

  4. The Label property of the request message has been copied by the WCF workflow service to the Label property of the response message.

  5. All user-defined properties have been copied by the WCF workflow service from the request message to the response message.

  6. The WCF workflow service added the Source user-defined property to the response.

  7. The Area property was added by the rule action defined on the ItalyMilan subscription.

Conclusions

In this article we have seen how to integrate a WCF workflow service with Service Bus Brokered Messaging and how to achieve full interoperability between these two technologies simply by using their native features and a custom activity to handle the BrokeredMessageProperty. I look forward to hearing your feedback, and in the meantime you can download the companion code for this article from MSDN Code Gallery.

© 2014 Microsoft