Экспорт (0) Печать
Развернуть все

Очереди, темы и подписки шины обслуживания

Обновлено: Октябрь 2014 г.

Шина службы Microsoft Azure поддерживает набор облачных, ориентированных на сообщения промежуточных программных технологий, включающий надежные очереди сообщений, а также устойчивую публикацию сообщений и подписку на сообщения. Схема посреднического обмена сообщениями может также рассматриваться как асинхронный или «временно разъединенный» обмен сообщениями с поддержкой публикации, подписки и временного разъединения, а также сценариев балансировки нагрузки с использованием фабрики обмена сообщениями Служебная шина. Несвязанный обмен данными имеет множество преимуществ. Например, клиенты и серверы могут подключаться по мере необходимости и выполнять свои операции асинхронно.

Сущности обмена сообщениями, формирующие ядро возможностей обмена сообщениями через посредников в Служебная шина — это очереди, разделы/ подписки, правила/действия и концентраторы событий.

Очереди обеспечивают доставку сообщений по принципу «первым пришел, первым ушел» (FIFO) одному или нескольким конкурирующим клиентам. Это значит, что сообщения обычно принимаются и обрабатываются получателями в порядке их добавления в очередь, и каждое сообщение принимается и обрабатывается только одним клиентом. Основное преимущество использования очередей состоит в том, что достигается «временная развязка» компонентов приложений. Другими словами, создатели (отправители) и клиенты (получатели) не обязаны отправлять и получать сообщения в одно и то же время, поскольку сообщения хранятся в очереди долговременно. Более того, создателям не нужно дожидаться ответа от клиента для того, чтобы продолжить обрабатывать и отправлять сообщения.

С этим связано также преимущество «выравнивания нагрузки», позволяющее производителям и потребителям отправлять и получать сообщения с разной скоростью. Во многих приложениях системная нагрузка изменяется во времени, а время обработки, требуемое для каждой единицы работы, как правило, постоянно. Посредничество между отправителями и получателями сообщений при помощи очереди означает, что потребляющее приложение необходимо подготовить для обработки только средней, а не пиковой нагрузки. Глубина очереди будет расти или уменьшаться в зависимости от изменения входящей нагрузки. Это напрямую экономит деньги в отношении количества требуемой инфраструктуры на обслуживание нагрузки приложения. Если нагрузка возрастает, можно добавить больше рабочих процессов для чтения данных из очереди. Каждое сообщение обрабатывается только одним рабочим процессом. Более того, данная балансировка нагрузки на основе опросов позволяет оптимально использовать рабочие компьютеры, даже если рабочий компьютер отличается по мощности обработки данных, так как они будут опрашивать сообщения на своей собственной максимальной скорости. Данный шаблон часто называется «конкурирующий клиент».

Использование очереди отправки сообщений для посредничества между отправителями и получателями сообщений обеспечивает свойственную ей свободную связь между компонентами. Так как производители и потребители не знают друг друга, потребитель может обновить терминал без какого-либо влияния на производителя.

Создание очереди — это многоступенчатый процесс. Операции управления для сущностей обмена сообщениями Служебная шина (очередей и разделов) выполняются через класс Microsoft.ServiceBus.NamespaceManager, который конструируется при вводе базового адреса пространства имен Служебная шина и учетных данных пользователя. NamespaceManager предоставляет методы для создания, перечисления и удаления сущностей обмена сообщениями. После создания объекта Microsoft.ServiceBus.TokenProvider из имени поставщика и общего ключа, а также объекта управления пространство имен службы можно использовать метод Microsoft.ServiceBus.NamespaceManager.CreateQueue(Microsoft.ServiceBus.Messaging.QueueDescription) для создания очереди. Например:

// Create management credentials
TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(IssuerName, IssuerKey);
// Create namespace client
namespaceManager namespaceClient = new namespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials);

Далее можно создать объект очереди и фабрику обмена сообщениями с URI Служебная шина в качестве аргумента. Например:

QueueDescription myQueue;
myQueue = namespaceClient.CreateQueue("TestQueue");
MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials); 
QueueClient myQueueClient = factory.CreateQueueClient("TestQueue");

После этого можно отправлять сообщения в очередь. Например, если имеется список сообщений, поступивших через посредников, с именем MessageList, код будет иметь следующий вид:

for (int count = 0; count < 6; count++)
{
    var issue = MessageList[count];
    issue.Label = issue.Properties["IssueTitle"].ToString();
    myQueueClient.Send(issue);
}

