Экспорт (0) Печать
Развернуть все
1 из 1 оценили этот материал как полезный - Оценить эту тему

Очереди, разделы и подписки Service Bus

Обновлено: Январь 2014 г.

Шина обслуживания Windows Azure поддерживает ряд облачных промежуточных программных технологий с ориентацией на сообщения, включая надежные очереди сообщений, а также устойчивую публикацию сообщений и подписку на сообщения. Эти "посреднические" возможности системы обмена сообщениями можно рассматривать как асинхронные или несвязанные функции обмена сообщениями, которые поддерживают сценарии публикации-подписки, временного отделения и балансировки нагрузки с использованием структуры обмена сообщениями Service Bus. Несвязанный обмен данными имеет множество преимуществ; например, клиенты и серверы могут подключаться по необходимости и выполнять свои операции в асинхронном режиме.

Ядро новых возможностей обмена сообщениями через посредник в Service Bus составляют три шаблона обмена сообщениями: очереди, темы/подписки и правила/действия.

Очереди

Очереди предлагают доставку сообщений одному или нескольким конкурирующим потребителям по методу FIFO. Это означает, что сообщения обычно получаются и обрабатываются получателями во временном порядке, в котором они добавлялись в очередь, и каждое сообщение получается и обрабатывается только одним потребителем сообщения. Основное преимущество использования очередей состоит в том, что достигается «временная развязка» компонентов приложений. Другими словами, создатели (отправители) и потребители (получатели) не должны отправлять и получать сообщения в одно и то же время, поскольку сообщения долговременно хранятся в очереди. Более того, создатель сообщений не должен ждать ответа от потребителя, чтобы продолжить обработку и отправку сообщений.

С этим связано также преимущество «выравнивания нагрузки», позволяющее производителям и потребителям отправлять и получать сообщения с разной скоростью. Во многих приложениях загрузка системы меняется с течением времени, однако время обработки, необходимое для каждой единицы работы, как правило, постоянно. Включение очереди в качестве посредника между создателями и получателями сообщений означает, что получающее приложение достаточно подготовить, чтобы оно могло выполнять обработку во время средней, а не пиковой загрузки. Глубина очереди будет расти и сокращаться в соответствии с изменением входящей нагрузки. Это напрямую экономит затраты на объем инфраструктуры, необходимой для обслуживания загрузки приложений. По мере увеличения загрузки можно добавлять рабочие процессы для чтения из очереди. Каждое сообщение обрабатывается только одним рабочим процессом. Более того, такая балансировка нагрузки на основе запросов позволяет оптимально использовать рабочие компьютеры, даже если они разные по мощности, так как они будут принимать сообщения в соответствии со своей максимальной скоростью. Этот шаблон часто называется «конкурирующий потребитель».

Использование очередей в качестве посредников между создателями и потребителями сообщений обеспечивает собственную слабую связь между компонентами. Поскольку создатели и потребители не осведомлены друг о друге, обновление потребителя не оказывает никакого влияния на создателя сообщений.

Создание очереди является многоэтапным процессом. Операции управления для сущностей обмена сообщениями в Service Bus (очередей и тем) выполняются посредством класса NamespaceManager, который создается путем предоставления базового адреса пространства имен Service Bus и учетных данных пользователей. NamespaceManager предоставляет методы для создания, перечисления и удаления сущностей обмена сообщениями. После создания объекта TokenProvider из имени издателя, общего ключа и объекта управления пространство имен службы можно воспользоваться методом CreateQueue для создания очереди. Например:

// Create management credentials
TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(IssuerName, IssuerKey);
// Create namespace client
namespaceManager namespaceClient = new namespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials);

Затем можно создать объект очереди и фабрику обмена сообщениями, передав в качестве аргумента URI Service Bus. Например:

QueueDescription myQueue;
myQueue = namespaceClient.CreateQueue("TestQueue");
MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials); 
QueueClient myQueueClient = factory.CreateQueueClient("TestQueue");

Затем можно отправлять сообщения в очередь. Например, если имеется список использующих посредника сообщений с именем MessageList, то код может выглядеть следующим образом:

