Экспорт (0) Печать
Развернуть все
Развернуть Свернуть

Рекомендации по достижению максимальной масштабируемости и экономической эффективности решений с обменом сообщениями на основе очередей в Windows Azure

Обновлено: Июль 2014 г.

Автор: Валерий Мизонов (Valery Mizonov)

Проверен: Брэд Кэлдер (Brad Calder), Сидни Хига (Sidney Higa), Кристиан Мартинес (Christian Martinez), Стив Маркс (Steve Marx), Курт Петерсон (Curt Peterson), Паоло Сальватори (Paolo Salvatori) и Трейс Янг (Trace Young)

В данной статье рассматриваются конкретные советы и рекомендации по построению масштабируемых, эффективных и экономичных решений на основе очереди сообщений на платформе Windows Azure. Целевая аудитория данной статьи включает архитекторов решений и разработчиков, которые занимаются созданием и внедрением облачных решений с использованием служб хранения очередей на платформе Windows Azure.

В традиционных решениях на основе очередей используется концепция хранения сообщений в очереди сообщений, которая является репозиторием для данных, отправляемых одному или нескольким участникам или получаемых от них, как правило, через механизм асинхронной передачи данных.

Обмен данными на основе очередей является основой надежной и масштабируемой архитектуры передачи сообщений, которая способна поддерживать множество эффективных сценариев в распределенной вычислительной среде. Технология очередей сообщений может стать отличным решением для масштабной и надежной диспетчеризации и предоставить великолепные возможности для решения различных задач асинхронного обмена данными любого масштаба.

Цель данной статьи — показать, как разработчик может воспользоваться преимуществами конкретных шаблонов разработки совместно с возможностями платформы Windows Azure в построении оптимизированных и экономичных решений обработки сообщений на основе очередей. Данная статья дает более детальное представление о большинстве часто используемых подходов реализации взаимодействия на основе очередей в решениях Windows Azure и содержит рекомендации по повышению производительности, масштабируемости и сокращению операционных расходов.

Основное обсуждение снабжено сопутствующими рекомендациями и советами. В сценарии, приведенном в данной статье, основное внимание уделяется технической реализации, основанной на реальном клиентском проекте.

Чтобы предоставить конкретный пример, обобщим реальный клиентский сценарий следующим образом.

Поставщик решений SaaS запускает новую систему выставления счетов, реализованную в виде приложения Windows Azure, которая будет обслуживать клиентские транзакции в любом масштабе. Основная предпосылка этого решения заключается в возможности передачи интенсивных вычислительных рабочих нагрузок в облако и задействования эластичности инфраструктуры Windows Azure для выполнения трудоемких вычислительных операций.

Локальный элемент сквозной инфраструктуры периодически в течение дня производит консолидацию и диспетчеризацию больших объемов транзакций в службу, размещенную в Windows Azure. Объемы данных могут составлять от тысячи до сотен тысяч транзакций за один сеанс, достигая миллиона транзакций в день. Кроме того, предположим, что решение должно удовлетворять требованиям соглашения об уровне обслуживания и обеспечивать гарантированную максимальную длительность обработки.

Архитектура решения основана на распределенном шаблоне разработки map-reduce и состоит из основанного на многоэкземплярной рабочей роли облачного слоя, использующего хранилище очередей Windows Azure для диспетчеризации работы. Пакеты транзакций передаются экземпляру рабочей роли Process Initiator, разбиваются на более мелкие элементы и помещаются в коллекцию очередей Windows Azure для распределения нагрузки.

Рабочая нагрузка обрабатывается несколькими экземплярами рабочих ролей, которые извлекают рабочие элементы из очередей и пропускают их через вычислительные процедуры. Экземпляры обработчиков реализуют параллельную обработку данных с помощью многопоточных прослушивателей очередей, чтобы обеспечить максимальную производительность.

Обработанные элементы затем маршрутизируются в выделенную очередь, откуда экземпляр рабочей роли Process Controller их забирает, агрегирует и сохраняет в хранилище данных для интеллектуального анализа и построения отчетов.

Архитектуру решения можно обозначить следующим образом.

Приведенная выше схема показывает типичную архитектуру для масштабирования больших или сложных вычислительных рабочих нагрузок. Используемый этой архитектурой шаблон обмена сообщениями на основе очередей также весьма типичен для многих других приложений и служб Windows Azure, которые обмениваются данными друг с другом через очереди. Это позволяет применить канонический подход для проверки отдельных фундаментальных компонентов, участвующих в обмене сообщениями на основе очередей.

Типичное решение для обмена сообщениями, которое обеспечивает обмен данными между собственными распределенными компонентами с использованием очередей, включает издателей, помещающих сообщения в очереди, и одного или нескольких подписчиков, которые получают эти сообщения. В большинстве случаев подписчики, которые иногда называются прослушивателями очереди, реализованы в виде однопоточных или многопоточных процессов, либо работающих непрерывно, либо запускаемых по запросу (по расписанию).