Сообщения из очереди можно получать следующим образом:

while ((message = myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 5))) != null)
    {
        Console.WriteLine(string.Format("Message received: {0}, {1}, {2}", message.SequenceNumber, message.Label, message.MessageId));
        message.Complete();

        Console.WriteLine("Processing message (sleeping...)");
        Thread.Sleep(1000);
    }

При использовании режима ReceiveAndDelete получение является единовременной операцией. То есть, когда Служебная шина получает запрос, он помечает сообщение как полученное и возвращает его в приложение. Режим ReceiveAndDelete — это самая простая модель, наиболее подходящая для сценариев, в которых приложение может не обрабатывать сообщение при возникновении сбоя. Для понимания этого рассмотрим сценарий, в котором получатель выпускает запрос на получение, а затем ломается до его обработки. Так как Служебная шина пометил сообщение как полученное, когда приложение перезапустится и снова начнет получать сообщения, оно пропустит сообщение, которое было получено до сбоя.

В режиме PeekLock получение становится двухшаговой операцией, позволяющей поддерживать приложения, которые не могут обрабатывать пропущенные сообщения. Когда Служебная шина получает запрос, он находит следующее сообщение, которое необходимо получить, блокирует его, чтобы предотвратить его получение другими клиентами, а затем возвращает приложению. После того как приложение заканчивает обработку сообщения (или хранит его для дальнейшей обработки), оно завершает вторую стадию процесса получения путем вызоваMicrosoft.ServiceBus.Messaging.BrokeredMessage.Complete по полученному сообщению. Когда Служебная шина видит Complete, он помечает сообщение, как полученное.

Если приложение не может обработать сообщение по какой-то причине, оно может вызвать метод Microsoft.ServiceBus.Messaging.BrokeredMessage.Abandon по полученному сообщению (вместо Complete). При этом вызывается Служебная шина, который разблокирует сообщение и делает его доступным для получения тем же или другим, конкурирующим, клиентом. Во-вторых, есть таймаут, связанный с блокировкой, и если приложение не может обработать сообщение до того, как истечет время блокировки (например в случае сбоя приложения), Служебная шина разблокирует сообщение и снова сделает его доступным для получения.

Обратите внимание, что при сбое приложения после обработки сообщения, но до формирования запроса Complete сообщение повторно доставляется приложению после его перезапуска. Эта операция часто называется обработкой «Как минимум один раз», т. е. каждое сообщение обрабатывается хотя бы один раз. Однако в определенных ситуациях то же самое сообщение может доставляться повторно. Если сценарий не допускает обработку дубликатов, в приложении требуется дополнительная логика для определения дубликатов. Этого можно достичь с помощью свойства MessageId сообщения. Значение этого свойства остается постоянным на протяжении всех попыток доставки. Оно часто называется обработкой «Только один раз».

Дополнительную информацию и рабочий пример создания и отправки сообщений в очереди и из них см. в разделе Учебник по обмену сообщениями .NET через посредника в Service Bus.

В отличие от очередей, в которых сообщение обрабатывается одним клиентом, разделы и подписки обеспечивают связь в формате «один ко многим» по шаблону «опубликовать или подписаться». Это удобно при масштабировании для работы с очень большим количеством получателей. При этом каждое опубликованное сообщение становится доступным для любой подписки, зарегистрированной в разделе. Сообщения отправляются теме и доставляются одной или нескольким связанным с ней подпискам в зависимости от правил фильтра, которые можно задавать для каждой подписки отдельно. Подписки могут использовать дополнительные фильтры для ограничения сообщений, которые они будут получать. Сообщения отправляются в раздел точно так же, как и в очередь, но не принимаются из раздела напрямую, а поступают из подписок. Подписка раздела напоминает виртуальную очередь, которая получает копии отправленных в раздел сообщений. Из подписки сообщения принимаются точно так же, как из очереди.

Для сравнения, функция очереди по отправке сообщений сопоставляется напрямую с разделом, а ее функция приема сообщений — с подпиской. Помимо прочего, это означает, что подписки поддерживают те же шаблоны, которые были описаны в данном разделе при рассмотрении очередей: конкурирующий клиент, временное разъединение, выравнивание и балансировка нагрузки.

