Продажи: 1-800-867-1389

Как интегрировать службы WCF Workflow Service с очередями и разделами Service Bus

Обновлено: Март 2014 г.

Автор: Паоло Сальватори (Paolo Salvatori)

Рецензенты: Ральф Скиллас (Ralph Squillace), Сидни Хига (Sidney Higa)

Документ содержит вводные сведения об обмене сообщениями на основе Service Bus и рекомендации по интеграции службы WCF Workflow Service с очередями и разделами Service Bus. В документе рассматриваются этапы построения службы WCF Workflow Service, которая использует очереди и разделы для обмена сообщениями с клиентским приложением. Для понимания этого документа настоятельно рекомендуется прочесть статью Обмен сообщениями на основе Azure Service Bus, написанную Романом Киссом (Roman Kiss), в которой рассматривается более проработанное решение поставленной выше задачи, а также описан ряд повторно используемых действий для службы WCF Workflow Service, который поможет обеспечить взаимодействие с очередями и разделами Service Bus.

Дополнительные сведения о SQL Azure Service Bus см. в следующих источниках:

Очереди обеспечивают возможности обмена сообщениями, благодаря которым большой пласт разнородных приложений, работающих локально или в облаке, может обмениваться сообщениями гибким, безопасным и надежным образом по сети и доверенным каналам.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба1

Очереди размещаются в Azure в реплицированной и надежной инфраструктуре. Максимальный размер очереди — 5 ГБ. Максимальный размер сообщения составляет 256 КБ, однако с помощью сеансов можно создавать потоки связанных сообщений неограниченного размера. Доступ к очередям осуществляется с помощью следующих приложений:

Сущности очередей дают следующие возможности.

  • Корреляция на основе сеанса, которая позволяет строить мультиплексные пути запросов и ответов. В реалистичной конкурентной среде, где множество рабочих процессов получают сообщения от одной и той же очереди на основе сеанса или подписки, сообщения с одинаковым идентификатором SessionId доставляются одному клиенту.

  • Возможность указывать время, когда сообщение должно быть добавлено в очередь.

  • Надежные схемы доставки с использованием режима получения PeekLock (сообщение остается в очереди до тех пор, пока точно не будет обработано).

  • Поддержка транзакций, обеспечивающих атомическую фиксацию пакетов операций обмена сообщениями. Эта функция позволяет приложению отправить несколько сообщений в контексте одной и той же транзакции и в силу этого зафиксировать или отменить всю операцию как единый блок.

  • Обнаружение входящих повторяющихся сообщений, благодаря чему клиенты могут отправлять одно сообщение несколько раз без пагубных последствий.

  • Специальный обработчик для сообщений, которые не могут быть обработаны или срок хранения которых истек.

  • Задержка сообщений для последующей обработки. (Эта функция особенно полезна в тех случаях, когда получение сообщений происходит в ожидаемой последовательности и их нужно отложить, пока процесс ожидает определенного сообщения, разрешающего дальнейшее движение, либо когда сообщения должны обрабатываться согласно набору свойств, определяющих приоритет в период всплесков трафика.)

Два самых важных типа в .NET API для опосредованных сообщений (т. е. сообщений, которые используются в очередях, разделах и подписках Service Bus) — это класс BrokeredMessage (обеспечивает доступ к таким свойствам, как MessageId, SessionID и CorrelationId, которые обеспечивают среди прочего автоматическое выявление повторяющихся элементов и обмена данными в сеансе), а также класс QueueDescription, который может быть использован для управления работой создаваемой очереди. Класс QueueDescription обладает следующими важными свойствами.

  • Свойство DefaultMessageTimeToLive указывает реальное значение сообщения по умолчанию.

  • Свойство DuplicateDetectionHistoryTimeWindow определяет длительность истории выявления повторяющихся сообщений.

  • Свойство EnableDeadLetteringOnMessageExpiration позволяет включить или отключить перемещение в «мертвую» очередь по истечении срока сообщения.

  • Свойство LockDuration определяет продолжительность блокирования, которая используется клиентом в режиме получения PeekLock.

  • Свойство MaxSizeInMegabytes определяет максимальный размер очереди в мегабайтах.

  • Свойство RequiresDuplicateDetection включает или выключает выявление повторяющихся сообщений.

  • Свойство RequiresSession включает или выключает сеансы.

  • Свойство MessageCount возвращает число сообщений в очереди. (Интеллектуальная система может по этому свойству принять решение об уменьшении или увеличении числа конкурирующих клиентов, которые последовательно получают и обрабатывают сообщения из очереди.)

noteПримечание
Поскольку метаданные не могут быть изменены после создания сущности сообщения, для изменения поведения обнаружения повторяющихся сообщений потребуется удалить и снова создать очередь. Тот же принцип применяется к любым другим метаданным.

С точки зрения архитектуры использование очередей является важной частью распределенных приложений, поскольку позволяет привести произвольно изменяющийся трафик к прогнозируемому рабочему потоку и затем распределить нагрузку по рабочим процессам, количество которых может меняться динамически в зависимости от объема входящих сообщений. При наличии конкурирующих клиентов, когда издатель записывает сообщение в очередь, несколько клиентов конкурируют между собой за получение сообщения, но только один из них получит и обработает данное сообщение. Иными словами, очередь может иметь как одного получателя, который получает все сообщения, так и набор конкурирующих получателей, которые выбирают сообщения по принципу «первым получено — первым обработано». По этой причине очереди являются превосходным решением для распределения рабочей нагрузки по нескольким конкурирующим рабочим процессам.

В архитектурах, ориентированных на службы или шины службы, состоящие из нескольких разнородных систем, взаимодействия между автономными системами являются асинхронными и слабосвязанными. В данном контексте для размещения слабосвязанных компонентов пользователи часто прибегают к SOAP или REST, однако использование сущностей сообщений на основе Service Bus (очередей, разделов и т. д. — см. след. раздел) повышает гибкость и масштабируемость архитектуры в целом и позволяет укрепить слабые связи отдельных систем.

Дополнительные сведения об очередях см. в следующих статьях:

Разделы дополняют очереди за счет функций публикации и подписки.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба2

Сущность раздела состоит из последовательного хранилища сообщений (например, очереди), однако раздел поддерживает до 2000 параллельных и устойчивых подписок, которые передают копии сообщения в набор рабочих процессов (это число в будущем может измениться). Как показано на следующем рисунке, каждая подписка может определять одну или несколько сущностей правил.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба3

Каждое правило определяет критерий фильтра, который используется для фильтрации сообщений, проходящих через подписку, и действие фильтра, которое может изменять свойства сообщений. В частности, класс SqlFilter позволяет определять условие вида SQL92 для свойств сообщений:

  • OrderTotal > 5000 OR ClientPriority > 2

  • ShipDestinationCountry = ‘USA’ AND ShipDestinationState = ‘WA’

Дополнительные сведения см. в документации по свойству SqlExpression.

С другой стороны, класс SqlRuleAction может использоваться для изменения, добавления или удаления свойств объекта BrokeredMessage с помощью синтаксиса, подобного тому, который используется в предложении SET инструкции UPDATE.

  • SET AuditRequired = 1

  • SET Priority = 'High', Severity = 1

noteПримечание
Каждое применимое правило, которое явным образом определяет действие, создает отдельную копию опубликованного сообщения, поэтому любая подписка может создать несколько копий одного сообщения, по одному для каждого применимого правила.

Как и очереди, разделы поддерживают сценарий конкурирующих получателей. В этом контексте подписка может иметь либо одного получателя, который получает все сообщения, либо набор конкурирующих получателей, которые выбирают сообщения по принципу «первым получено — первым обработано». Разделы — это превосходное решение по обмену сообщениями, которое позволяет рассылать сообщения нескольким приложениям или распределять рабочую нагрузку между несколькими наборами конкурирующих рабочих процессов.

Дополнительные сведения о разделах см. в следующих статьях:

Класс BrokeredMessage моделирует сообщения, которыми обмениваются приложения, сообщающиеся с помощью очередей и разделов. Класс содержит 4 различных открытых конструктора.

  • Конструктор BrokeredMessage() инициализирует новый экземпляр класса BrokeredMessage с пустыми полезными данными. Поскольку класс BrokeredMessage обеспечивает доступ к коллекции Properties, которая содержит пары «ключ-значение» и позволяет определить набор пользовательских свойств, сообщение часто имеет пустой текст, а полезные данные сообщения передаются через определяемые пользователем свойства.

  • Конструктор BrokeredMessage(Object) инициализирует новый экземпляр класса BrokeredMessage из объекта с помощью метода DataContractSerializer с двоичным модулем XmlDictionaryWriter.

  • Конструктор BrokeredMessage(Stream, Boolean) инициализирует новый экземпляр класса BrokeredMessage с помощью предоставляемого в теле потока.

  • Наконец, конструктор BrokeredMessage(Object, XmlObjectSerializer) инициализирует экземпляр класса BrokeredMessage из указанного объекта с помощью переданного объекта XmlObjectSerializer.

Класс предоставляет доступ к необходимому набору методов, которые дают возможность выполнять различные действия на уровне сообщения.

  • При использовании режима получения PeekLock метод Abandon позволяет снять блокировку, в то время как метод Complete фиксирует операцию получения сообщения и указывает, что сообщение должно быть помечено как обработанное и удалено (заархивировано).

  • Метод Defer указывает, что получатель желает отложить обработку сообщения. Как уже упоминалось, откладывание сообщений — это удобный способ обработки в тех случаях, когда сообщения поступают из ожидаемой последовательности и должны быть отложены, пока приложение ожидает определенного сообщения-сигнала на обработку потока сообщений.

  • Методы DeadLetter и DeadLetter(String, String) позволяют приложению явным образом переместить сообщение из «мертвой» очереди в очередь или подписку. Кроме того, при создании сущности очереди с помощью API управления или портала управления Azure можно настроить автоматическое перемещение сообщений с истекшим сроком действия в «мертвую» очередь. По аналогии можно настроить и подписку таким образом, чтобы сообщения с истекшим сроком действия и сообщения, не прошедшие фильтрацию, помещались в «мертвую» очередь.