На более высоком уровне имеется два основных механизма диспетчеризации, используемые для получения прослушивателями очереди сообщений, сохраненных в очереди.

  • Опрос (модель на основе опроса): прослушиватель производит мониторинг очереди, регулярно проверяя ее на наличие новых сообщений. Если очередь пуста, прослушиватель продолжает опрашивать ее, периодически переходя в режим сна.

  • Активация (модель на основе принудительной передачи): прослушиватель подписывается на событие, которое вызывается либо самим издателем, либо диспетчером службы очередей при поступлении сообщения в очередь. Прослушиватель, в свою очередь, может инициировать обработку сообщения. Таким образом, исключается необходимость в опросе очереди в ожидании новой работы.

Следует также отметить, что каждый из этих механизмов имеет свои нюансы. Например, опрос очереди может быть блокируемым и неблокируемым. Блокировка оставляет запрос в ожидании до тех пор, пока новое сообщение не появится в очереди (либо не истечет время ожидания), а неблокируемый запрос, если в очереди ничего нет, завершается немедленно. В модели активации уведомление может отправляться прослушивателям очереди либо при поступлении каждого нового сообщения, либо при поступлении первого сообщения в пустую очередь, либо когда в очереди накапливается определенное число сообщений.

noteПримечание
Операции вывода из очереди, поддерживаемые API службы очередей Windows Azure, являются неблокируемыми. Это означает, что такие методы API, как GetMessage или GetMessages, будут завершаться немедленно, если в очереди нет сообщений. Очереди Windows Azure Service Bus, напротив, предусматривают блокирующие операции получения, которые останавливают вызывающий поток до тех пор, пока в очереди не появится сообщение или не истечет заданное время ожидания.

Подходы, которые сегодня чаще всего используются при реализации прослушивателей очередей в решениях Windows Azure, можно обобщить следующим образом.

  1. Прослушиватель реализуется как компонент приложения. Создаваемые экземпляры прослушивателя выполняются в рамках экземпляра рабочей роли.

  2. Жизненный цикл компонента прослушивателя очереди будет часто привязан к времени запуска экземпляра роли размещения.

  3. Основная логика обработки заключается в цикле, в котором сообщения выводятся из очереди и диспетчеризуются для обработки.

  4. Когда нет полученных сообщений, прослушивающий поток переходит в ждущий режим на время, которое обычно задается алгоритмом отсрочки, реализованным в приложении.

  5. В цикле получения опрос очереди производится до тех пор, пока прослушиватель не будет уведомлен о том, что нужно выйти из цикла и завершить работу.

Следующая блок-схема показывает логику, чаще всего применяемую при реализации прослушивателя очередей с механизмом опроса в приложениях Windows Azure.

Рекомендации-по-решениям-для-обмена-сообщениями-Azure2
noteПримечание
В этой статье не будут рассматриваться более сложные шаблоны проектирования, например использующие центральный диспетчер очередей.

Использование классического прослушивателя очередей с механизмом опроса может оказаться неоптимальным выбором при работе с очередями Windows Azure, так как модель ценообразования Windows Azure учитывает транзакции хранилища в запросах приложения, выполняемых применительно к очереди, независимо от того, пуста ли очередь. В следующих разделах будут обсуждаться некоторые методики повышения производительности и сокращения стоимости решений обработки сообщений на основе очередей на платформе Windows Azure.

В этом разделе мы рассмотрим, как улучшить соответствующие аспекты проекта, чтобы добиться более высокой производительности, масштабируемости и экономичности.

Возможно, простейший путь сделать шаблон проектирования «более эффективным решением» лежит через достижение следующих целей.

  • Сокращение операционных расходов за счет исключения значительной части транзакций в хранилище данных, которые не выполняют полезной работы.

  • Исключение чрезмерных задержек, связанных с интервалом опроса при проверке новых сообщений в очереди.

  • Динамическое вертикальное масштабирование за счет привлечения вычислительных мощностей для изменяющихся объемов обрабатываемых данных.

Кроме того, шаблон реализации должен выполнять перечисленные задачи без кардинального повышения сложности, поскольку связанные с ним издержки могут перевесить получаемые преимущества.

При расчете общей стоимости владения и рентабельности для решения, развертываемого на платформе Windows Azure, объем транзакций данных — одна из основных переменных в уравнении. Сокращение числа транзакций для очереди Windows Azure уменьшает эксплуатационные расходы в отношении запуска решений на платформе Windows Azure.