for (int count = 0; count < 6; count++)
{
    var issue = MessageList[count];
    issue.Label = issue.Properties["IssueTitle"].ToString();
    myQueueClient.Send(issue);
}

Получать сообщения из очереди можно следующим образом:

while ((message = myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 5))) != null)
    {
        Console.WriteLine(string.Format("Message received: {0}, {1}, {2}", message.SequenceNumber, message.Label, message.MessageId));
        message.Complete();

        Console.WriteLine("Processing message (sleeping...)");
        Thread.Sleep(1000);
    }

В режиме ReceiveAndDelete операция получения является однократной, т.е. когда Service Bus получает запрос, то помечает сообщение как потребляемое и возвращает его в приложение. Режим ReceiveAndDelete представляет простейшую модель и лучше всего работает в сценариях, в которых приложение допускает отсутствие обработки сообщения в случае сбоя. Для лучшего понимания рассмотрим сценарий, в котором происходит сбой потребителя после выдачи запроса на получение, но до его обработки. Поскольку Service Bus помечает сообщения как потребляемые, когда приложение перезапускается и снова начинает потреблять сообщения, оно пропустит сообщение, полученное до сбоя.

В режиме PeekLock операция получения становится двухстадийной, что позволяет поддерживать приложения, которые не допускают пропущенные сообщения. Когда Service Bus получает запрос, то находит следующее сообщение для потребления, блокирует его от получения другими потребителями, а затем возвращает в приложение. После того как приложение завершит обработку сообщения (или надежно сохранит для будущей обработки), выполняется вторая стадия процесса получения, на которой для полученного сообщения вызывается метод Complete. Когда Service Bus видит метод Complete, то помечает сообщение как потребляемое.

Если приложение по каким-либо причинам не может обработать сообщение, то оно может вызвать для полученного сообщения метод Abandon (вместо Complete). Это приводит к тому, что Service Bus снимает блокировку сообщения и делает его доступным для повторного получения, как тем же потребителем, так и другим конкурирующим потребителем. Кроме того, существует связанное с блокировкой время ожидания, и если приложение неудачно завершит обработку сообщения до истечения этого времени ожидания (например, в случае сбоя приложения), то Service Bus разблокирует сообщение м сделает его доступным для повторного получения.

Следует отметить, что в случае, если сбой приложений произойдет после обработки сообщения, но до выдачи запроса Complete, сообщение будет заново доставлено этому приложению после его перезапуска. Это часто называется обработкой "минимум один раз"; т.е. каждое сообщение будет обрабатываться как минимум один раз, но в некоторых ситуациях это же сообщение может доставляться повторно. Если сценарий не допускает повторной обработки, то в приложениях требуется ввести дополнительную логику для обнаружения повторных сообщений, что можно сделать с помощью свойства сообщения MessageId, которое остается неизменным во всех попытках доставки сообщения. Это называется обработкой "ровно один раз".

Дополнительные сведения, а также рабочий пример создания и отправки сообщений в очереди и из очередей, см. в разделе Учебник по обмену сообщениями .NET через посредника в Service Bus.

Темы и подписки

В отличие от очередей, в которых каждое сообщение потребляется только одним получателем, темы и подписки обеспечивают форму обмена сообщениями «один ко многим» в шаблоне «публикация/подписка». Каждое опубликованное сообщение становится доступным для каждой подписки, зарегистрированной в теме, что удобно при очень большом количестве получателей. Сообщения отправляются в тему и доставляются в одну или несколько соответствующих подписок, в зависимости от правил фильтра, которые можно устанавливать для каждой подписки. В подписках можно использовать дополнительные фильтры, чтобы ограничиваться только сообщениями, которые требуется получать. Сообщения отправляются в тему так же, как в очередь, но получаются из тем не напрямую. Они получаются из подписок. Подписка темы похожа на виртуальную очередь, получающую копии сообщений, отправленных в тему. Из подписки сообщения получаются так же, как из очереди.

Если сравнить, то функциональность отправки сообщений очереди прямо соответствует такой же функциональности темы, а функциональность получения сообщений очереди — такой же функциональности подписки. Кроме всего прочего, это означает, что подписки поддерживают шаблоны, которые рассматривались выше в этом разделе для очередей: конкурирующих потребителей, временную развязку, выравнивание нагрузки и балансировку нагрузки.