Класс BrokeredMessage обладает целым рядом свойств.

  • Свойство ContentType позволяет указать тип содержимого.

  • MessageId — идентификатор сообщения.

  • Свойство CorrelationId можно использовать для реализации шаблона обмена сообщениями вида «запрос-ответ», если клиентское приложение использует свойство MessageId исходящего запроса и свойство CorrelationId входящего ответа для связи двух сообщений. (Подробнее о реализации этой методики см. далее в статье.)

  • Свойство SessionId позволяет задать или получить идентификатор сеанса для сообщения. В реальной конкурентной среде, где множество рабочих процессов получают сообщения из одной очереди на основе сеанса или подписки, сообщения с одинаковым идентификатором SessionId доставляются одному клиенту. В этом контексте, когда клиентское приложение А посылает поток запросов серверному приложению Б через очередь или раздел на основе сеанса и ожидает соответствующих ответов в очереди или подписке, клиентское приложение А может присвоить идентификатор сеанса получения свойству ReplyToSessionId для исходящих сообщений, чтобы приложению Б было известно значение, которое необходимо присвоить свойству SessionId для ответных сообщений.

  • Свойство ReplyTo возвращает или задает адрес очереди, куда отправляется ответ. В асинхронном сценарии вида «запрос-ответ», где клиентское приложение А отправляет запрос серверному приложению Б через очередь или раздел на основе Service Bus и ожидает ответного сообщения, клиентское приложение А может по умолчанию в свойстве ReplyTo исходящего сообщения указать серверному приложению Б адрес очереди или раздела для отправки ответа. (Подробнее о применении этой методики см. далее в статье.) Свойство Label возвращает или задает метку, которая зависит от приложения и может понадобиться пользователю.

  • Свойство SequenceNumber возвращает уникальный номер, назначенный сообщению со стороны Service Bus. Это свойство может использоваться для получения отложенного сообщения из очереди или подписки.

  • Свойство TimeToLive позволяет задать или узнать срок жизни сообщения. Service Bus не определяет максимальное время существования сообщений, ожидающих обработки в очереди или подписке. Но можно определить срок по умолчанию для сообщений в момент создания очереди, раздела или подписки либо определить явным образом время ожидания на уровне сообщения с помощью свойства TimeToLive.

  • DeliveryCount возвращает число доставок сообщения.

  • Коллекция Properties позволяет определить свойства сообщения в зависимости от приложения. Возможно, это самая важная из доступных пользователю функций сущности BrokeredMessage, которая может быть использована следующим образом.

    • Передавать полезную нагрузку сообщения. В этом контексте сообщение может быть пустым.

    • Определять свойства, зависящие от приложения, с помощью которых рабочий процесс может решать, как обрабатывать текущее сообщение.

    • Указывать выражение фильтра и действие, с помощью которых можно задавать маршрут и правила на уровне подписки.

Если сравнивать с контекстом свойств сообщения BizTalk, то можно представить определяемые пользователем свойства, которые содержатся в коллекции BrokeredMessage Properties, в виде контекстных свойств сообщения BizTalk. Еще один пример контейнера свойств, используемого для передачи контекстной информации, обеспечивается WCF через специальный набор контекстных привязок наподобие BasicHttpContextBinding, NetTcpContextBinding или WSHttpContextBinding, которые позволяют отправлять дополнительные параметры службе для обмена контекстом в заголовках HttpCookies или SOAP. Часто коллекция свойств BrokeredMessage используется для передачи части информации или даже всех полезных данных, а при использовании разделов и подписок — для перенаправления сообщения нужному адресату. Следовательно, если сторонняя система обменивается сообщениями с приложением BizTalk через Service Bus, очень важным является преобразование определяемых приложением свойств, передаваемых в объекте BrokeredMessage, в контекстные свойства сообщения BizTalk и наоборот. О том, как достичь такого результата, будет рассказано в этой статье и сопроводительном коде.

noteПримечание
Как указано в разделе Квоты шины обслуживания Azure AppFabric, максимальный размер каждого свойства равен 32K. Общий размер всех свойств не может превышать 64К. Это относится ко всему заголовку BrokeredMessage, который содержит как пользовательские, так и системные свойства (например, SequenceNumber,Label, MessageId и т. д.). Пространство, занимаемое свойствами, учитывается в общем размере сообщения, максимальный размер которого равен 256К. Если приложение превышает указанные выше лимиты, возникнет исключение SerializationException, поэтому следует предусмотреть обработку этой ошибки.

Обмен сообщениями на основе Service Bus поддерживает модель программирования WCF и, в частности, обеспечивает новую привязку, которая называется NetMessagingBinding и которую приложения, реализующие WCF, могут использовать для отправки и получения сообщений через очереди, разделы и подписки по протоколу обмена сообщениями на основе Service BUS (SBMP). NetMessagingBinding — новое имя для привязки для очередей и разделов, которая обеспечивает полную интеграцию с WCF. С функциональной точки зрения NetMessagingBinding можно сравнить с NetMsmqBinding, которая обеспечивает поддержку очередей за счет службы очередей сообщений (MSMQ), используемой в качестве транспорта, и слабосвязанных приложений. Со стороны службы NetMessagingBinding обеспечивает автоматическое наполнение сообщениями, в результате чего сообщения выходят из очереди или подписки, при этом алгоритм интегрирован в механизм ReceiveContext в составе WCF.

Новая привязка поддерживает стандартные интерфейсы IInputChannel, IOutputChannel и IInputSessionChannel. Если приложение использует WCF и NetMessagingBinding для отправки сообщения в очередь или раздел, то сообщение вкладывается в конверт SOAP и шифруется. Чтобы задать определенные свойства BrokeredMessage, следует создать объект BrokeredMessageProperty, определить его свойства и добавить его в коллекцию Properties в составе WCF Message, как показано в следующей таблице. При использовании NetMessagingBinding для вставки сообщения в очередь или раздел внутренний класс ServiceBusOutputChannel ищет свойство BrokeredMessageProperty в коллекции Properties сообщения WCF и копирует все его свойства в создаваемый объект BrokeredMessage. После этого полезные данные из сообщения WCF копируются в объект BrokeredMessage. Результирующее сообщение публикуется в целевую очередь или раздел.

static void Main(string[] args)
{
    try
    {
        // Create the 
        var channelFactory = new ChannelFactory<IOrderService>("orderEndpoint");
        var clientChannel = channelFactory.CreateChannel();
        
        // Create a order object
        var order = new Order()
                        {
                            ItemId = "001",
                            Quantity = 10
                        };

        // Use the OperationContextScope to create a block within which to access the current OperationScope
        using (var scope = new OperationContextScope((IContextChannel)clientChannel))
        {
            // Create a new BrokeredMessageProperty object
            var property = new BrokeredMessageProperty();

            // Use the BrokeredMessageProperty object to set the BrokeredMessage properties
            property.Label = "OrderItem";
            property.MessageId = Guid.NewGuid().ToString();
            property.ReplyTo = "sb://acme.servicebus.windows.net/invoicequeue";

            // Use the BrokeredMessageProperty object to define application-specific properties
            property.Properties.Add("ShipCountry", "Italy");
            property.Properties.Add("ShipCity", "Milan");

            // Add BrokeredMessageProperty to the OutgoingMessageProperties bag provided 
            // by the current Operation Context 
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, property);
            clientChannel.SendOrder(order);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Кроме того, если для получения сообщений из очереди или раздела используется конечная точка службы на основе NetMessagingBinding, то приложение может извлечь объект BrokeredMessageProperty из коллекции Properties входящего сообщения WCF, как показано в следующей таблице. В частности, после получения сообщения внутренние классы ServiceBusInputChannel и ServiceBusInputSessionChannel (второй класс служит для получения сообщений из очередей и подписок на основе сеанса) создается новый объект WCF Message, а затем полезные данные из текста входящего сообщения на основе BrokeredMessage копируются в текст только что созданного сообщения WCF. Затем свойства входящего сообщения на основе BrokeredMessage копируются в новый экземпляр класса BrokeredMessageProperty, и, наконец, результирующее сообщение добавляется в коллекцию Properties входящего сообщения WCF.

[ServiceBehavior]
public class OrderService : IOrderService
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...
    }
}

Поскольку Service Bus не поддерживает IOutputSessionChannel, все приложения, отправляющие сообщения в очереди на основе сеанса, должны использовать контракт службы, для которого свойство SessionMode отличается от SessionMode.Required. Тем не менее среда исполнения Service Bus WCF поддерживает интерфейс IInputSessionChannel, поэтому для получения сообщений из очереди или подписки на основе сеанса с помощью WCF и NetMessagingBinding приложение должно реализовывать контракт службы на основе сеанса. Следующий фрагмент кода — пример того, как служба WCF получает сообщения из очереди или подписки на основе сеанса.

// ServiceBus does not support IOutputSessionChannel.
// All senders sending messages to sessionful queue must use a contract which does not enforce SessionMode.Required.
// Sessionful messages are sent by setting the SessionId property of the BrokeredMessageProperty object.
[ServiceContract]
public interface IOrderService
{
    [OperationContract(IsOneWay = true)]
    [ReceiveContextEnabled(ManualControl = true)]
    void ReceiveOrder(Order order);
}

// ServiceBus supports both IInputChannel and IInputSessionChannel. 
// A sessionful service listening to a sessionful queue must have SessionMode.Required in its contract.
[ServiceContract(SessionMode = SessionMode.Required)]
public interface IOrderServiceSessionful : IOrderService
{
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession, ConcurrencyMode = ConcurrencyMode.Single)]
public class OrderService : IOrderServiceSessionful
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...

        //Complete the Message
        ReceiveContext receiveContext;
        if (ReceiveContext.TryGet(incomingProperties, out receiveContext))
        {
            receiveContext.Complete(TimeSpan.FromSeconds(10.0d));
            ...
        }
        else
        {
            throw new InvalidOperationException("...");
        }
    }
}

Обратите внимание, что свойство ManualControl атрибута операции ReceiveContextEnabled имеет значение true. Для этого служба должна явным образом вызывать метод ReceiveContext.Complete и зафиксировать операцию получения. На самом деле, если свойство ManualControl имеет значение true, сообщение, полученное из канала, доставляется операции службы с блокировкой. Приложение службы должно вызвать либо метод Complete(TimeSpan), либо Abandon(TimeSpan), чтобы просигнализировать о завершении получения сообщения. Если ни один из этих методов не вызван, то блокировка сообщения будет сохраняться до тех пор, пока не истечет время ожидания. После снятия блокировки — либо вызовом метода Abandon(TimeSpan), либо по истечении времени ожидания — сообщение будет отправлено повторно из канала в службу. После вызова метода Complete(TimeSpan) сообщение будет отмечено как успешно полученное.

Обратите внимание, что у класса OrderService свойство ServiceBehavior.InstanceContextMode имеет значение InstanceContextMode.PerSession, а свойство ConcurrencyMode — значение ConcurrencyMode.Single. В результате ServiceHost будет создавать новый экземпляр службы каждый раз, когда будет доступен новый сеанс в соответствующей очереди или подписке, и использовать один поток для получения сообщений последовательным образом. Время существования экземпляра службы управляется свойством SessionIdleTimeout привязки NetMessagingBinding.

На следующем рисунке показана примерная архитектура высокого уровня.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба4

  1. Клиентское приложение использует прокси-сервер WCF и привязку NetMessagingBinding для отправки запроса в requestqueue или requesttopic.

  2. Служба WCF Workflow Service в консольном приложении или службы IIS получают сообщение-запрос от подписки requestqueue или ItalyMilan, определенных в requesttopic.

    ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба5
  3. Служба WCF Workflow Service, показанная на приведенном выше рисунке, выполняет следующие действия.

    • Пользовательское действие BrokeredMessagePropertyActivity (с отображаемым именемGet BrokeredMessage) считывает свойство BrokeredMessageProperty из исходящего сообщения и присваивает его значение переменной рабочего процесса, определенной во внешнем действии Sequential.

    • Действие Receive извлекает сообщение из подписки requestqueue или ItalyMilan раздела requesttopic.

    • Пользовательское действие CalculatorActivity получает входящее сообщение и BrokeredMessageProperty в качестве входящих аргументов, обрабатывает сообщение-запрос и формирует сообщение-ответ и исходящее свойство BrokeredMessageProperty. При запуске в консольном приложении действие отслеживает свойства входящего и исходящего свойств BrokeredMessageProperty в стандартном устройстве вывода.

    • Действие If считывает адрес из свойства ReplyTo входящего свойства BrokeredMessageProperty.

      • Если строка содержит слово «topic», то ответ отправляется в responsetopic.

      • В противном случае ответ отправляется в responsequeue.

    • В обоих случаях экземпляр BrokeredMessagePropertyActivity (с отображаемым именем Set BrokeredMessage) используется для обертывания действия отправки и присвоения исходящего свойства BrokeredMessageProperty свойствам коллекции сообщения-ответа WCF.

  4. Служба WCF Workflow Service записывает сообщение ответа в очередь responsequeue или раздел responsetopic.

  5. Клиентское приложение использует службу WCF с двумя разными конечными точками для получения сообщения-ответа из очереди responsequeue или раздела responsetopic. В среде с несколькими клиентскими приложениями каждое приложение должно использовать отдельную очередь или подписку для получения сообщений-ответов от BizTalk. Подробнее об этом см. далее в статье.