В контексте решения по обработке сообщений на основе очередей объем транзакций в хранилище можно сократить путем применения сочетания следующих методов.

  1. При помещении сообщений в очередь группируйте связанные сообщения в более крупные пакеты, сжимайте, сохраняйте сжатый образ в хранилище больших двоичных объектов, а в очередь передавайте только ссылку на большой двоичный объект, содержащий реальные данные.

  2. При получении сообщений из очереди объединяйте несколько сообщений в одну транзакцию в хранилище. Метод GetMessages в API службы очередей позволяет извлекать заданное число сообщений в рамках одной транзакции (см. примечание ниже).

  3. При проверке наличия элементов в очереди избегайте использования агрессивных интервалов опроса и реализуйте алгоритм отсрочки, который будет увеличивать время между опросами, если очередь остается пустой длительное время.

  4. Сократите число прослушивателей очередей — при использовании модели на основе опросов создавайте только один прослушиватель на экземпляр роли, если очередь пуста. Чтобы сократить число прослушивателей каждого экземпляра роли до нуля, пользуйтесь механизмом уведомлений: это даст возможность создавать прослушиватели только тогда, когда в очередь поступают элементы.

  5. Если очереди большую часть времени пусты, автоматически снижайте число экземпляров роли, продолжая мониторинг соответствующих метрик системы, чтобы определить момент, когда будет необходимо увеличить число экземпляров для обработки возросшей рабочей нагрузки.

Большинство из вышеприведенных рекомендаций можно применить в базовой реализации, которая будет обрабатывать пакеты сообщений и инкапсулировать множество базовых операций по работе с очередями, большими двоичными объектами и потоками. Ниже в этой статье мы рассмотрим, как это сделать.

ImportantВажно!
При получении сообщений через метод GetMessages максимальный размер пакета (получаемого из очереди в составе одной операции вывода из очереди), поддерживаемый API службы очередей, составляет 32.

В общем случае стоимость транзакции очереди Windows Azure линейно возрастает по мере увеличения числа клиентов службы, например при увеличении числа экземпляров роли или потоков вывода элементов из очереди. Чтобы проиллюстрировать возможные затраты, которые может повлечь использование проекта решения, не учитывающего вышеприведенные рекомендации, мы рассмотрим пример с конкретными цифрами.

Если архитектор решений не применит соответствующие оптимизации, то архитектура системы выставления счетов, описанной выше, скорее всего, вызовет чрезмерные операционные расходы после развертывания и запуска решения на платформе Windows Azure. Причины возможных чрезмерных расходов описаны в данном разделе.

Как отмечалось в определении сценария, бизнес-данные транзакции поступают через регулярные интервалы времени. Предположим, что решение занимается обработкой рабочей нагрузки всего 25 % времени в течение стандартного восьмичасового рабочего дня. Это приводит к тому, что в течение 6 часов (8 часов * 75 %) система простаивает из-за отсутствия транзакций. Более того, решение не будет получать никаких данных в течение 16 часов в нерабочие дни.

Все это время простоя — в общей сложности 22 часа — решение будет продолжать отправлять запросы на вывод данных из очереди, не зная, когда поступят новые данные. Во время этого временного окна каждый поток, выполняющий вывод из очереди, выполнит до 79 200 транзакций (22 часа * 60 мин * 60 транзакций/мин), если интервал опроса установлен в 1 секунду.

Как упоминалось выше, модель ценообразования на платформе Windows Azure рассчитывается по отдельным «транзакциям в хранилище». Транзакция в хранилище — это запрос пользовательского приложения на добавление, считывание, обновление или удаление данных в хранилище. На момент написания данного технического документа транзакция в хранилище исчисляется по ставке 0,01 долл. США за 10 000 транзакций (без учета рекламных предложений и специальных соглашений).

ImportantВажно!
При подсчете числа транзакций очередей учитывайте, что помещение одного сообщения в очередь считается одной транзакцией, при этом извлечение сообщения — это обычно двухшаговый процесс, включающий извлечение, за которым следует запрос на удаление сообщения из очереди. В результате этого каждая успешная операция вывода из очереди состоит из двух транзакций в хранилище. Учтите, что даже если запрос на вывод из очереди не вернул результатов, то он все равно учитывается как оплачиваемая транзакция.

Транзакции в хранилище, создаваемые одним потоком вывода из очереди в вышеописанном сценарии, добавят к месячному счету примерно 2,38 долл. США (79 200 / 10 000 * 0,01 долл. * 30 дней). Для сравнения: 200 потоков вывода из очереди (или по одному потоку на каждый из 200 экземпляров рабочей роли) повысят стоимость до 457,20 долл. США в месяц. Это расходы только на проверку очередей в ожидании работы, а не на выполнение самой вычислительной работы. Вышеприведенный пример является абстрактным, поскольку никто не станет реализовывать свою службу таким образом. Вот почему так важно использовать оптимизации, описанные выше.

Одним из подходов оптимизации производительности решений по обработке сообщений Windows Azure на основе очередей является использование слоя обмена сообщениями публикации-подписки, предоставляемого с шиной обслуживания Windows Azure, как описано в этом разделе.

При таком подходе разработчику необходимо сосредоточиться на создании сочетания опросов и уведомлений, отправляемых в реальном времени. Это позволит прослушивателю подписаться на событие уведомления (триггер), которое вызывается при возникновении определенных условий, указывающих на появление в очереди новой рабочей нагрузки. Этот подход дополняет традиционный цикл опроса слоем публикации-подписки для отправки уведомлений.

