Экспорт (0) Печать
Развернуть все
Развернуть Свернуть

Создание приложений, которые используют очереди Service Bus

Обновлено: Октябрь 2014 г.

В данном разделе описывается полезность очередей и иллюстрируются способы написания простого приложения на основе очереди, которое использует Шина службы Microsoft Azure.

Можно рассмотреть сценарий из мира розничной торговли, в котором данные о продажах из индивидуального терминала точки продажи (POS), должны быть направлены в систему управления запасами, использующую эти данные для определения времени пополнения запасов. Следующее решение использует Служебная шина отправку сообщений для связи между терминалами и системой управления запасами как показано ниже:

Очереди-шины-обслуживания-Img1

Каждый POS терминал предоставляет отчет о своих продажах путем отправки сообщений в DataCollectionQueue. Эти сообщения остаются в данной очереди пока они не будут получены системой управления запасами. Данный шаблон обычно называется асинхронный обмен сообщениями, так как POS терминал не должен ждать ответа от системы управления запасами для продолжения работы.

Перед тем как рассматривать код, который необходим для настройки данного приложения, рассмотрим преимущества использования очереди в сценарии вместо того, чтобы POS терминалы обменивались сообщениями напрямую (синхронно) с системой управления запасами.

С шаблоном асинхронного обмена сообщениями производители и потребители не должны быть подключены к сети одновременно. Инфраструктура обмена сообщениями надежно хранит сообщения до тех пор, пока сторона потребителя не будет готова к приему. Такой подход позволяет компонентам распределенного приложения находиться в отключенном состоянии, по выбору, например для технического обслуживания или из-за сбоя в работе компонента, не оказывая воздействия на всю систему. Более того, приложение потребителя может находится в сети только несколько раз в день. Например, в данном сценарии розничной торговли, система управления запасами может подключаться к сети только в конце рабочего дня.

Во многих приложениях системная нагрузка варьируется, в то время как время обработки требуемое для каждой единицы работы как правило постоянно. Посредничество между отправителями и получателями сообщений при помощи очереди означает, что клиентское приложение (рабочий) может наработать среднюю нагрузку вместо пиковой. Глубина очереди будет расти или сужаться, так как входящая нагрузка меняется. Это напрямую экономит деньги в отношении количества требуемой инфраструктуры на обслуживание нагрузки приложения.

Очереди-шины-обслуживания-Img2

Если нагрузка возрастает, то можно добавить больше рабочих процессов для чтения данных из рабочей очереди. Каждое сообщение обрабатывается только одним рабочим процессом. Более того данная балансировка нагрузки на основе опросов позволяет оптимально использовать рабочие компьютеры даже если рабочий компьютер отличается по мощности обработки данных, так как они будут опрашивать сообщения на максимальной скорости. данный шаблон часто называется конкурирующий потребитель

Очереди-шины-обслуживания-Img3

Использование очереди отправки сообщений для посредничества между отправителями сообщений и получателями предоставляет свойственную ей свободную связь между компонентами. Так как производители и потребители не знают друг друга, потребитель может обновить терминал без какого-либо влияния на производителя. Более того топология обмена сообщениями может включать в сея существующие конечные точки. Подробнее данная тема будет обсуждаться при рассмотрении публикации/подписки.

Следующий раздел показывает как использоватьСлужебная шинадля построения данного приложения.

Потребуется учетная запись на портале , чтобы начать работу с Служебная шина. Если у Вас еще нет учетной записи, можно зарегистрироваться на бесплатный пробный период здесь. Потребуется зайти при помощи Идентификатор Windows Live ID (учетная запись Майкрософт), который будет связан с вашей учетной записьюСлужебная шина . как только вы это сделали, можете создать новую подписку Служебная шина В будущем когда бы вы ни заходили через вашу Идентификатор Windows Live ID (учетная запись Майкрософт), у вас будет доступ ко всем подпискам Служебная шина , связанным с Вашей учетной записью.