Создание разделов аналогично созданию очередей, как показано в примере предыдущего раздела. Создайте URI службы, а затем используйте класс NamespaceManager для создания клиента пространства имен. Далее можно создать раздел с использованием метода Microsoft.ServiceBus.NamespaceManager.CreateTopic(System.String). Например:

TopicDescription dataCollectionTopic = namespaceClient.CreateTopic("DataCollectionTopic");

После этого добавьте необходимые подписки:

SubscriptionDescription myAgentSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Inventory");
SubscriptionDescription myAuditSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Dashboard");

Затем создайте клиент раздела. Например:

MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider);
TopicClient myTopicClient = factory.CreateTopicClient(myTopic.Path)

С помощью отправителя сообщений можно отправлять и получать сообщения в раздел и из него, как показано в предыдущем разделе. Например:

foreach (BrokeredMessage message in messageList)
{
    myTopicClient.Send(message);
    Console.WriteLine(
    string.Format("Message sent: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

Подобно очередям, сообщения принимаются из подписки с помощью объекта SubscriptionClient вместо объекта QueueClient. Создайте клиент подписки, передав имя раздела, имя подписки и (необязательно) режим получения в виде параметров. Например, с подпиской Запасы:

// Create the subscription client
MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider); 

SubscriptionClient agentSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Inventory", ReceiveMode.PeekLock);
SubscriptionClient auditSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Dashboard", ReceiveMode.ReceiveAndDelete); 

while ((message = agentSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Inventory...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
    message.Complete();
}          

// Create a receiver using ReceiveAndDelete mode
while ((message = auditSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Dashboard...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

ImportantВажно!
Как указано в разделе Инструкция по публикации службы в реестре Service Bus, параметры Microsoft.ServiceBus.ServiceRegistrySettings позволяют указать, будет ли вашу службу обнаруживать Служебная шина. Если служба частная, то подключаться смогут только пользователи, которым известен соответствующий URI. Если служба общедоступная, то просматривать иерархию Служебная шина и находить ваш прослушиватель сможет любой. В то же время очереди, разделы и подписки не могут предоставляться через реестр службы.

Во многих сценариях сообщения с определенными характеристиками должны обрабатываться определенным образом. Для этого можно настроить подписки на поиск сообщений с нужными свойствами, а затем внести определенные изменения в эти свойства. Подписки Служебная шина видят все сообщения, отправленные в раздел, однако, копировать в виртуальную очередь подписки можно только часть этих сообщений. Это делается с помощью фильтров подписок. Такие изменения называются действиями фильтра. При создании подписки можно ввести выражение фильтра, которое работает со свойствами сообщения — как с системными свойствами (например, Label), так и со свойствами приложения, такими как StoreName в предыдущем примере. В этом случае использовать SQL-критерий фильтра необязательно. Без этого критерия любое действие фильтра, заданное для подписки, будет выполняться для всех сообщений этой подписки.

Чтобы в предыдущем примере отфильтровать получение сообщений только из Store1, необходимо создать подписку панели мониторинга следующим образом:

namespaceManager.CreateSubscription("IssueTrackingTopic", "Dashboard", new SqlFilter("StoreName = 'Store1'"));

При наличии такого фильтра подписки только сообщения со свойством StoreName, имеющим значение Store1, будут копироваться в виртуальную очередь для подписки Панель мониторинга.

Допустимые значения фильтров Дополнительные сведения см. в документации для классов Microsoft.ServiceBus.Messaging.SqlFilter и Microsoft.ServiceBus.Messaging.SqlRuleAction. Также см. пример в разделе Обмен сообщениями через посредника: расширенные фильтры.

Концентраторы событий — это служба сбора событий. Они используются для обеспечения входа событий и телеметрии в Azure в большом масштабе, при низкой задержке и с высокой надежностью. Эта служба при использовании вместе с другими подчиненными службами особенно полезна в сценариях инструментирования приложений, обработки работы пользователя или рабочих процессов, а также «Интернета вещей» (IoT).

Концентраторы событий представляют собой конструкцию передачи сообщений, но, несмотря на схожесть с очередями и разделами, обладают совершенном иными характеристиками. Например, концентраторы событий не обеспечивают сообщения TTL, отправку в мертвые очереди, транзакции или подтверждения, поскольку это — традиционные функции обмена сообщениями через посредников, а не функции потоковой передачи. Концентраторы событий предлагают другие связанные с потоком функции, такие как секционирование, сохранение порядка и воспроизведение потока.

См. также

Показ:
© 2014 Microsoft