В сложной распределенной системе при использовании такого подхода может возникнуть необходимость применения «шины сообщений» или «промежуточного слоя, ориентированного на сообщения», чтобы обеспечить надежную доставку уведомлений одному или нескольким слабосвязанным подписчикам. Windows Azure Service Bus является естественным выбором при решении вопросов доставки сообщений между слабосвязанными распределенными службами приложения, работающими на платформе Windows Azure и выполняющимися локально. Этот подход также прекрасно подходит для архитектуры «шина сообщений», которая позволяет обмениваться уведомлениями между процессами, участвующими в обмене данными на основе очередей.

В процессе, задействованном в обмене сообщениями на основе очередей, может использоваться следующий шаблон.

Лучшие-решения-передачи-сообщений-Azure3

В частности, поскольку тут осуществляется взаимодействие между издателями и подписчиками службы очередей, те же принципы, которые используются при обмене данными между экземплярами ролей Windows Azure, удовлетворят большинству основных требований, относящихся к обмену сообщениями уведомления с принудительной отправкой. Эти принципы уже раскрывались в разделе Как упростить и масштабировать обмен данными между ролями с помощью шины обслуживания Windows Azure.

ImportantВажно!
Использование Windows Azure Service Bus регулируется моделью ценообразования, которая учитывает объем операций по обмену сообщениями для отдельной сущности Service Bus, например очереди или раздела.

Таким образом, важно провести анализ затрат, взвесив все за и против, прежде чем применять Service Bus в данной архитектуре. Аналогично стоит оценить, приведет ли привлечение слоя диспетчеризации уведомлений на основе шины обслуживания Service Bus к сокращению затрат, которое может оправдать дополнительные вложения и труд разработчиков.

Дополнительные сведения о модели ценообразования для Service Bus см. в соответствующих разделах ответов на часто задаваемые вопросы по платформе Windows Azure.

Если проблему задержек довольно просто решить с введением слоя обмена сообщениями публикации-подписки, то дополнительное сокращение затрат можно реализовать за счет применения динамического (эластичного) масштабирования, как описано в следующем разделе.

Платформа Windows Azure реализует для клиента возможность быстрого и простого масштабирования вверх и вниз. Возможность адаптации к меняющейся рабочей нагрузке и переменному трафику — одно из ключевых преимуществ облачной платформы. Это означает, что слово «масштабируемость» можно вычеркнуть из словаря дорогих ИТ-решений, теперь это готовая возможность, которая может быть программным образом включена по запросу в хорошо спроектированном облачном решении.

Динамическое масштабирование — это техническая способность данного решения адаптироваться к плавающим рабочим нагрузкам за счет увеличения и снижения рабочих мощностей и времени обработки на стадии выполнения. Платформа Windows Azure имеет собственную поддержку динамического масштабирования через провизионирование распределенной вычислительной инфраструктуры, на которой вычислительное время можно докупать по необходимости.

Важно различать два типа динамического масштабирования на платформе Windows Azure.

  • Масштабирование экземпляров ролей означает добавление или исключение дополнительных экземпляров веб-ролей или рабочих ролей для обработки текущей рабочей нагрузки. Оно часто заключается в изменении числа экземпляров в конфигурации службы. При увеличении числа экземпляров среда выполнения Windows Azure запускает новые экземпляры, при их уменьшении лишние экземпляры завершаются.

  • Масштабирование процессов (потоков) указывает на поддержание достаточной мощности в терминах потоков обработки для данного экземпляра роли за счет запуска и завершения потоков в зависимости от текущей рабочей нагрузки.

Динамическое масштабирование в решении обработки сообщений на основе очередей должно предусматривать сочетание следующих общих рекомендаций.

  1. Мониторинг ключевых показателей эффективности, включая загрузку ЦП, размер очереди, время ответа и задержку при обработке сообщений.

  2. Динамическое изменение числа экземпляров роли для обработки всплесков рабочей нагрузки (как предсказуемых, так и непредвиденных).

  3. Программное расширение и усечение числа потоков обработки для адаптации к переменной нагрузке, обрабатываемой данным экземпляром роли.

  4. Секционирование и параллельная обработка тонкогранулированных рабочих нагрузок с помощью библиотеки Task Parallel Library в .NET Framework 4.

  5. Поддержание достаточной мощности — в решениях с быстро изменяющейся рабочей нагрузкой, чтобы упредить внезапные всплески, необходимо уметь обрабатывать их без дополнительных издержек, связанных с созданием дополнительных экземпляров.

API управления службой позволяет службе, размещенной в Windows Azure, изменять число запущенных экземпляров роли путем изменения конфигурации развертывания во время выполнения.

noteПримечание
Максимальное число малых экземпляров COMPUTE Windows Azure (или эквивалентное число экземпляров другого размера в пересчете на число ядер) по умолчанию в обычной подписке ограничивается 20. Все запросы на увеличение этой квоты следует направлять группе поддержки Windows Azure. Дополнительные сведения см. в ответах на часто задаваемые вопросы по платформе Windows Azure.