После получения подписки, можете создать новуюпространство имен службы. Вам потребует предоставить ваш новый пространство имен службы уникальное имя для всех Служебная шина учетных записей. Каждый пространство имен службы выступает в роли контейнера для набора сущностей Служебная шина. Дополнительные сведения см. в разделе Практическое руководство. Создание или изменение пространства имен службы служебной шины.

Чтобы использовать Служебная шина пространство имен службы, приложение должно ссылаться на сборку Служебная шина, в частности на файл Microsoft.ServiceBus.dll. Эта сборка входит в состав пакета SDK Microsoft Azure, ее можно скачать на странице скачивания пакета SDK Azure. Хотя пакет Служебная шина NuGet- это наиболее легкий способ получить Служебная шина API и настроить приложение со всеми зависимостямиСлужебная шина. Для более подробной информации по использованию NuGet и пакета Служебная шина см. Использование Пакет Служебной шины NuGet.

Операции управления для сущностей сообщений Служебная шина (очередей и разделов публикации подписки) выполняются в классе NamespaceManager. Служебная шина использует модель безопасности на основе утверждений, реализованную с помощью Microsoft Azure Active Directory Access Control (также называется Access Control Service или ACS). Требуются соответствующие учетные данные для того, чтобы создать экземпляр NamespaceManager для конкретного пространство имен службы. Класс TokenProvider представляет собой поставщика токенов безопасности со встроенными методами-фабрики, которые возвращают некоторые хорошо известные поставщики токенов. Здесь будет использоваться класс SharedSecretTokenProvider для хранения секретных совместных учетных данных и управления получением соответствующих токенов из Служба управления доступом. Экземпляр NamespaceManager конструируется с базовым адресом Служебная шина пространство имен службы и поставщиком токенов.

Класс NamespaceManager предоставляет методы создания, перечисления и удаления сущностей обмена сообщениями. Отрезок, показанный здесь иллюстрирует, что экземпляр NamespaceManager создан и использован для создания очереди DataCollectionQueue.

Uri uri = ServiceBusEnvironment.CreateServiceUri("sb""ingham-blog"string.Empty);
string name = "owner";
string key = "abcdefghijklmopqrstuvwxyz";
 
TokenProvider tokenProvider = 
    TokenProvider.CreateSharedSecretTokenProvider(name, key);
NamespaceManager namespaceManager = 
    new NamespaceManager(uri, tokenProvider);
namespaceManager.CreateQueue("DataCollectionQueue");

Заметьте, что существуют перегрузки метода CreateQueue , которые включают свойства настройки очереди. Например, можно установить значение срока жизни по умолчанию (TTL) для сообщений отправленных в очередь.

Для операций времени выполнения в Служебная шина сущностях; например, для отправки и получения сообщений приложение во-первых должно создать объект MessagingFactory. Схожий с NamespaceManager класс, экземпляр MessagingFactory instance создан из базы адресов пространство имен службы и поставщика токенов.

 MessagingFactory factory = MessagingFactory.Create(uri, tokenProvider);

Сообщение отправлено в и получено из очередейСлужебная шина, которые являются экземплярами класса BrokeredMessage. Данный класс состоит из набора стандартных свойств (например Label иTimeToLive), словаря, который используется для хранения свойств приложения, и набора произвольных данных приложения. Приложение может настроить тело путем прохода в любой сериализуемый объект (следующий пример проходит в объект SalesData, который представляет данные о продажах из POS терминала), который будет использовать сериализуемый объект DataContractSerializer. Альтернативно может быть предоставлено Stream.

 BrokeredMessage bm = new BrokeredMessage(salesData);
 bm.Label = "SalesReport";
 bm.Properties["StoreName"] = "Redmond";
 bm.Properties["MachineID"] = "POS_1";

Наиболее легкий способ отправки сообщений в данную очередь, в нашем случае DataCollectionQueue, это использовать CreateMessageSender для создания объекта MessageSenderнапрямую из экземпляра MessagingFactory.