После рассмотрения общих сведений о примерной архитектуре можно приступить к подробному анализу основных компонентов решения.

Первая операция, которую необходимо выполнить, чтобы правильно настроить среду, — это создать сущности обмена сообщениями, используемые демонстрационной версией. Первая выполняемая операция заключается в предоставлении нового пространства имен Service Bus или изменении существующего пространства имен с добавлением Service Bus. Эту задачу можно выполнить с помощью Портала управления Windows Azure, нажав кнопку Создать или Изменить, соответственно.

Далее необходимо создать сущности очереди, подписки и раздела, необходимые демонстрационной версии. Как было упомянуто во введении, эту задачу можно выполнить разными способами. Проще всего это сделать с помощью портала управления Azure, воспользовавшись кнопками команды Управление сущностями.

В стилизованном дереве навигации, показанном на шаге 2, выберите сущность и выведите ее свойства на вертикальной панели, отмеченной в точке 3. Чтобы удалить выбранную сущность, нужно нажать кнопку Удалить на панели команд Управление сущностями.

noteПримечание
Портал управления Azure позволяет с удобством управлять сущностями сообщений в определенном пространстве имен шины обслуживания. Однако по крайней мере на данный момент набор операций, которые разработчик или системный администратор может использовать в пользовательском интерфейсе, очень ограниченный. Например, портал управления Azure фактически позволяет пользователю создавать очереди, разделы и подписки и определять их свойства, но не дает создавать или отображать правила для существующей подписки. На данный момент эту задачу можно выполнить только с помощью API-интерфейса .NET для обмена сообщениями. В частности, чтобы добавить новое правило в существующую подписку, можно вызвать метод AddRule(String, Filter) или AddRule(RuleDescription), обеспечиваемые классом SubscriptionClient, а чтобы пронумеровать правила существующей подписки, можно воспользоваться методом GetRules класса NamespaceManager. Кроме того, портал управления Azure фактически не дает возможности выполнять следующие операции.

  1. Правильно отображать сущности в виде иерархии. Фактически портал управления Azure отображает очереди, разделы и подписки в плоском древовидном представлении. Однако можно организовать сущности сообщений в иерархическую структуру путем указания имени в виде абсолютного пути, состоящего из нескольких сегментов, например crm/prod/queues/orderqueue.

  2. Экспортировать сущности сообщений из пространства имен шины обслуживания в XML-файл привязки (как на BizTalk Server). Вместо этого обозреватель шины обслуживания позволяет просто выбирать и экспортировать

    1. Отдельные сущности.

    2. Сущности по типу (очереди или разделы).

    3. Сущности по определенному пути (например, crm/prod/queues).

    4. Все сущности определенного пространства имен.

  3. Импортировать очереди, разделы и подписки из XML-файла в существующее пространство имен. Обозреватель шины обслуживания поддерживает возможность экспорта сущностей в XML-файл и повторного импорта в то же или другое пространство имен шины обслуживания. Эта функция полезна для резервного копирования и восстановления пространства имен или просто для переноса очередей и разделов из тестового пространства имен в рабочее.

Обозреватель Service Bus позволяет пользователю создавать, удалять и тестировать очереди, разделы, подписки и правила и является хорошим дополнением к порталу управления Azure.

Кроме того, можно воспользоваться консольным приложением, которое называется Provisioning и которое использует функции класса NamespaceManager для создания очередей, разделов и подписок, необходимых решению. При запуске консольного приложения появится запрос учетных данных пространства имен службы. Учетные данные необходимы для проверки подлинности в службе управления доступом, при этом они получают токен доступа, который предъявляет приложение инфраструктуре Service Bus в подтверждение разрешения на провизионирование новых сущностей обмена сообщениями. После этого приложение запрашивает значение, которое необходимо присваивать создаваемым сущностям, например EnabledBatchedOperations и EnableDeadLetteringOnMessageExpiration, для очередей. Приложение Provisioning создает следующие сущности в указанном пространстве имен Service Bus:

  1. Очередь с именем requestqueue, используемая клиентским приложением для отправки сообщений-запросов на сервер BizTalk.

  2. Очередь с именем responsequeue, используемая сервером BizTalk для отправки сообщений-ответов клиентскому приложению.

  3. Раздел с именем requesttopic, используемый клиентским приложением для отправки сообщений-запросов на сервер BizTalk.

  4. Раздел с именем responsetopic, используемый сервером BizTalk для отправки сообщений-ответов клиентскому приложению.

  5. Подписка с именем ItalyMilan для раздела requesttopic. Эта подписка используется сервером BizTalk для получения сообщений-запросов из раздела requesttopic. Подписка имеет единственное правило, определенное следующим образом:

    1. Фильтр. Country='Italy' and City='Milan'

    2. Действие. Set Area='Western Europe'

  6. Подписка с именем ItalyMilan для раздела responsetopic. Подписка используется клиентским приложением для получения сообщений-ответов из раздела responsetopic. Подписка имеет единственное правило, определенное следующим образом:

    1. Фильтр. Country='Italy' and City='Milan'

    2. Действие. Set Area='Western Europe'

На следующем рисунке приводится вывод консольного приложения Provisioning.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба8

В следующей таблице приводится код приложения Provisioning.