Динамическое масштабирование числа экземпляров роли не всегда может быть самым подходящим вариантом для обработки пиков нагрузки. Например, новый экземпляр роли может потребовать для запуска несколько секунд, и в настоящее время не существует метрик соглашения об уровне обслуживания, способных регулировать время запуска. Вместо этого решение может просто увеличить число рабочих потоков, чтобы обеспечить обработку возросшего объема нестабильной рабочей нагрузки. Пока данные рабочей нагрузки обрабатываются, решение следит за метриками загрузки и решает, нужно динамически увеличить или уменьшить число рабочих процессов.

ImportantВажно!
В настоящее время цель масштабируемости для одной очереди Windows Azure ограничена до 500 транзакций/сек. Если приложение пытается превысить этот лимит, например выполняя операции с очередью из нескольких экземпляров ролей, обрабатывающих тысячи потоков вывода из очереди, то служба хранилища может выдать ответ HTTP 503 «Сервер занят». В таком случае приложение должно реализовать механизм повтора с алгоритмом экспоненциальной отсрочки. Но если ошибка HTTP 503 возникает регулярно, рекомендуется использовать несколько очередей и реализовать стратегию на основе секционирования, которая будет выполнять масштабирование по нескольким очередям.

В большинстве случаев за автомасштабирование рабочих процессов отвечает конкретный экземпляр роли. В противоположность этому масштабирование экземпляров ролей часто подразумевает наличие центрального элемента архитектуры решения, который отвечает за мониторинг метрик производительности и соответствующие действия по масштабированию. На приведенной ниже схеме показан компонент службы под названием Агент динамического масштабирования, который собирает и анализирует метрики нагрузки и на основе этих данных провизионирует новые экземпляры или завершает простаивающие.

Лучшие-решения-передачи-сообщений-Azure4

Стоит отметить, что служба агента масштабирования может быть развернута либо как рабочая роль на платформе Windows Azure, либо как локальная служба. Независимо от топологии развертывания службе будут доступны очереди Windows Azure.

Чтобы реализовать возможность динамического масштабирования, попробуйте воспользоваться блоком автомасштабирования приложений библиотеки Microsoft Enterprise Library, которая обеспечивает автоматическое масштабирование в решениях Windows Azure. Блок автомасштабирования приложений содержит всю функциональность, необходимую для определения и мониторинга автомасштабирования в приложении Windows Azure.

Теперь, когда мы рассмотрели влияние задержек, стоимость транзакций в хранилище и требования к динамическому масштабированию, пора консолидировать эти рекомендации в техническую реализацию.

В предыдущих разделах мы рассмотрели ключевые характеристики, присущие хорошо спроектированной архитектуре передачи сообщений на основе служб хранилища очередей Windows Azure. Были также рассмотрены три основные области, на которые следует обратить внимание, поскольку они могут помочь сократить задержки в обработке, оптимизировать затраты на транзакции в хранилище и улучшить обработку изменяющейся рабочей нагрузки.

Этот раздел предполагается как отправная точка для разработчиков Windows Azure, которая поможет им в реализации шаблонов, упомянутых в данном техническом документе, с точки зрения программиста.

noteПримечание
В этом разделе также основное внимание уделяется построению автомасштабируемого прослушивателя очереди, который поддерживает как модель на основе опросов, так и модель на основе принудительной отправки. Дополнительные методики динамического масштабирования на уровне экземпляров ролей см. в разделе Блок автомасштабирования приложений библиотеки Enterprise Library.

Кроме того, для краткости в нижеприведенных примерах мы рассмотрим только некоторые базовые функциональные элементы, исключив ненужные сложности и большую часть образца кода инфраструктуры поддержки. Для ясности стоило бы отметить, что техническая реализация, обсуждаемая ниже, не является единственным решением данного круга проблем. Она приведена только для общего понимания, чтобы разработчик на ее основе мог создавать более элегантные решения.

Далее в техническом документе рассматривается главным образом исходный код, необходимый для реализации шаблонов, обсуждавшихся выше.

Сначала определим контракт, который будет реализован в компоненте прослушивателя очереди, размещаемого в рабочей роли очереди Windows Azure.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

Для размещения будет использоваться событие QueueEmpty. Оно содержит механизм, позволяющий узлу управлять работой прослушивателя очереди, когда очередь пуста. Соответствующий делегат события определен следующим образом.

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

Обработка элементов очереди будет проще, если прослушиватель может работать с универсальными типами в противоположность использованию таких «низкоуровневых» классов пакета SDK, как CloudQueueMessage. Поэтому определим новый интерфейс, который будет реализован прослушивателем очереди с поддержкой доступа к очередям на основе универсальных типов.

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

Обратите внимание, что мы также включили прослушиватель с поддержкой универсальных типов для отправки элементов очереди одному или нескольким подписчикам через реализацию шаблона разработки наблюдателя, задействовав интерфейс IObservable<T> из .NET Framework 4.