MessageSender sender = factory.CreateMessageSender("DataCollectionQueue");
sender.Send(bm);

Наиболее простой способ получения сообщений из очереди это использовать объект MessageReceiver, который можно создать напрямую из MessagingFactoryиспользуя CreateMessageReceiver. Получаетли сообщения могут работать в двух разных режимах: ReceiveAndDelete и PeekLock. Режим ReceiveMode устанавливается, когда получатель сообщения создан в качестве параметра к вызову CreateMessageReceiver.

При использовании режима ReceiveAndDelete получение является единовременной операцией; то есть, когда Служебная шина получает запрос, он помечает сообщение как полученное и возвращает его в приложение. Режим ReceiveAndDelete — это самая простая модель, которая лучше всего подходит для сценариев, в которых приложение может не обрабатывать сообщение при возникновении сбоя. Для понимания этого рассмотрим сценарий, в котором получатель выпускает запрос на получение, а затем ломается до его обработки. Так как Служебная шина пометил сообщение как полученное, когда приложение перезапускается и начинает получать сообщения снова оно пропустит сообщение которое было получено до поломки.

В режиме PeekLock получение становится двух шаговой операцией, которая делает возможным поддерживать приложения, которые не могут обработать отсутствующие сообщения. Когда Служебная шина получает запрос, он находит следующее сообщение, которое необходимо получить, блокирует его, для предотвращения получения его другими получателями, а затем возвращает приложению. После того как приложение заканчивает обработку сообщения (или хранит его для дальнейшей обработки), оно завершает вторую стадию процесса получения путем вызоваComplete по полученному сообщению. Когда Служебная шина видит Complete, он помечает сообщение как полученное.

Возможны два других исхода. Во-первых, если приложение не может обработать сообщение по какой-то причине, оно может вызвать Abandon по полученному сообщению (вместо Complete). Это вызовет Служебная шина для разблокировки сообщения и сделав его доступным для получения, либо тем же потребителем или другим конкурирующим потребителем. Во-вторых, есть таймаут, связанный с блокировкой, и если приложение не может обработать сообщение до того, как истечет время блокировки (например, если приложение терпит сбой), Служебная шина разблокирует сообщение и сделает его доступным для получения.

Есть одна заметка: если приложение ломается после того как обработает сообщение, но перед тем как запрос Complete будет выпущен, сообщение будет заново передано приложению после перезагрузки. Оно часто называется обработкойХотя бы раз Это означает, что каждое сообщение будет обработано как минимум один раз, но в некоторых случаях одно и тоже сообщение может быть отправлено заново. Если сценарий не допускает обработку дубликатов, в приложении требуется дополнительная логика для определения дубликатов. Это может быть достигнуто на основе свойства сообщения MessageId. Значение данного свойства остается постоянным на протяжении всех попыток доставки. Оно часто называется обработкойНепосредственно один раз

Код, показанный здесь иллюстрирует получение и обработку сообщения с использованием режима PeekLock, который является режимом по умолчанию, если явно не указано значения ReceiveMode.

MessageReceiver receiver = factory.CreateMessageReceiver("DataCollectionQueue");
BrokeredMessage receivedMessage = receiver.Receive();
try
{
    ProcessMessage(receivedMessage);
    receivedMessage.Complete();
}
catch (Exception e)
{
    receivedMessage.Abandon();
}

В примерах ранее в данном разделе были созданы объекты MessageSender и MessageReceiver напрямую из MessagingFactory для отправки и получения сообщений из очереди соответственно. Альтернативным подходом является использование класса QueueClient, который поддерживает как отправку, так и получение в дополнение к другим функциям, как, например, сеансы.

QueueClient queueClient = 
     factory.CreateQueueClient("DataCollectionQueue");
queueClient.Send(bm);
            
BrokeredMessage message = queueClient.Receive();
try
{
    ProcessMessage(message);
    message.Complete();
}
catch (Exception e)
{
    message.Abandon();
} 

Показ:
© 2015 Microsoft