#region Using Directives
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Provisioning
{
    static class Program
    {
        #region Private Constants
        //***************************
        // Constants
        //***************************

        private const string RequestQueue = "requestqueue";
        private const string ResponseQueue = "responsequeue";
        private const string RequestTopic = "requesttopic";
        private const string ResponseTopic = "responsetopic";
        private const string RequestSubscription = "ItalyMilan";
        private const string ResponseSubscription = "ItalyMilan";
        #endregion
        static void Main()
        {
            var defaultColor = Console.ForegroundColor;

            try
            {
                // Set Window Size
                Console.WindowWidth = 100;
                Console.BufferWidth = 100;
                Console.WindowHeight = 48;
                Console.BufferHeight = 48;

                // Print Header            
                Console.WriteLine("Read Credentials:");
                Console.WriteLine("-----------------");

                // Read Service Bus Namespace
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Namespace: ");
                Console.ForegroundColor = defaultColor;
                var serviceNamespace = Console.ReadLine();

                // Read Service Bus Issuer Name
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Name: ");
                Console.ForegroundColor = defaultColor;
                var issuerName = Console.ReadLine();

                // Read Service Bus Issuer Secret
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Secret: ");
                Console.ForegroundColor = defaultColor;
                var issuerSecret = Console.ReadLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Queues Properties:");
                Console.WriteLine("------------------------");

                // Read Queue EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                var key = Console.ReadKey().KeyChar;
                var queueEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Topic Properties:");
                Console.WriteLine("-----------------------");

                // Read Topic EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Topic RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Subscriptions Properties: ");
                Console.WriteLine("-------------------------------");

                // Read Subscription EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnFilterEvaluationExceptions
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnFilterEvaluationExceptions ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnFilterEvaluationExceptions = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Get ServiceBusNamespaceClient for management operations
                var managementUri = 
                    ServiceBusEnvironment.CreateServiceUri("https", serviceNamespace, string.Empty);
                var tokenProvider = 
                    TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
                var namespaceManager = new NamespaceManager(managementUri, tokenProvider);

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Queues:");
                Console.WriteLine("--------------");

                // Create RequestQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(RequestQueue))
                {
                    namespaceManager.DeleteQueue(RequestQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(RequestQueue)
                                {
                                    EnableBatchedOperations = queueEnabledBatchedOperations,
                                    EnableDeadLetteringOnMessageExpiration =
                                        queueEnableDeadLetteringOnMessageExpiration,
                                    RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                    RequiresSession = queueRequiresSession
                                });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Create ResponseQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(ResponseQueue))
                {
                    namespaceManager.DeleteQueue(ResponseQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(ResponseQueue)
                            {
                                EnableBatchedOperations = queueEnabledBatchedOperations,
                                EnableDeadLetteringOnMessageExpiration = 
                                    queueEnableDeadLetteringOnMessageExpiration,
                                RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                RequiresSession = queueRequiresSession
                            });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Topics:");
                Console.WriteLine("--------------");

                // Create RequestTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(RequestTopic))
                {
                    namespaceManager.DeleteTopic(RequestTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(RequestTopic)
                                        {
                                            EnableBatchedOperations = topicEnabledBatchedOperations,
                                            RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                        });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create ResponseTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(ResponseTopic))
                {
                    namespaceManager.DeleteTopic(ResponseTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(ResponseTopic)
                                    {
                                        EnableBatchedOperations = topicEnabledBatchedOperations,
                                        RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                    });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Subscriptions:");
                Console.WriteLine("--------------");

                // Create Request Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                var ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                          {
                                              Name = "$Default",
                                              Action = new SqlRuleAction("Set Area='Western Europe'")
                                          };
                var subscriptionDescription = new SubscriptionDescription(RequestTopic, RequestSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create Response Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                      {
                                          Action = new SqlRuleAction("Set Area='Western Europe'")
                                      };
                subscriptionDescription = new SubscriptionDescription(ResponseTopic, ResponseSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");
                Console.WriteLine();

                // Close the application
                Console.WriteLine("Press any key to continue ...");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("Exception: ");
                Console.ForegroundColor = defaultColor;
                Console.Write(ex.Message);
            }
        }
    }
}

noteПримечание
В демонстрационной версии не были протестированы все возможные сочетания свойств для очередей и разделов, так что демонстрационная версия может не работать нужным образом для всех конфигураций.

Частичное определение контрактов данных для сообщений-запросов и сообщений-ответов было дано выше. Контракты данных обеспечивают механизм для типов CLR .NET, которые задаются в коде и XML-схемах (XSD), определенных организацией W3C (www.w3c.org). Контракты данных публикуются в метаданных службы, позволяя клиентам преобразовывать нейтральное, независимое от технологий представление типов данных в подходящие представления. Дополнительные сведения о данных и контрактах см. в следующих статьях:

В предлагаемом решении создан проект DataContracts, в котором определены классы, определяющие сообщения-запросы и сообщения-ответы. Соответствующий код приводится ниже.

Класс CalculatorRequest

[Serializable]
[XmlType(TypeName = "CalculatorRequest", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorRequest", 
         Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
         IsNullable = false)]
[DataContract(Name = "CalculatorRequest", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorRequest
{
    #region Private Fields
    private OperationList operationList;
    #endregion

    #region Public Constructors
    public CalculatorRequest()
    {
        operationList = new OperationList();
    }

    public CalculatorRequest(OperationList operationList)
    {
        this.operationList = operationList;
    }
    #endregion

    #region Public Properties
    [XmlArrayItem("Operation", Type=typeof(Operation), IsNullable = false)]
    [DataMember(Order = 1)]
    public OperationList Operations
    {
        get
        {
            return operationList;
        }
        set
        {
            operationList = value;
        }
    } 
    #endregion
}

[CollectionDataContract(Name = "OperationList", 
                        Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
                        ItemName = "Operation")]
public class OperationList : List<Operation>
{
}

[Serializable]
[XmlType(TypeName = "Operation", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Operation", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Operation
{
    #region Private Fields
    private string op;
    private double operand1;
    private double operand2;
    #endregion

    #region Public Constructors
    public Operation()
    {
    }

    public Operation(string op,
                        double operand1,
                        double operand2)
    {
        this.op = op;
        this.operand1 = operand1;
        this.operand2 = operand2;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Operator
    {
        get
        {
            return op;
        }
        set
        {
            op = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public double Operand1
    {
        get
        {
            return operand1;
        }
        set
        {
            operand1 = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 3)]
    public double Operand2
    {
        get
        {
            return operand2;
        }
        set
        {
            operand2 = value;
        }
    } 
    #endregion
}

Класс CalculatorResponse

[Serializable]
[XmlType(TypeName = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorResponse", 
         Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
         IsNullable = false)]
[DataContract(Name = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorResponse
{
    #region Private Fields
    private string status;
    private ResultList resultList;
    #endregion

    #region Public Constructors
    public CalculatorResponse()
    {
        status = default(string);
        resultList = new ResultList();
    }

    public CalculatorResponse(string status)
    {
        this.status = status;
        resultList = new ResultList();
    }

    public CalculatorResponse(string status, ResultList resultList)
    {
        this.status = status;
        this.resultList = resultList;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Status 
    {
        get 
        {
            return status;
        }
        set 
        {
            status = value;
        }
    }

    [XmlArrayItem("Result", Type=typeof(Result), IsNullable=false)]
    [DataMember(Order = 2)]
    public ResultList Results 
    {
        get 
        {
            return resultList;
        }
        set 
        {
            resultList = value;
        }
    }
    #endregion
}

[CollectionDataContract(Name = "ResultList", 
                        Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
                        ItemName = "Result")]
public class ResultList : List<Result>
{
}

[Serializable]
[XmlType(TypeName = "Result", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Result", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Result
{
    #region Private Fields
    private double value;
    private string error; 
    #endregion

    #region Public Constructors
    public Result()
    {
        value = default(double);
        error = default(string);
    }

    public Result(double value, string error)
    {
        this.value = value;
        this.error = error;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public double Value
    {
        get
        {
            return value;
        }
        set
        {
            this.value = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public string Error
    {
        get
        {
            return error;
        }
        set
        {
            error = value;
        }
    } 
    #endregion
}

Следующий шаг заключается в определении Контрактов службы, используемых клиентским приложением для обмена сообщениями с Service Bus. С этой целью я создал в своем решении новый проект под названием ServiceContracts и определил два интерфейса контракта службы, используемых клиентским приложением, для отправки и приема сообщений (соответственно) от сущностей обмена сообщениями Service Bus. На самом деле были созданы две разные версии контракта службы, используемого для получения сообщений-ответов.

  • Интерфейс ICalculatorResponse предназначен для получения не основанных на сеансе сообщений-ответов из очереди или подписки.

  • Интерфейс ICalculatorResponseSessionful наследует от контракта службы ICalculatorResponse и отмечен атрибутом ServiceContract(SessionMode = SessionMode.Required)]. Этот контракт службы предназначен для получения не основанных на сеансе сообщений-ответов из очереди или подписки.

Обратите внимание, что методы, определенные любыми контрактами, являются односторонними. Это условие является обязательным.

Интерфейсы ICalculatorRequest, ICalculatorResponse, ICalculatorResponseSessionful

#region Using Directives
using System.ServiceModel;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBus.MessageContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.ServiceContracts
{
    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus",
                     ConfigurationName = "ICalculatorRequest", 
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorRequest
    {
        [OperationContract(Action = "SendRequest", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void SendRequest(CalculatorRequestMessage calculatorRequestMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse",
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorResponse
    {
        [OperationContract(Action = "ReceiveResponse", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void ReceiveResponse(CalculatorResponseMessage calculatorResponseMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse"
                     SessionMode = SessionMode.Required)]
    public interface ICalculatorResponseSessionful : ICalculatorResponse
    {
    }
}

Теперь можно взглянуть на код клиентского приложения.

Поскольку приложение Windows Forms обменивается сообщениями с базовой службой рабочих процессов WCF асинхронно через сущности обмена сообщениями на основе Service Bus, приложение работает от имени клиента и службы одновременно. Приложение Windows Forms использует WCF и NetMessagingBinding для выполнения следующих действий.

  1. Отправка сообщений-запросов в очередь requestqueue.

  2. Отправка сообщений-запросов в раздел requesttopic.

  3. Получение сообщений-ответов из очереди responsequeue.

  4. Получение сообщений-ответов из подписки ItalyMilan раздела responsetopic.

Рассмотрим файл конфигурации клиентского приложения, который играет центральную роль в определении конечных точек клиента и службы WCF, используемых для сообщения с шиной Service Bus.

Файл App.Config клиентского приложения


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging"  
                     switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" 
                      name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\ServiceBusWFClient.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, 
  19:                   Version=2.0.0.0, Culture=neutral,
                        PublicKeyToken=b77a5c561934e089"
  20:             name="ServiceModelMessageLoggingListener"
  21:             traceOutputOptions="Timestamp">
  22:          <filter type="" />
  23:        </add>
  24:      </sharedListeners>
  25:      <trace autoflush="true" indentsize="4">
  26:        <listeners>
  27:          <clear/>
  28:          <add name="LogTraceListener"
  29:               type="Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Client.LogTraceListener, Client"
  30:               initializeData="" />
  31:        </listeners>
  32:      </trace>
  33:    </system.diagnostics>
  34:    <startup>
  35:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  36:    </startup>
  37:    <system.serviceModel>
  38:      <diagnostics>
  39:        <messageLogging logEntireMessage="true"
  40:                        logMalformedMessages="false"
  41:                        logMessagesAtServiceLevel="true"
  42:                        logMessagesAtTransportLevel="false" />
  43:      </diagnostics>
  44:      <behaviors>
  45:        <endpointBehaviors>
  46:          <behavior name="securityBehavior">
  47:            <transportClientEndpointBehavior>
  48:              <tokenProvider>
  49:                <sharedSecret issuerName="owner"
  50:                              issuerSecret="SHARED-SECRET" />
  51:              </tokenProvider>
  52:            </transportClientEndpointBehavior>
  53:          </behavior>
  54:        </endpointBehaviors>
  55:      </behaviors>
  56:      <bindings>
  57:        <netMessagingBinding>
  58:          <binding name="netMessagingBinding"
  59:                   sendTimeout="00:03:00"
  60:                   receiveTimeout="00:03:00"
  61:                   openTimeout="00:03:00"
  62:                   closeTimeout="00:03:00"
  63:                   sessionIdleTimeout="00:01:00"
  64:                   prefetchCount="-1">
  65:            <transportSettings batchFlushInterval="00:00:01" />
  66:          </binding>
  67:        </netMessagingBinding>
  68:      </bindings>
  69:      <client>
  70:        <!-- Invoke WF Service via Service Bus Queue -->
  71: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue"
  72:                  behaviorConfiguration="securityBehavior" 
  73:                  binding="netMessagingBinding"
  74:                  bindingConfiguration="netMessagingBinding" 
  75:                  contract="ICalculatorRequest"
  76:                  name="requestQueueClientEndpoint" />
  77:        <!-- Invoke WF Service via Service Bus Topic -->
  78: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  79:                  behaviorConfiguration="securityBehavior"
  80:                  binding="netMessagingBinding"
  81:                  bindingConfiguration="netMessagingBinding"
  82:                  contract="ICalculatorRequest"
  83:                  name="requestTopicClientEndpoint" />
  84:      </client>
  85:      <services>
  86:        <service name="ResponseHandlerService">
  87: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue"
  88:                    behaviorConfiguration="securityBehavior"
  89:                    binding="netMessagingBinding"
  90:                    bindingConfiguration="netMessagingBinding"
  91:                    name="responseQueueServiceEndpoint"
  92:                    contract="ICalculatorResponse" />
  93: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
  94:                    listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
  95:                    behaviorConfiguration="securityBehavior"
  96:                    binding="netMessagingBinding"
  97:                    bindingConfiguration="netMessagingBinding"
  98:                    name="responseSubscriptionServiceEndpoint"
  99:                    contract="ICalculatorResponse" />
 100:        </service>
 101:      </services>
 102:    </system.serviceModel>
 103:  </configuration>

Ниже приведено краткое описание основных элементов и разделов файла конфигурации.

  • В строках [3–32] дается определение пользовательского прослушивателя трассировки LogTraceListener, который используется ResponseHandlerService для записи сообщений-ответов в журнал управления приложением Windows Forms.

  • В строках [34–36] раздел запуска указывает, какие версии среды CLR поддерживает приложение.

  • Строки [46–53] содержат определение класса securityBehavior, который используется конечной точкой клиента и службы для проверки подлинности на стороне службы управления доступом. В частности, метод TransportClientEndpointBehavior используется для определения общих закрытых учетных данных. Дополнительные сведения о том, как получить учетные данные на портале управления Azure, см. ниже.

  • Строки [57-67] относятся к конфигурации привязки NetMessagingBinding, которая используется конечными точками клиента и службы для обмена сообщениями с шиной Service Bus.

  • Строки [71–76] содержат определение конечной точки requestQueueClientEndpoint, которая используется приложением для отправки сообщений-запросов в очередь requestqueue. Параметр address клиентской конечной точки объединяет URL-адрес пространства имен службы и имя очереди.

  • Строки [78–83] содержат определение конечной точки requestTopicClientEndpoint, используемой приложением для отправки сообщений-запросов в раздел requesttopic. Параметр address клиентской конечной точки объединяет URL-адрес пространства имен службы и имя раздела.

  • Строки [87–92] содержат определение конечной точки responseQueueServiceEndpoint, которая используется приложением для получения сообщений-ответов из очереди responsequeue. Параметр address конечной точки службы объединяет URL-адрес пространства имен службы и имя очереди.

  • Строки [93–99] содержат определение конечной точки responseSubscriptionServiceEndpoint, которая используется приложением для получения сообщений-ответов из подписки ItalyMilan для раздела responsetopic. При определении конечной точки службы WCF, использующей привязку NetMessagingBinding для получения сообщений из подписки, необходимо выполнить следующие шаги (дополнительные сведения см. в поле ниже).

    • В качестве значения для атрибута address следует указать URL-адрес раздела, к которому относится подписка. URL-адрес раздела объединяет URL-адрес пространства имен службы и имя раздела.

    • В качестве значения атрибута listenUri следует указать URL-адрес подписки. URL-адрес подписки объединяет URL-адрес раздела, строку /Subscriptions/ и имя подписки.

    • Присвойте значение Explicit атрибуту listenUriMode. Значение по умолчанию для свойства listenUriModeExplicit, поэтому это действие необязательно.

noteПримечание
При настройке конечной точки службы WCF для получения сообщений из очереди или подписки на основе сеанса контракту службы необходима поддержка сеансов. Таким образом, в этом примере при настройке для получения сообщений из очереди и подписки конечных точек responseQueueServiceEndpoint и responseSubscriptionServiceEndpoint нужно заменить контракт службы ICalculatorResponse на интерфейс контракта на основе сеанса ICalculatorResponseSessionful. Дополнительные сведения см. далее в разделе Контракты службы в этой статье.

noteПримечание
Шина обслуживания поддерживает три разных типа схем учетных данных – SAML, Shared Secret, и Simple Web Token, однако данная версия Service Bus Explorer поддерживает только учетные данные Shared Secret. Тем не менее предлагаемый код можно легко расширить, включив в него поддержку других схем учетных данных. Чтобы получить секретный ключ издателя на портале управления Azure, нажмите кнопку Просмотр, предварительно выбрав определенное пространство имен в разделе Service Bus.

В результате откроется модальное диалоговое окно, в котором можно получить ключ, щелкнув выделенную красным цветом команду Копировать в буфер обмена.

noteПримечание
По умолчанию имя Издатель по умолчанию всегда имеет значение owner.

noteПримечание
При определении конечной точки службы WCF, которая использует привязку NetMessagingBinding для получения сообщений из подписки, часто допускают ошибку и присваивают URL-адрес подписки атрибуту адреса конечной точки службы (как показано в конфигурации ниже). В результате этого в процессе выполнения возникает исключение FaultException.

The message with To 'sb://NAMESPACE.servicebus.windows.net/responsetopic' cannot be processed at 
the receiver, due to an AddressFilter mismatch at the EndpointDispatcher. 
Check that the sender and receiver's EndpointAddresses agree."}

Неверная конфигурация


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="responseSubscriptionServiceEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Ошибка возникает потому, что заголовок адреса To для сообщения содержит адрес раздела, а не адрес подписки:


<:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing">
  <s:Header>
    <a:Action s:mustUnderstand="1">ReceiveResponse</a:Action>
    <a:MessageID>urn:uuid:64cb0b06-1622-4920-a035-c27b610cfcaf</a:MessageID>
    <a:To s:mustUnderstand="1">sb://NAMESPACE.servicebus.windows.net/responsetopic</a:To>
  </s:Header>
  <s:Body>... stream ...</s:Body>
</s:Envelope>

Чтобы правильно настроить конечную точку службы на получение сообщения из подписки, необходимо выполнить следующие действия.

  • В качестве значения для атрибута address следует указать URL-адрес раздела, к которому относится подписка. URL-адрес раздела объединяет URL-адрес пространства имен службы и имя раздела.

  • В качестве значения атрибута listenUri следует указать URL-адрес подписки. URL-адрес подписки объединяет URL-адрес раздела, строку /Subscriptions/ и имя подписки.

  • Присвойте значение Explicit атрибуту listenUriMode. Значение по умолчанию для свойства listenUriModeExplicit, поэтому это действие необязательно.

Описание атрибутов address, listenUri и listenUriMode см. на следующей странице в библиотеке MSDN.

Правильная конфигурация


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
                  listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="subscriptionEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Чтобы выполнить ту же задачу с помощью API, необходимо правильно задать значения для свойств Address, ListenUri и ListenUriMode в экземпляре ServiceEndpoint в соответствии с указаниями.

В следующей таблице приведен код, используемый клиентским приложением для запуска службы ResponseHandlerService, которая позволяет считывать сообщения-ответы из очереди responsequeue и подписки ItalyMilan раздела responsetopic. Код службы будет рассмотрен в следующем разделе.


private void StartServiceHost()
{
    try
    {
        // Creating the service host object as defined in config
        var serviceHost = new ServiceHost(typeof(ResponseHandlerService));
                
        // Add ErrorServiceBehavior for handling errors encounter by servicehost during execution.
        serviceHost.Description.Behaviors.Add(new ErrorServiceBehavior());


        foreach (var serviceEndpoint in serviceHost.Description.Endpoints)
        {
            if (serviceEndpoint.Name == "responseQueueServiceEndpoint")
            {
                responseQueueUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnQueue,
                                        serviceEndpoint.Address.Uri.AbsoluteUri));
            }
            if (serviceEndpoint.Name == "responseSubscriptionServiceEndpoint")
            {
                responseTopicUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnSubscription,
                                            serviceEndpoint.ListenUri.AbsoluteUri));
            }
        }

        // Start the service host
        serviceHost.Open();
        WriteToLog(ServiceHostSuccessfullyOpened);
    }
    catch (Exception ex)
    {
        mainForm.HandleException(ex);
    }
}

На следующем рисунке показан пользовательский интерфейс клиентского приложения.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба11

Переключатели в группе Метод запроса позволяют отправить сообщение-запрос в очередь requestqueue или раздел requesttopic, а переключатели в группе Метод ответа — получить ответ из очереди responsequeue или подписки ItalyMilan раздела responsetopic. Чтобы связать выбранные элементы с базовым приложением BizTalk, приложение использует объект BrokeredMessageProperty для присваивания значения закрытых полей responseQueueUri и responseTopicUri свойству ReplyTo. Следующая таблица содержит код метода, используемого клиентским приложением для отправки сообщения шине обслуживания. Для вашего удобства в код были добавлены комментарии, упрощающие его понимание.


private void SendRequestMessageUsingWCF(string endpointConfigurationName)
{
    try
    {
        if (string.IsNullOrEmpty(endpointConfigurationName))
        {
            WriteToLog(EndpointConfigurationNameCannotBeNull);
            return;
        }

        // Set the wait cursor
        Cursor = Cursors.WaitCursor;

        // Make sure that the request message contains at least an operation
        if (operationList == null ||
            operationList.Count == 0)
        {
            WriteToLog(OperationListCannotBeNull);
            return;
        }

        // Create warning collection
        var warningCollection = new List<string>();

        // Create request message
        var calculatorRequest = new CalculatorRequest(operationList);
        var calculatorRequestMessage = new CalculatorRequestMessage(calculatorRequest);

        // Create the channel factory for the currennt client endpoint
        // and cache it in the channelFactoryDictionary
        if (!channelFactoryDictionary.ContainsKey(endpointConfigurationName))
        {
            channelFactoryDictionary[endpointConfigurationName] = 
                new ChannelFactory<ICalculatorRequest>(endpointConfigurationName);
        }

        // Create the channel for the currennt client endpoint
        // and cache it in the channelDictionary
        if (!channelDictionary.ContainsKey(endpointConfigurationName))
        {
            channelDictionary[endpointConfigurationName] = 
                channelFactoryDictionary[endpointConfigurationName].CreateChannel();
        }

        // Use the OperationContextScope to create a block within which to access the current OperationScope
        using (new OperationContextScope((IContextChannel)channelDictionary[endpointConfigurationName]))
        {
            // Create a new BrokeredMessageProperty object
            var brokeredMessageProperty = new BrokeredMessageProperty();

            // Read the user defined properties and add them to the  
            // Properties collection of the BrokeredMessageProperty object
            foreach (var e in propertiesBindingSource.Cast<PropertyInfo>())
            {
                try
                {
                    e.Key = e.Key.Trim();
                    if (e.Type != StringType && e.Value == null)
                    {
                        warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                                                            PropertyValueCannotBeNull, e.Key));
                    }
                    else
                    {
                        if (brokeredMessageProperty.Properties.ContainsKey(e.Key))
                        {
                            brokeredMessageProperty.Properties[e.Key] = 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value);
                        }
                        else
                        {
                            brokeredMessageProperty.Properties.Add(e.Key, 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value));
                        }
                    }
                }
                catch (Exception ex)
                {
                    warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                        PropertyConversionError, e.Key, ex.Message));
                }
            }

            // if the warning collection contains at least one or more items,
            // write them to the log and return immediately
            StringBuilder builder;
            if (warningCollection.Count > 0)
            {
                builder = new StringBuilder(WarningHeader);
                var warnings = warningCollection.ToArray<string>();
                for (var i = 0; i < warningCollection.Count; i++)
                {
                    builder.AppendFormat(WarningFormat, warnings[i]);
                }
                mainForm.WriteToLog(builder.ToString());
                return;
            }

            // Set the BrokeredMessageProperty properties
            brokeredMessageProperty.Label = txtLabel.Text;
            brokeredMessageProperty.MessageId = Guid.NewGuid().ToString();
            brokeredMessageProperty.SessionId = sessionId;
            brokeredMessageProperty.ReplyToSessionId = sessionId;
            brokeredMessageProperty.ReplyTo = responseQueueRadioButton.Checked
                                                ? responseQueueUri
                                                : responseTopicUri;
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, 
                                                                    brokeredMessageProperty);
                    
            // Send the request message to the requestqueue or requesttopic
            var stopwatch = new Stopwatch();
            try
            {
                stopwatch.Start();
                channelDictionary[endpointConfigurationName].SendRequest(calculatorRequestMessage);
            }
            catch (CommunicationException ex)
            {
                if (channelFactoryDictionary[endpointConfigurationName] != null)
                {
                    channelFactoryDictionary[endpointConfigurationName].Abort();
                    channelFactoryDictionary.Remove(endpointConfigurationName);
                    channelDictionary.Remove(endpointConfigurationName);
                }
                HandleException(ex);
            }
            catch (Exception ex)
            {
                if (channelFactoryDictionary[endpointConfigurationName] != null)
                {
                    channelFactoryDictionary[endpointConfigurationName].Abort();
                    channelFactoryDictionary.Remove(endpointConfigurationName);
                    channelDictionary.Remove(endpointConfigurationName);
                }
                HandleException(ex);
            }
            finally
            {
                stopwatch.Stop();
            }
            // Log the request message and its properties
            builder = new StringBuilder();
            builder.AppendLine(string.Format(CultureInfo.CurrentCulture,
                    MessageSuccessfullySent,
                    channelFactoryDictionary[endpointConfigurationName].Endpoint.Address.Uri.AbsoluteUri,
                    brokeredMessageProperty.MessageId,
                    brokeredMessageProperty.SessionId,
                    brokeredMessageProperty.Label,
                    stopwatch.ElapsedMilliseconds));
            builder.AppendLine(PayloadFormat);
            for (var i = 0; i < calculatorRequest.Operations.Count; i++)
            {
                builder.AppendLine(string.Format(RequestFormat,
                                                    i + 1,
                                                    calculatorRequest.Operations[i].Operand1,
                                                    calculatorRequest.Operations[i].Operator,
                                                    calculatorRequest.Operations[i].Operand2));
            }
            builder.AppendLine(SentMessagePropertiesHeader);
            foreach (var p in brokeredMessageProperty.Properties)
            {
                builder.AppendLine(string.Format(MessagePropertyFormat,
                                                    p.Key,
                                                    p.Value));
            }
            var traceMessage = builder.ToString();
            WriteToLog(traceMessage.Substring(0, traceMessage.Length - 1));
        }
    }
    catch (Exception ex)
    {
        // Handle the exception
        HandleException(ex);
    }
    finally
    {
        // Restoire the defaulf cursor
        Cursor = Cursors.Default;
    }
}

Следующая таблица содержит код службы WCF, которая используется клиентским приложением для извлечения и записи в журнал сообщений-ответов из очереди responsequeue и подписки ItalyMilan раздела responsetopic. Для этого служба обеспечивает две разные конечные точки, каждая из которых использует привязку NetMessagingBinding и получает сообщения из одной из двух очередей. На самом деле каждую подписку можно рассматривать как виртуальную очередь, получающую копии сообщений, опубликованных в разделе, к которому относится подписка. В следующей таблице показан код класса ResponseHandlerService. Как видите, служба получает свойство BrokeredMessageProperty из коллекции Properties входящего сообщения WCF и производит через этот объект доступ к свойствам сообщения-ответа. Поскольку в контракте службы ICalculatorResponse метод ReceiveResponse имеет заданный параметр [ReceiveContextEnabled(ManualControl = true)], метод службы должен явным образом сообщить о получении. Для этого служба должна явным образом вызывать метод ReceiveContext.Complete и зафиксировать операцию получения. На самом деле, как уже говорилось в начале статьи, если свойство ManualControl имеет значение true, то сообщение, полученное из канала, доставляется на операцию службы с блокировкой. Приложение службы должно вызвать либо метод Complete(TimeSpan), либо Abandon(TimeSpan), чтобы просигнализировать о завершении получения сообщения. Если ни один из этих методов не вызван, то блокировка сообщения будет сохраняться до тех пор, пока не истечет время ожидания. После снятия блокировки — вызовом метода Abandon(TimeSpan) или по истечении времени ожидания — сообщение будет отправлено повторно из канала в службу. После вызова метода Complete(TimeSpan) сообщение будет отмечено как успешно полученное.

Класс ResponseHandlerService


      [ServiceBehavir(Namespace = "http://windwsazure.cat.micrsft.cm/samples/servicebus", 
                 CnfiguratinName = "RespnseHandlerService")]
 public class RespnseHandlerService : ICalculatrRespnseSessinful
 {
     #regin Private Cnstants
     //***************************
     // Frmats
     //***************************
     private cnst string MessageSuccessfullyReceived = "Respnse Message Received:\n - EndpintUrl:[{0}]\n - CrrelatinId=[{1}]\n - SessinId=[{2}]\n - Label=[{3}]";
     private cnst string ReceivedMessagePrpertiesHeader = "Prperties:";
     private cnst string PayladFrmat = "Paylad:";
     private cnst string StatusFrmat = " - Status=[{0}]";
     private cnst string ResultFrmat = " - Result[{0}]: Value=[{1}] Errr=[{2}]";
     private cnst string MessagePrpertyFrmat = " - Key=[{0}] Value=[{1}]";
 
     //***************************
     // Cnstants
     //***************************
     private cnst string Empty = "EMPTY";
     #endregin
 
     #regin Public peratins
     [peratinBehavir]
     public vid ReceiveRespnse(CalculatrRespnse calculatrRespnse)
     {
         try
         {
             // Get the message prperties
             var incmingPrperties = peratinCntext.Current.IncmingMessagePrperties;
             if (calculatrRespnse != null)
             {
                 var brkeredMessagePrperty = incmingPrperties[BrkeredMessagePrperty.Name] as BrkeredMessagePrperty;
 
                 // Trace the respnse message
                 var builder = new StringBuilder();
                 if (brkeredMessagePrperty != null)
                     builder.AppendLine(string.Frmat(MessageSuccessfullyReceived, 
                                                         peratinCntext.Current.Channel.LcalAddress.Uri.AbsluteUri,
                                                         brkeredMessagePrperty.CrrelatinId ?? Empty,
                                                         brkeredMessagePrperty.SessinId ?? Empty,
                                                         brkeredMessagePrperty.Label ?? Empty));
                 builder.AppendLine(PayladFrmat);
                 builder.AppendLine(string.Frmat(StatusFrmat,
                                                     calculatrRespnse.Status));
                 if (calculatrRespnse.Results != null && 
                     calculatrRespnse.Results.Cunt > 0)
                 {
                     fr (int i = 0; i < calculatrRespnse.Results.Cunt; i++)
                     {
                         builder.AppendLine(string.Frmat(ResultFrmat, 
                                                             i + 1, 
                                                             calculatrRespnse.Results[i].Value,
                                                             calculatrRespnse.Results[i].Errr));
                     }
                 }
                 builder.AppendLine(ReceivedMessagePrpertiesHeader);
                 if (brkeredMessagePrperty != null)
                 {
                     freach (var prperty in brkeredMessagePrperty.Prperties)
                     {
                         builder.AppendLine(string.Frmat(MessagePrpertyFrmat,
                                                             prperty.Key,
                                                             prperty.Value));
                     }
                 }
                 var traceMessage = builder.TString();
                 Trace.WriteLine(traceMessage.Substring(0, traceMessage.Length - 1));
             }
             //Cmplete the Message
             ReceiveCntext receiveCntext;
             if (ReceiveCntext.TryGet(incmingPrperties, ut receiveCntext))
             {
                 receiveCntext.Cmplete(TimeSpan.FrmSecnds(10.0d));
             }
             else
             {
                 thrw new InvalidperatinExceptin("Receiver is in peek lck mde but receive cntext is nt available!");
             }
         }
         catch (Exceptin ex)
         {
             Trace.WriteLine(ex.Message);
         }
     } 
     #endregin
 }

Служба WCF Workflow Service предоставляет рабочую среду для разработки надежных операций или служб. Служба Workflow Service реализуется с помощью WF-действий, которые могут использовать WCF для отправки и получения данных. Более подробное рассмотрение построения службы WCF Workflow Service выходит за рамки этой статьи. Дополнительные сведения о службах WCF Workflow Services см. в следующих статьях:

В версии WF 4.0 были введены действия обмена сообщениями, которые позволяют разработчикам обеспечивать или использовать службы WCF простым и гибким образом. В частности, действия обмена сообщениями позволяют рабочим процессам отправлять данные в другие системы (Send, SendReply) и получать данные из других систем (Receive, ReceiveReply) с помощью WCF. Однако эти действия скрывают множество процессов, выполняемых в WCF. В частности, действия обмена сообщениями не обеспечивают доступ к текущему контексту OperationContext, который может быть использован для выполнения следующей операции.

  • На стороне отправителя контекст OperationContext позволяет включить дополнительные заголовки сообщения в конверт SOAP или добавить свойства сообщения в исходящее сообщение.

  • На стороне получателя контекст OperationContext позволяет извлекать свойства сообщения и информацию безопасности из входящего сообщения.

Как было показано в первой части статьи, когда приложение использует WCF и привязку NetMessagingBinding для отправки сообщения в очередь или раздел, сообщение упаковывается в конверт SOAP и шифруется. Чтобы задать определенные свойства BrokeredMessage, создайте объект BrokeredMessageProperty, задайте свойства и добавьте этот объект в коллекцию Propertiesсообщения WCF. Таким образом, чтобы извлечь BrokeredMessageProperty из входящего сообщения или добавить BrokeredMessageProperty в коллекцию Properties исходящего WCF-сообщения, нужно расширить функциональность, обеспечиваемую действиями обмена сообщениями по умолчанию. К счастью, WF 4.0 позволяет расширить возможности времени выполнения обмена сообщениями с помощью методов IReceiveMessageCallback и ISendMessageCallback. В частности:

  • Интерфейс IReceiveMessageCallback реализует метод обратного вызова, который выполняется при получении сообщения службы действием Receive.

  • Интерфейс ISendMessageCallback.interface реализует метод обратного вызова, который выполняется непосредственно перед отправкой сообщения действием Send.

В демонстрационной версии дополнительные возможности были использованы для создания пользовательского действия NativeActivity с именем BrokeredMessagePropertyActivity, которое позволяет:

  • получить BrokeredMessageProperty из свойств входящего WCF-сообщения;

  • задать BrokeredMessageProperty для исходящего WCF-сообщения.

Роман Кисс в своей статье придерживается того же подхода и создает более сложное действие, которое обрабатывает свойство для каждого свойства класса BrokeredMessageProperty. Настоятельно рекомендуем ознакомиться с его статьей, поскольку в ней рассматривается альтернативный способ реализации решения, которое предлагается здесь.

Код действия BrokeredMessagePropertyActivity приводится в следующей таблице.

Класс BrokeredMessagePropertyActivity


[Designer(typeof(BrokeredMessagePropertyActivityDesigner))]
public class BrokeredMessagePropertyActivity : NativeActivity
{
    #region Public Properties
    [Browsable(false)]
    public Activity Body { get; set; }
    public InOutArgument<BrokeredMessageProperty> BrokeredMessageProperty { get; set; }
    #endregion

    #region NativeActivity Overriden Methods
    protected override void CacheMetadata(NativeActivityMetadata metadata)
    {
        metadata.AddChild(Body);
        base.CacheMetadata(metadata);
    }

    protected override void Execute(NativeActivityContext context)
    {
        // Add the BrokeredMessagePropertyMessageCallback implementation as an Execution property 
        var value = context.GetValue(BrokeredMessageProperty);
        context.Properties.Add(typeof(BrokeredMessagePropertyCallback).Name,
                                new BrokeredMessagePropertyCallback(value));
        context.ScheduleActivity(Body, OnBodyCompleted);
    }

    private void OnBodyCompleted(NativeActivityContext context, ActivityInstance instance)
    {
        // Sets the value of the BrokeredMessageProperty argument
        var callback = context.Properties.Find(typeof(BrokeredMessagePropertyCallback).Name) as BrokeredMessagePropertyCallback;
        if (callback != null)
        {
            context.SetValue(BrokeredMessageProperty, callback.BrokeredMessageProperty);
        }
    } 
    #endregion  
}

Класс BrokeredMessagePropertyCallback


[DataContract]
public class BrokeredMessagePropertyCallback : IReceiveMessageCallback, ISendMessageCallback
{
    #region Public Properties
    [DataMember]
    public BrokeredMessageProperty BrokeredMessageProperty { get; set; } 
    #endregion

    #region Public Constructors
    public BrokeredMessagePropertyCallback(BrokeredMessageProperty property)
    {
        BrokeredMessageProperty = property;
    }
    #endregion

    #region IReceiveMessageCallback Methods
    public void OnReceiveMessage(OperationContext operationContext, ExecutionProperties activityExecutionProperties)
    {
        try
        {
            // Get the BrokeredMessageProperty from an inbound message
            var incomingMessageProperties = operationContext.IncomingMessageProperties;
            BrokeredMessageProperty = incomingMessageProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    } 
    #endregion

    #region ISendMessageCallback Methods
    public void OnSendMessage(OperationContext operationContext)
    {
        // Set the BrokeredMessageProperty for an outbound message 
        if (BrokeredMessageProperty != null)
        {
            operationContext.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, BrokeredMessageProperty);
        }
    } 
    #endregion
}

Действие BrokeredMessagePropertyActivity обертывает действие Receive или Send. Автоматически обеспечивается доступ к классу BrokeredMessageProperty, который позволяет считывать и записывать явные и определяемые пользователем свойства BrokeredMessage.

Чтобы разработать входящий запрос и сформировать ответ, я создал пользовательское действие CalculatorActivity, код которого приводится в следующей таблице.

Класс CalculatorActivity


#region Using Directives
using System;
using System.Activities;
using System.ComponentModel;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.DataContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowActivities
{
    /// <summary>
    /// This class can be used to process a calculator request.
    /// </summary>
    [Designer(typeof(CalculatorActivityDesigner))]
    public sealed class CalculatorActivity : CodeActivity
    {
        #region Private Constants
        private const string Empty = "Empty";
        private const string MessageId = "MessageId";
        private const string SessionId = "SessionId";
        private const string CorrelationId = "CorrelationId";
        private const string Label = "Label";
        private const string ReplyTo = "ReplyTo";
        private const string Source = "Source";
        private const string CalculatoService = "CalculatoService";
        private const string Ok = "ok";
        private const string Failed = "Failed";
        private const string OperationsHeader = "Operations:";
        private const string OperationFormat = "{0} {1} {2} = {3}";
        private const string RequestMessagePropertiesHeader = "Request Message Properties:";
        private const string ResponseMessagePropertiesHeader = "Response Message Properties:";
        private const string RequestMessageUserDefinedPropertiesHeader = "Request Message User-Defined Properties:";
        private const string ResponseMessageUserDefinedPropertiesHeader = "Response Message User-Defined Properties:";
        private const string PropertyFormat = "Key=[{0}] Value=[{1}]";
        private const string OperationUnknownErrorMessageFormat = "The operation failed because the operator {0} is unknown.";

        #endregion

        #region Activity Arguments
        [DefaultValue(null)]
        public InArgument<CalculatorRequest> CalculatorRequest { get; set; }
        [DefaultValue(null)]
        public InArgument<BrokeredMessageProperty> InboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<BrokeredMessageProperty> OutboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<CalculatorResponse> CalculatorResponse { get; set; }
        #endregion

        #region Private Fields
        private readonly string line = new string('-', 79);
        #endregion

        #region Protected Methods
        /// <summary>
        /// Processes a calculator calculatorRequest.
        /// </summary>
        /// <param name="context">The execution context under which the activity executes.</param>
        protected override void Execute(CodeActivityContext context)
        {
            // Obtain the runtime value of the CalculatorRequest input arguments
            var calculatorRequest = context.GetValue(CalculatorRequest);
            var calculatorResponse = new CalculatorResponse();
            if (calculatorRequest == null)
            {
                context.SetValue(CalculatorResponse, calculatorResponse);
                return;
            }
            // Print the properties of the inbound BrokeredMessageProperty
            var brokeredMessageProperty = context.GetValue(InboundBrokeredMessageProperty);
            if (brokeredMessageProperty != null)
            {
                Console.WriteLine(RequestMessagePropertiesHeader);
                Console.WriteLine(line);
                Console.WriteLine(string.Format(PropertyFormat, 
                                                MessageId, 
                                      brokeredMessageProperty.MessageId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                SessionId, 
                                      brokeredMessageProperty.SessionId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                ReplyTo, 
                                      brokeredMessageProperty.ReplyTo ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                Label, 
                                      brokeredMessageProperty.Label ?? Empty));
                Console.WriteLine(line);
                if (brokeredMessageProperty.Properties.Count > 0)
                {
                    Console.WriteLine(RequestMessageUserDefinedPropertiesHeader);
                    Console.WriteLine(line);
                    foreach (var property in brokeredMessageProperty.Properties)
                    {
                        Console.WriteLine(string.Format(PropertyFormat, 
                                                        property.Key, 
                                                        property.Value));
                    }
                    Console.WriteLine(line);
                }
            }

            // Process the request message and create a response message
            string error = null;
            calculatorResponse.Status = Ok;
            if (calculatorRequest.Operations.Count > 0)
            {
                Console.WriteLine(OperationsHeader);
                Console.WriteLine(line);
                foreach (var operation in calculatorRequest.Operations)
                {
                    double value = 0;
                    var succeeded = true;
                    switch (operation.Operator)
                    {
                        case "+":
                            value = operation.Operand1 + operation.Operand2;
                            break;
                        case "-":
                            value = operation.Operand1 - operation.Operand2;
                            break;
                        case "*":
                        case "x":
                            value = operation.Operand1 * operation.Operand2;
                            break;
                        case "/":
                        case "\\":
                        case ":":
                            value = operation.Operand1 / operation.Operand2;
                            break;
                        default:
                            error = string.Format(OperationUnknownErrorMessageFormat,
                                                  operation.Operator);
                            succeeded = false;
                            calculatorResponse.Status = Failed;
                            break;
                    }
                    Console.WriteLine(succeeded
                                          ? string.Format(OperationFormat, operation.Operand1, operation.Operator,
                                                          operation.Operand2, value)
                                          : error);
                    calculatorResponse.Results.Add(new Result(value, error));
                }
                Console.WriteLine(line);
            }
            context.SetValue(CalculatorResponse, calculatorResponse);

            // Create a new BrokeredMessageProperty for the reply message
            var replyBrokeredMessageProperty = new BrokeredMessageProperty
            {
               MessageId = Guid.NewGuid().ToString(),
               CorrelationId = brokeredMessageProperty != null ? 
                               brokeredMessageProperty.MessageId :
                               null,
               SessionId = brokeredMessageProperty != null ?
                           brokeredMessageProperty.ReplyToSessionId :
                           null,
               Label = brokeredMessageProperty != null ?
                       brokeredMessageProperty.Label :
                       null
            };
            if (brokeredMessageProperty != null)
            {
                foreach (var property in brokeredMessageProperty.Properties)
                {
                    replyBrokeredMessageProperty.Properties.Add(property);
                }
            }

            // Print the properties of the outbound BrokeredMessageProperty
            replyBrokeredMessageProperty.Properties.Add(Source, CalculatoService);
            Console.WriteLine(ResponseMessagePropertiesHeader);
            Console.WriteLine(line);
            Console.WriteLine(string.Format(PropertyFormat, 
                                            MessageId, 
                               replyBrokeredMessageProperty.MessageId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            CorrelationId, 
                               replyBrokeredMessageProperty.CorrelationId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            SessionId, 
                               replyBrokeredMessageProperty.SessionId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            ReplyTo, 
                               replyBrokeredMessageProperty.ReplyTo ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            Label, 
                               replyBrokeredMessageProperty.Label ?? Empty));
            Console.WriteLine(line);
            if (replyBrokeredMessageProperty.Properties.Count > 0)
            {
                Console.WriteLine(ResponseMessageUserDefinedPropertiesHeader);
                Console.WriteLine(line);
                foreach (var property in replyBrokeredMessageProperty.Properties)
                {
                    Console.WriteLine(string.Format(PropertyFormat, 
                                                    property.Key, 
                                                    property.Value));
                }
                Console.WriteLine(line);
            }

            // Set the outbound context property
            context.SetValue(OutboundBrokeredMessageProperty, 
                             replyBrokeredMessageProperty);
        }
        #endregion
    }
}

Это действие предоставляет два входных и два выходных аргумента.

  • CalculatorRequest — аргумент InArgument типа CalculatorRequest, который позволяет рабочему процессу передать запрос в действие.

  • InboundBrokeredMessageProperty — аргумент InArgument типа BrokeredMessageProperty, который позволяет передать в качестве входного параметра свойство BrokeredMessageProperty, извлеченное из сообщения-запроса WCF.

  • CalculatorResponse — аргумент OutArgument типа CalculatorResponse используется действием для возврата рабочему процессу ответа в качестве выходного параметра.

  • OutboundBrokeredMessageProperty — аргумент OutArgument используется действием для возврата исходящего свойства BrokeredMessageProperty рабочему процессу, который с помощью экземпляра BrokeredMessagePropertyActivity присвоит значение этого выходного параметра свойству BrokeredMessageProperty в сообщении-ответе WCF.

Если вкратце, то CalculatorActivity получает входящее сообщение и BrokeredMessageProperty в качестве входных аргументов, обрабатывает сообщение-запрос и формирует сообщение-ответ и исходящее свойство BrokeredMessageProperty. При запуске в консольном приложении действие отслеживает свойства входящего и исходящего свойств BrokeredMessageProperty в стандартном устройстве вывода.

В этом разделе будет рассмотрено, каким образом служба WCF Workflow Service связывается с клиентским приложением с помощью очередей и разделов на основе Service Bus. В демонстрационной версии служба WCF Workflow Service размещается в консольном приложении, однако решение можно изменить и запустить службу в локальном приложении на базе служб IIS или в роли Azure в облаке. В следующей таблице содержится код, используемый консольным приложением для инициализации и запуска объекта WorkflowServiceHost. В частности, локальная конечная точка HTTP, указанная в конструкторе объекта, позволяет извлечь код WSDL, формируемый службой WCF Workflow Service.

Класс Program


using System;
using System.Xaml;
using System.ServiceModel.Activities;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Service;

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowConsoleApplication
{

    class Program
    {
        static void Main(string[] args)
        {
            var settings = new XamlXmlReaderSettings()
                               {
                                   LocalAssembly = typeof(Program).Assembly
                               };
            var reader = new XamlXmlReader(@"..\..\CalculatorService.xamlx", settings); 
            var service = (WorkflowService) XamlServices.Load(reader);
            using (var host = new WorkflowServiceHost(service, new Uri("http://localhost:7571")))
            {
                host.Description.Behaviors.Add(new ErrorServiceBehavior());
                host.Open();
                Console.WriteLine("Press [ENTER] to exit");
                Console.ReadLine();
                host.Close();
            }
        }
    }
}

В следующей таблице содержится файл конфигурации консольного приложения, который играет ключевую роль в определении конечных точек клиента и службы WCF, используемых службой WCF Workflow Service для обмена сообщениями запроса и ответа с клиентским приложением через очереди и разделы Service Bus.

Файл конфигурации консольного приложения Application App.Config


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\WorkflowConsoleApplication.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
  19:             name="ServiceModelMessageLoggingListener"
  20:             traceOutputOptions="Timestamp">
  21:          <filter type="" />
  22:        </add>
  23:      </sharedListeners>
  24:    </system.diagnostics>
  25:    <startup>
  26:      <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  27:    </startup>
  28:    <system.serviceModel>
  29:      <diagnostics>
  30:        <messageLogging logEntireMessage="true"
  31:                        logMalformedMessages="true"
  32:                        logMessagesAtServiceLevel="true"
  33:                        logMessagesAtTransportLevel="true" />
  34:        </diagnostics>
  35:      <behaviors>
  36:        <serviceBehaviors>
  37:          <behavior>
  38:            <serviceMetadata httpGetEnabled="true" />
  39:            <serviceDebug includeExceptionDetailInFaults="true" />
  40:            <useRequestHeadersForMetadataAddress />
  41:            <workflowUnhandledException action="AbandonAndSuspend" />
  42:          </behavior>
  43:        </serviceBehaviors>
  44:        <endpointBehaviors>
  45:          <behavior name="securityBehavior">
  46:            <transportClientEndpointBehavior>
  47:              <tokenProvider>
  48:                <sharedSecret issuerName="owner" 
  49:                              issuerSecret="ISSUER_SECRET"/>
  50:              </tokenProvider>
  51:            </transportClientEndpointBehavior>
  52:          </behavior>
  53:        </endpointBehaviors>
  54:      </behaviors>
  55:      <bindings>
  56:        <netMessagingBinding>
  57:          <binding name="netMessagingBinding" 
  58:                   sendTimeout="00:03:00" 
  59:                   receiveTimeout="00:03:00" 
  60:                   openTimeout="00:03:00" 
  61:                   closeTimeout="00:03:00" 
  62:                   sessionIdleTimeout="00:01:00" 
  63:                   prefetchCount="-1">
  64:            <transportSettings batchFlushInterval="00:00:01"/>
  65:          </binding>
  66:        </netMessagingBinding>
  67:      </bindings>
  68:      <client>
  69: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue" 
  70:                  behaviorConfiguration="securityBehavior" 
  71:                  binding="netMessagingBinding" 
  72:                  bindingConfiguration="netMessagingBinding" 
  73:                  contract="ICalculatorResponse" 
  74:                  name="ResponseQueueClientEndpoint"/>
  75: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic" 
  76:                  behaviorConfiguration="securityBehavior" 
  77:                  binding="netMessagingBinding" 
  78:                  bindingConfiguration="netMessagingBinding" 
  79:                  contract="ICalculatorResponse" 
  80:                  name="ResponseTopicClientEndpoint"/>
  81:      </client>
  82:      <services>
  83:        <service name="CalculatorService">
  84: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue" 
  85:                    behaviorConfiguration="securityBehavior" 
  86:                    binding="netMessagingBinding" 
  87:                    bindingConfiguration="netMessagingBinding" 
  88:                    name="RequestQueueServiceEndpoint" 
  89:                    contract="ICalculatorRequest"/>
  90: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  91:           listenUri=
"sb://NAMESPACE.servicebus.windows.net/requesttopic/Subscriptions/ItalyMilan"
  92:                    behaviorConfiguration="securityBehavior"
  93:                    binding="netMessagingBinding"
  94:                    bindingConfiguration="netMessagingBinding"
  95:                    name="RequestTopicServiceEndpoint"
  96:                    contract="ICalculatorRequest" />
  97:        </service>
  98:      </services>
  99:    </system.serviceModel>
 100:    <startup>
 101:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
 102:    </startup>
 103:  </configuration>

Ниже приведено краткое описание основных элементов и разделов файла конфигурации. Обратите внимание, что определение конечных точек клиента и службы является двусторонним относительно конфигурации, используемой клиентским приложением.

  • Строки [3–33] включают трассировку, настраивают источники трассировки и задают уровни трассировки. В частности, раздел диагностики настроен для трассировки сообщений на уровне службы и транспорта и записи в файл журнала, который можно просмотреть с помощью средства просмотра трассировки службы (SvcTraceViewer.exe).

  • Строки [36–43] указывают поведение службы по умолчанию, которое используется службой WCF Workflow Service.

  • Строки [44–53] содержат определение класса securityBehavior, который используется конечной точкой клиента и службы для проверки подлинности на стороне службы управления доступом. В частности, метод TransportClientEndpointBehavior используется для определения общих закрытых учетных данных.

  • Строки [56–66] относятся к конфигурации привязки NetMessagingBinding, которая используется конечными точками клиента и службы для обмена сообщениями с шиной Service Bus.

  • Строки [69–74] содержат определение конечной точки ResponseQueueClientEndpoint, используемой службой WCF Workflow Service для отправки сообщений-ответов в очередь responsequeue. Параметр address клиентской конечной точки объединяет URL-адрес пространства имен службы и имя очереди.

  • Строки [78–83] содержат определение конечной точки ResponseTopicClientEndpoint, используемой службой WCF Workflow Service для отправки сообщений-ответов в раздел responsetopic. Параметр address клиентской конечной точки объединяет URL-адрес пространства имен службы и имя раздела.

  • Строки [83–89] содержат определение конечной точки RequestQueueServiceEndpoint, используемой службой WCF Workflow Service для получения сообщений-запросов из очереди requestqueue. Параметр address конечной точки службы объединяет URL-адрес пространства имен службы и имя очереди.

  • Строки [90–96] содержат определение конечной точки RequestTopicServiceEndpoint, используемой приложением для получения сообщений-запросов из подписки ItalyMilan для раздела requesttopic. При определении конечной точки службы WCF, которая использует привязку NetMessagingBinding для получения сообщений из подписки, следует выполнить следующие шаги.

    • В качестве значения для атрибута address следует указать URL-адрес раздела, к которому относится подписка. URL-адрес раздела объединяет URL-адрес пространства имен службы и имя раздела.

    • В качестве значения атрибута listenUri следует указать URL-адрес подписки. URL-адрес подписки объединяет URL-адрес раздела, строку /Subscriptions/ и имя подписки.

    • Присвойте значение Explicit атрибуту listenUriMode. Значение по умолчанию для свойства listenUriModeExplicit, поэтому это действие необязательно.

  • Строки [100–102] указывают, какую версию среды CLR поддерживает данное приложение.

Теперь рассмотрим шаги, необходимые для создания службы WCF Workflow Service, функции которой — получение запросов и отправка ответов через очереди и разделы Service Bus. В момент создания службы WCF Workflow Service рабочий процесс содержал только действие Sequence с действием Receive, за которым следовало действие SendReply (см. рисунок ниже).

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба12

Сначала я щелкнул по поверхности рабочего процесса и присвоил строку CalculatorService в качестве значения свойству ConfigurationName и Name службы WorkflowService, как показано на рисунке ниже. В частности, свойство ConfigurationName указывает имя конфигурации службы Workflow Service, его значение должно быть равным значению атрибута name элемента service в файле конфигурации.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба13

Затем я выбрал действие Sequential, щелкнул кнопку Переменные, чтобы открыть соответствующий редактор, и создал следующие переменные.

  • calculatorRequest — переменная типа CalculatorRequest, которая, как следует из ее имени, содержит текст сообщения-запроса. Его значение задается действием Receive, которое позволяет получить сообщение-запрос из очереди requestqueue или подписки ItalyMilan раздела requesttopic.

  • calculatorResponse — переменная типа CalculatorResponse, которая содержит текст сообщения-ответа, возвращаемого клиентским приложением. Значение задается пользовательским действием, которое обрабатывает запрос и формирует ответ.

  • inboundBrokeredMessageProperty — переменная, которая содержит BrokeredMessageProperty сообщения-ответа. Значение задается экземпляром действия BrokeredMessagePropertyActivity, которое обертывает действие Receive и считывает свойство BrokeredMessageProperty из свойств сообщения-запроса WCF.

  • outboundBrokeredMessageProperty — переменная, которая содержит BrokeredMessageProperty сообщения-ответа. Значение задается пользовательским действием, которое обрабатывает запрос и формирует ответ. BrokeredMessagePropertyActivity обертывает действие Send и присваивает значение переменной свойству BrokeredMessageProperty сообщения-ответа WCF.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба14

После этого я добавил действие TryCatch в рабочий процесс и обернул действие Receive экземпляром BrokeredMessagePropertyActivity, как показано на следующем рисунке.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба15

Далее я щелкнул BrokeredMessagePropertyActivity и присвоил переменную inboundBrokeredMessageProperty свойству BrokeredMessageProperty, как показано на рисунке ниже.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба16

Затем я выбрал действие Receive и настроил его свойства на получение сообщений-запросов из очереди requestqueue и подписки ItalyMilan раздела requesttopic с помощью конечных точек RequestQueueServiceEndpoint и RequestTopicServiceEndpoint, определенных в файле конфигурации.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба17

В частности, я использовал свойство ServiceContractName действия Receive для указания целевого пространства имен и имени контракта для конечной точки службы и свойство Action для указания заголовка действия в сообщении-запросе в соответствии с контрактом службы ICalculatorRequest.

Затем я добавил экземпляр CalculatorActivity в службу WCF Workflow Service после BrokeredMessagePropertyActivity и настроил его свойства, как показано на рисунке ниже.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба18

После этого я добавил действие If в рабочий процесс после CalculatorActivity и настроил его свойство Condition следующим образом:

Not String.IsNullOrEmpty(inboundBrokeredMessageProperty.ReplyTo) And 
inboundBrokeredMessageProperty.ReplyTo.ToLower().Contains("topic")

Затем я создал экземпляр BrokeredMessagePropertyActivity в обеих ветвях действия If и добавил действие Send в каждую из них, как показано на следующем рисунке.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба19

Таким образом, если адрес ответа, указанный в свойстве ReplyTo входящего свойства BrokeredMessageProperty, содержит строку «topic», то ответ отправляется в раздел responsetopic, в противном случае — в очередь responsequeue.

В обоих случаях экземпляр BrokeredMessagePropertyActivity (с отображаемым именем Set BrokeredMessage) используется для обертывания действия Send и присвоения исходящего свойства BrokeredMessageProperty свойствам коллекции сообщения-ответа WCF. Чтобы выполнить эту операцию, следует присвоить значение переменной outboundBrokeredMessageProperty свойству BrokeredMessageProperty обоих экземпляров BrokeredMessagePropertyActivity, как показано на рисунке ниже.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба20

Далее я выбрал действие Send в ветви Then и настроил его свойство следующим образом на отправку сообщения в раздел responsetopic с помощью ResponseTopicClientEndpoint, определенной в файле конфигурации.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба21

В частности, я использовал свойство ServiceContractName действия Send для указания целевого пространства имен и имени контракта для клиентской конечной точки, свойство Action для указания заголовка действия сообщения-ответа в соответствии с контрактом службы ICalculatorResponse и свойство EndpointConfigurationName для указания имени клиентской конечной точки, заданной в файле конфигурации.

Подобным же образом я настроил действие Send в ветви Else, как показано на следующем рисунке.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба22

На следующем рисунке показан весь рабочий процесс.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба23

Если все сделано правильно, можно протестировать созданное решение следующим образом.

  • Чтобы отправить сообщение-запрос службе WCF Workflow Service через очередь requestqueue, щелкните переключатель Очередь в группе Методы запроса.

  • Чтобы отправить сообщение-запрос службе WCF Workflow Service через раздел requesttopic, щелкните переключатель Раздел в группе Методы запроса.

  • Чтобы указать службе WCF Workflow Service отправить сообщение-ответ в очередь responsequeue, щелкните переключатель Очередь в группе Методы ответа.

  • Чтобы указать службе WCF Workflow Service отправить сообщение-ответ в раздел responsetopic, щелкните переключатель Раздел в группе Методы ответа.

На следующем рисунке приведены наиболее интересные сочетания.

  • Клиент отправляет сообщение-запрос в раздел requesttopic.

  • Служба WCF Workflow Service считывает запросы из подписки ItalyMilan для раздела requesttopic и отправляет ответ в раздел responsetopic.

  • Клиентское приложение получает сообщение-ответ из подписки ItalyMilan, определенной для раздела responsetopic.

На следующем рисунке показана информация, зарегистрированная службой WCF Workflow Service в качестве стандартного устройства вывода консольного приложения.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба24

На следующем рисунке показана информация, зарегистрированная клиентским приложением во время вызова.

ШинаОбслуживания-Очереди-Разделы-WCF-РабочийПроцесс-Служба25

В частности, можно обратить внимание на следующие моменты.

  1. Сообщение-запрос было отправлено в раздел requesttopic.

  2. Сообщение-ответ было получено из раздела responsetopic.

  3. CorrelationId сообщения-ответа соответствует MessageId сообщения-запроса.

  4. Свойство Label сообщения-запроса было скопировано службой WCF Workflow Service в свойство Label сообщения-ответа.

  5. Все пользовательские свойства были скопированы службой WCF Workflow Service из сообщения-запроса в сообщение-ответ.

  6. Служба WCF Workflow Service добавила пользовательское свойство Source в ответ.

  7. Свойство Area было добавлено действием правила, заданным для подписки ItalyMilan.

В этой статье были рассмотрены вопросы интеграции службы WCF Workflow Service с обменом сообщениями на основе Service Bus, а также вопросы обеспечения полной взаимосвязи между этими двумя технологиями за счет их функций и одного пользовательского действия, необходимого для обработки BrokeredMessageProperty. Приветствуются любые замечания и комментарии. Сопроводительный код к статье можно скачать из коллекции исходных кодов MSDN.

Была ли вам полезна эта информация?
(1500 символов осталось)
Спасибо за ваш отзыв
Показ:
© 2014 Microsoft