Мы намерены хранить один экземпляр компонента, реализующего интерфейс ICloudQueueListenerExtension<T>. Однако нам нужна возможность запуска нескольких потоков вывода из очереди (рабочих процессов, или, упрощенно говоря, задач). Поэтому добавим поддержку для многопоточной логики вывода из очереди в компонент прослушивателя очереди. Тут можно воспользоваться библиотекой Task Parallel Library (TPL). Метод StartListener будет отвечать за запуск указанного числа потоков вывода из очереди следующим образом.


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

Метод DequeueTaskMain реализует функциональное тело потока вывода из очереди. Он выполняет следующие основные операции.

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

Относительно реализации метода DequeueTaskMain стоит отметить несколько моментов.

Во-первых, мы воспользуемся преимуществами Parallel LINQ (PLINQ) при диспетчеризации сообщений для обработки. Главное преимущество PLINQ заключается в том, что он ускорит обработку сообщений, выполняя делегаты запросов в отдельных рабочих потоках на нескольких процессорах (если возможно, параллельно).

noteПримечание
Поскольку параллелизация запросов управляется PLINQ, то гарантии того, что PLINQ задействует для этого более одного ядра, не существует. PLINQ может выполнить запрос последовательно, если он определит, что из-за затрат на параллелизацию выполнение запроса замедлится. Чтобы получить выигрыш от использования PLINQ, общий объем работы в запросе должен быть достаточно большим, что позволит оправдать издержки на планирование обработки в пуле потоков.

Во-вторых, мы извлекаем сообщения не по одному за раз. Вместо этого API службы очередей извлекает определенное число сообщений из очереди. Число сообщений задается параметром DequeueBatchSize, который передается методу Get<T>. После перехода на уровень абстракции хранилища, реализованный в рамках общего решения, этот параметр передается методу API службы очередей. Кроме того, будет запущена проверка безопасности, чтобы убедиться в том, что пакет не превышает максимального размера, поддерживаемого API. Это реализовано следующим образом.

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

И наконец, задача вывода из очереди не будет работать бесконечно. Мы провизионировали явную контрольную точку, реализованную как событие QueueEmpty, которое вызывается в том случае, когда очередь пустеет. В этот момент происходит обращение к обработчику событий QueueEmpty, чтобы определить, разрешено ли завершить задачу вывода из очереди. Хорошо разработанная реализация обработчика событий QueueEmpty обеспечивает поддержку возможности «автомасштабирования вниз», как описано в следующем разделе.

Обработчик событий QueueEmpty играет двойную роль. Во-первых, он отвечает за обратную связь с исходной задачей вывода из очереди, указывая ей перейти в режим сна на определенный интервал времени (задаваемый в выходном параметре delay делегата события). Во-вторых, он указывает, должна ли задача вывода из очереди завершить работу (как предписывает логический возвращаемый параметр).

Следующая реализация обработчика событий QueueEmpty решает две основные проблемы, описанные выше. Она вычисляет случайный экспоненциальный интервал отсрочки и указывает на то, что задачей вывода из очереди является экспоненциальное увеличение задержки между запросами опроса очереди. Обратите внимание, что в соответствии с настройками данного решения отсрочка не может превышать 1 секунды, поскольку, если автомасштабирование реализовано достаточно хорошо, необходимости в длинных задержках между опросами нет. Кроме того, она запрашивает состояние прослушивателя очереди, чтобы определить число активных задач вывода из очереди. Если это значение превышает 1, обработчик событий указывает на то, что исходная задача вывода из очереди — завершить свой цикл опроса при условии, что интервал отсрочки достиг указанного максимума. В противном случае задача вывода из очереди не будет завершена, останется ровно один поток опроса, работающий на каждом экземпляре прослушивателя очереди. Такой подход помогает сократить число транзакций в хранилище и таким образом снижает затраты на транзакции, как описывалось выше.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

На более высоком уровне описанную выше возможность «масштабирования вниз задачи вывода из очереди» можно описать следующим образом.

  1. Когда в очереди что-нибудь есть, задачи вывода из очереди будут обеспечивать скорейшую обработку рабочей нагрузки. Между запросами на вывод сообщений из очереди не будет задержки.

  2. После того как очередь пустеет, каждая задача вывода из очереди вызывает событие QueueEmpty.

  3. Обработчик событий QueueEmpty подсчитывает случайную экспоненциальную отсрочку и указывает задаче вывода из очереди приостановить работу на заданный интервал.

  4. Задачи вывода из очереди продолжат опрос очереди по истечении этого интервала до тех пор, пока продолжительность бездействия не достигнет максимально допустимого значения.

  5. Если по истечении этого интервала очередь будет по-прежнему пуста, все активные задачи вывода из очереди начнут завершать свою работу — это происходит не одномоментно, потому что все задачи вывода из очереди имеют отсрочки, начатые в разные моменты времени.

  6. В некоторый момент времени останется только одна задача вывода из очереди, ожидающая работу. В результате выполнять опрашивающие транзакции к очереди вхолостую будет только одна задача.