Процесс создания темы аналогичен процессу создания очереди, как показано в примере из предыдущего раздела. Сначала создается URI службы, а затем с помощью класса NamespaceManager создается клиент пространства имен. Далее с помощью метода CreateTopic можно создать тему. Например:

TopicDescription dataCollectionTopic = namespaceClient.CreateTopic("DataCollectionTopic");

Затем добавляются требуемые подписки:

SubscriptionDescription myAgentSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Inventory");
SubscriptionDescription myAuditSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Dashboard");

Затем создаются клиенты темы. Например:

MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider);
TopicClient myTopicClient = factory.CreateTopicClient(myTopic.Path)

С помощью отправителя сообщений можно отправлять сообщения в тему и получать их из темы, как показано в предыдущем разделе. Например:

foreach (BrokeredMessage message in messageList)
{
    myTopicClient.Send(message);
    Console.WriteLine(
    string.Format("Message sent: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

Как и при использовании очередей, сообщения получаются из подписок с помощью объекта SubscriptionClient вместо QueueClient. Создайте клиента подписки, передав имя темы, имя подписки и (дополнительно) режим получения в качестве параметров. Например, для подписки Inventory:

// Create the subscription client
MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider); 

SubscriptionClient agentSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Inventory", ReceiveMode.PeekLock);
SubscriptionClient auditSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Dashboard", ReceiveMode.ReceiveAndDelete); 

while ((message = agentSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Inventory...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
    message.Complete();
}          

// Create a receiver using ReceiveAndDelete mode
while ((message = auditSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Dashboard...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

ImportantВажно!
Как сказано в разделе Инструкция по публикации службы в реестре Service Bus, можно использовать ServiceRegistrySettings, чтобы указать, должна ли служба быть обнаруживаемой в Service Bus. Если служба частная, то к ней могут подключиться только те лица, которым известен конкретный URI. Если она общедоступная, то любой может переходить по иерархии Service Bus и обнаруживать ваш прослушиватель. Однако очереди, темы и подписки не могут предоставляться с помощью реестра службы.

Правила и действия

Во многих сценариях сообщения, имеющие определенные характеристики, должны обрабатываться особым образом. Для этого можно настраивать подписки, чтобы они обнаруживали сообщения с нужными свойствами, а затем выполняли определенные изменения этих свойств. Хотя подписки Service Bus видят все сообщения, отправленные в тему, можно копировать в виртуальную очередь подписки только часть этих сообщений. Это выполняется с помощью фильтров подписок. Такие изменения называются действиями фильтров. При создании подписки можно предоставить выражение фильтра, которое работает со свойствами сообщения, как с системными (например Label), так и со свойствами приложения, например StoreName в предыдущем примере. В этом случае SQL-выражение фильтра не является обязательным; без него все действия фильтра, заданные в подписке, будут выполняться для всех сообщений этой подписке.

Используя предыдущий пример, для фильтрации сообщений, приходящих только от Store1, можно создать подписку Dashboard следующим образом:

namespaceManager.CreateSubscription("IssueTrackingTopic", "Dashboard", new SqlFilter("StoreName = 'Store1'"));

При наличии такого фильтра подписки в виртуальную очередь для подписки Dashboard будут копироваться только сообщения со свойством StoreName, имеющим значение Store1.

Дополнительные сведения о возможных значениях фильтров см. в документации по классам SqlFilter и SqlRuleAction. Кроме того, см. AdvancedFiltersSample в пакете SDK Windows Azure.

См. также

Была ли вам полезна эта информация?
(1500 символов осталось)
Спасибо за ваш отзыв

Добавления сообщества

ДОБАВИТЬ
Корпорация Майкрософт проводит интернет-опрос, чтобы выяснить ваше мнение о веб-сайте MSDN. Если вы желаете принять участие в этом интернет-опросе, он будет отображен при закрытии веб-сайта MSDN.

Вы хотите принять участие?
Показ:
© 2014 Microsoft. Все права защищены.