Для полноты описания процесса сбора характеристик нагрузки на момент времени следовало бы упомянуть актуальные артефакты исходного кода. Во-первых, в нем есть структура, в которой хранятся соответствующие метрики, полученные по результатам применения нагрузки к решению. Для простоты было включено небольшое подмножество метрик, которое будет использовано далее в примере кода.

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

Во-вторых, в нем есть метод, реализуемый прослушивателем очереди, который возвращает метрики нагрузки, как показано в следующем примере.

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

В предыдущем разделе мы рассматривали возможность сокращения числа активных задач вывода из очереди до одного экземпляра для минимизации влияния транзакций в режиме бездействия на затраты, связанные с хранилищем. В этом разделе мы разберем противоположный пример, в котором будет реализована возможность «автоматического масштабирования вверх» для привлечения вычислительной мощности, когда в этом есть необходимость.

Во-первых, определим делегат события, который поможет отслеживать изменения состояния очереди с пустого на непустой для запуска соответствующих действий.

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

Затем мы расширим первоначальное определение интерфейса ICloudQueueServiceWorkerRoleExtension, включив в него новое событие, которое будет вызываться каждый раз, когда прослушиватель очереди обнаружит новые рабочие элементы, фактически при изменении размера очереди с нуля на положительное значение.

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

Также мы определим место в коде прослушивателя очереди, где это событие будет вызываться. Будем вызывать событие QueueWorkDetected из цикла вывода из очереди, реализованного в методе DequeueTaskMain, который нужно расширить следующим образом.

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

На последнем шаге создадим обработчик для события QueueWorkDetected. Реализация этого обработчика событий будет передаваться компонентом, который создает экземпляр прослушивателя очереди и размещает его у себя. В нашем случае это рабочая роль. Код, отвечающий за создание экземпляра и реализацию обработчика событий, состоит из следующих элементов.

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

В свете вышеприведенного примера метод GetOptimalDequeueTaskCount требует более глубокого рассмотрения. Этот метод отвечает за вычисление количества задач вывода из очереди, которое можно будет считать оптимальным для обработки рабочей нагрузки, находящейся в очереди. При вызове этого метода он должен определить (с помощью любого подходящего механизма принятия решений), сколько «лошадиных сил» потребуется прослушивателю очереди для обработки объема работы, находящегося в очереди или ожидаемого к поступлению в эту очередь.

Например, разработчик может применить упрощенный подход и встроить набор статических правил непосредственно в метод GetOptimalDequeueTaskCount. Используя известную пропускную способность и характеристики масштабируемости инфраструктуры обслуживания очередей, среднюю задержку обработки, размер полезных данных и другие входные данные, набор правил может делать оптимистичные предположения и выбирать оптимальное число задач вывода из очереди.

В вышеприведенном примере используется намеренно упрощенная методика определения числа задач вывода из очереди.

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

Следует повторить, что приведенный выше пример ни в коем случае не должен рассматриваться в качестве панацеи «на все случаи жизни». Более удачным решением стал бы вызов внешне настраиваемого и управляемого правила, которое выполняет все необходимые вычисления.

На этом этапе у нас имеется работающий прототип прослушивателя очереди, способного автоматически масштабироваться вверх и вниз по мере изменения рабочей нагрузки. В качестве конечного штриха, возможно, потребуется добавить возможность адаптации к переменной нагрузке во время ее обработки. Эту возможность можно добавить точно таким же образом, как и при добавлении поддержки для события QueueWorkDetected.

Теперь перейдем к другому важному пункту оптимизации, который поможет сократить задержки в работе прослушивателей очередей.

В этом разделе мы расширим вышеприведенную реализацию прослушивателя очереди, дополнив ее механизмом отправки уведомлений на основе возможностей односторонней многоадресной передачи Service Bus. Механизм уведомлений отвечает за вызов событий, сообщающих прослушивателю очереди, что нужно заняться выводом данных из очереди. Такой подход помогает избежать опроса очереди в ожидании новых сообщений и таким образом устраняет связанные с этим задержки.

Во-первых, определим событие-триггер, которое будет получать наш прослушиватель очереди в том случае, когда в очередь поступила новая рабочая нагрузка.

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

Затем мы разрешим реализациям прослушивателя очереди действовать в качестве подписчиков для получения события-триггера. Сначала необходимо определить прослушиватель очереди в качестве наблюдателя для события CloudQueueWorkDetectedTriggerEvent.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

Второй шаг — реализация метода OnNext, определенного в интерфейсе IObserver<T>. Этот метод вызывается поставщиком для уведомления наблюдателя о новом событии.

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

Как видно в вышеприведенном примере, мы намеренно вызываем тот же делегат события, поскольку он используется в предыдущих шагах. Обработчик событий QueueWorkDetected уже содержит всю необходимую логику приложения для создания оптимального числа экземпляров задач вывода из очереди. Поэтому тот же обработчик событий будет повторно использован при обработке уведомления CloudQueueWorkDetectedTriggerEvent.

Как уже отмечалось в предыдущих разделах, при реализации подхода с передачей уведомлений не надо поддерживать постоянную работу задачи вывода из очереди. Поэтому можно сократить число задач на экземпляр прослушивателя очереди до нуля и использовать механизм уведомлений для создания экземпляров задач вывода из очереди, когда в очередь поступают рабочие элементы. Чтобы убедиться в том, что неактивные задачи вывода из очереди не запущены, нужно внести следующее простое изменение в обработчик событий QueueEmpty.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

В результате система больше не определяет, осталась ли хоть одна активная задача вывода из очереди. Измененный обработчик события QueueEmpty учитывает только факт превышения максимального интервала бездействия, по истечении которого все активные задачи вывода из очереди будут завершены.

Чтобы получать уведомления CloudQueueWorkDetectedTriggerEvent, мы задействуем модель публикации-подписки, реализованную как слабосвязанный обмен сообщениями между экземплярами ролей Windows Azure. В сущности, обработка входящих сообщений производится на том же слое межролевого обмена данными следующим образом.

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

За многоадресную рассылку события триггера, определенного в классе CloudQueueWorkDetectedTriggerEvent, отвечает издатель, а именно компонент, помещающий в очередь рабочие элементы. Это событие может быть вызвано как до помещения самого первого элемента в очередь, так и после помещения в нее последнего элемента. В нижеприведенном примере публикация события-триггера производится после завершения помещения рабочих элементов во входную очередь.

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

Создав прослушиватель очереди, способный поддерживать многопоточность, автомасштабирование и принудительно отправляемые уведомления, пора обобщить все рекомендации, относящиеся к разработке решений обмена сообщениями на основе очередей на платформе Windows Azure.

Чтобы обеспечить максимальное повышение эффективности и добиться снижения затрат в решениях по обмену сообщениями на основе очередей, которые работают на платформе Windows Azure, архитекторы решений и разработчики должны учитывать следующие рекомендации.

Архитектор решений должен выполнить следующее.

  • Спровизионировать архитектуру обмена сообщениями на основе очередей, в которой используется служба хранения очередей Windows Azure для широкомасштабного асинхронного обмена данными между слоями и службами в облачных и гибридных решениях.

  • Рекомендовать для секционированной архитектуры очередей выполнить масштабирование свыше 500 транзакций/сек.

  • Понимать основы модели ценообразования Windows Azure и оптимизировать решение для сокращения стоимости транзакции через применение ряда рекомендаций и шаблонов проектирования.

  • Учитывать требования динамического масштабирования, провизионировав архитектуру, которая будет адаптироваться к изменяющейся рабочей нагрузке.

  • Применять правильные методики автоматического масштабирования и подходы для эластичного расширения и сокращения вычислительных мощностей, чтобы еще более оптимизировать эксплуатационные расходы.

  • Оценить соотношение «затраты-выгода» в сокращении задержек за счет создания зависимости от Windows Azure Service Bus для диспетчеризации уведомлений с принудительной отправкой в реальном времени.

Разработчик должен выполнить следующее.

  • Разработать решение по обмену сообщениями, которое будет выполнять пакетную обработку при отправке и получении данных из очередей Windows Azure.

  • Реализовать эффективную службу прослушивателя очереди, которая будет обеспечивать опрос очередей не более чем одним потоком вывода из очереди, если очереди пусты.

  • Динамически масштабировать вниз число экземпляров рабочей роли, когда очередь пуста в течение продолжительного времени.

  • Реализовать уникальный для приложения алгоритм случайной экспоненциальной отсрочки, чтобы сократить затраты на транзакции в хранилище, вызываемые опросом очередей во время простоя.

  • Применять правильные методики, которые позволят предотвратить превышение целей масштабирования для одной очереди при реализации многопоточных и многоэкземплярных издателей и потребителей очередей.

  • Реализовать надежную политику повторов, способную обрабатывать разнообразные временные ошибки при публикации и получении данных из очередей Windows Azure.

  • Использовать возможность односторонней обработки событий, реализуемую Windows Azure Service Bus для поддержки принудительно отправляемых уведомлений, чтобы сократить задержки и увеличить производительность решения обмена сообщениями на основе очередей.

  • Изучить такие новые возможности .NET Framework 4, как TPL, PLINQ и шаблон Observer, чтобы добиться максимальной степени параллелизма, улучшить параметры параллелизма и упросить разработку многопоточных служб.

Сопутствующий образец кода доступен для загрузки из коллекции исходных кодов MSDN. Образец кода включает также все необходимые компоненты инфраструктуры, такие как слой абстракции с учетом универсальных типов для службы очередей Windows Azure, которые отсутствуют в вышеприведенных фрагментах кода. Обратите внимание, что открытая лицензия Microsoft распространяется не на все файлы исходного кода, как описано в соответствующей юридической информации.

Дополнительные сведения о теме, обсуждаемой в данном техническом документе, см. в следующих разделах:

Показ:
© 2014 Microsoft