MSDN Library

Рекомендации по обработке больших сообщений с помощью очередей Azure

Обновлено: Март 2015 г.

Автор: Валерий Мизонов

Рецензенты: Брэд Колдер (Brad Calder), Ларри Фрэнкс (Larry Franks), Джей Харидас (Jai Haridas), Сидни Хига (Sidney Higa), Кристиан Мартинес (Christian Martinez), Курт Петерсон (Curt Peterson) и Марк Симмс (Mark Simms)

В этой статье приводится ориентированное на разработчиков руководство по реализации учитывающего универсальные типы уровня абстракции хранилища для службы очереди Azure. Для поддержки очень больших сообщений в очередях Azure необходимо преодолеть существующие ограничения на размер сообщений. В этой статье и соответствующем коде содержатся полезные сведения об использовании службы очередей Azure без необходимости учета размеров сообщений, обусловленной ограничением очереди в 64 КБ.

Зачем нужны большие сообщения?

Помните, было время, когда мы считали, что "640 КБ с головой хватит для любых целей. ". В прошлом всего несколько килобайт могли быть для дисциплинированного разработчика огромным пространством, в котором можно было уместить все данные приложения. В наше время объемы данных, которыми должны обмениваться приложения, могут сильно варьироваться. Неважно, работают ли они с маленькими HL7-сообщениями или EDI-документами объемом в несколько мегабайт, современные приложения должны уметь работать с данными, объемные характеристики которых могут меняться с непрогнозируемой скоростью. Бизнес-объект, который в прошлом веке мог быть выражен в многобайтной структуре, сегодня легко может разрастись в требующий огромного места артефакт, в несколько раз превышающий в объеме своих предшественников за счет современных методик и форматов сериализации и представления.

Обработка сообщений в данной архитектуре решения без наложения технических ограничений на размер сообщений является ключевым условием для поддержки постоянно изменяющихся объемов данных. Использования больших сообщений не всегда можно избежать. Например, если решение, обеспечивающее взаимодействие между компаниями, предназначено для обработки трафика в формате EDI, оно должно быть готово к получению EDI-документов размером до нескольких мегабайт. Каждый уровень, служба и компонент сквозного потока должны быть способны к работе с соответствующим размером документа. Во время тестирования может быть сделано неприятное открытие, когда после успешного принятия 20-мегабайтного EDI-документа «846 Inventory Advice» посредством веб-службы возникнет ошибка при его сохранении в очередь на обработку в связи с ограничениями, налагаемыми на размер сообщений.

Зачем вообще использовать очередь для больших сообщений на платформе Azure? Чем плохи альтернативные варианты, например большие двоичные объекты, таблицы, облачные диски или среда База данных SQL Microsoft Azure? Напомню, что очереди позволяют реализовывать некоторые типы сценариев обмена сообщениями, характеризующиеся асинхронным, слабосвязанным обменом данными между производителями и потребителями, выполняемым с возможностями масштабирования и обеспечения надежности. Использование очередей Azure отделяет различные части данного решения и обеспечивает уникальную семантику, например принцип доставки «первым пришел, первым ушел» (FIFO) и доставки «хотя бы один раз». Подобную семантику может быть достаточно трудно реализовать с помощью других альтернативных механизмов обмена данными. Кроме того, очереди лучше подходят к использованию в качестве временного хранилища для обмена данными между службами, уровнями и приложениями, а не в качестве постоянного хранилища данных. Соответствующее требование к обмену данными может быть выражено в различных формах, например это может быть передача сообщений между компонентами в асинхронном режиме, выравнивание нагрузки или горизонтальное масштабирование для выполнения сложных вычислительных рабочих нагрузок. Многие из этих моделей обмена данными нельзя легко реализовать без использования очередей. В общем, очереди являются важнейшей функциональной возможностью. Исключение вопросов о том, что может, а что не может быть включено в очередь, является веским аргументом для создания унифицированных решений по обмену сообщениями на основе очередей, которые могут обрабатывать данные любых размеров.

Этот уровень абстракции облегчит публикацию и потребление экземпляров сущностей для конкретных приложений и позволит избежать работы с байтовыми массивами или строками, которые в данный момент являются единственными типами, поддерживаемыми API-интерфейсом службы очередей. Я буду широко использовать универсальные шаблоны .NET, реализую некоторые возможности, добавляющие удобство, например прозрачное сжатие и распаковку потока, а также применю некоторые рекомендуемые методы работы, например обработку периодических ошибок, для улучшения устойчивости к ошибкам операций хранилища.

Вопросы проектирования

В данное время сообщения, которые после сериализации и кодирования имеют объем более 64 КБ, не могут храниться в очереди Azure. При попытке разместить в очереди сообщение, размер которого больше 64 КБ, клиентский API-интерфейс вернет исключение. На момент написания данного предложения максимальный размер сообщения, возвращаемый этим свойством, составлял 65536.

ImportantВажно!
При передаче сообщений в очередь они кодируются с использованием кодировки Base64. Закодированные полезные данные всегда больше по размеру, чем составляющие их необработанные данные. В среднем кодировка Base64 добавляет 25 % дополнительного объема. В связи с этим ограничение размера в 64 КБ на самом деле не позволяет хранить сообщения, размер полезных данных которых превышает 48 КБ (75 % от 64 КБ).

Хотя это ограничение относится к одиночным элементам очереди, оно может мешать использованию определенных типов сообщений, особенно тех, которые нельзя разбить на более мелкие фрагменты данных. Постоянное беспокойство о том, может ли данное сообщение быть размещено в очереди, вряд ли повысит производительность разработчика. В конечном счете главная задача — сделать так, чтобы данные моего приложения перемещались между производителями и потребителями максимально эффективно вне зависимости от их размера. Когда одна сторона вызывает команду Put (или Enqueue), а другая — команду Get (или Dequeue) к очереди, остальные действия в теории должны быть выполнены автоматически.

Преодоление ограничения на размер для сообщений в очередях Azure с помощью интеллектуальных методов работы с большими сообщениями — это главная предпосылка для решения технической задачи, рассматриваемой в этой статье. Ее реализация потребует некоторой дополнительной работы. В современном мире коммерческой разработки программных продуктов любые дополнительные работы должны быть рационально обоснованы. Дополнительные вложения я обосную следующими целями проектирования:

  • Поддержка очень больших сообщений за счет снятия ограничений, налагаемых API-интерфейсом службы очередей на размер сообщений.

  • Поддержка определяемых пользователями универсальных объектов при публикации и получении сообщений из очереди Azure.

  • Прозрачное переполнение настраиваемого хранилища сообщений — контейнер больших двоичных объектов, распределенный кэш или другой тип хранилища, способный хранить большие сообщения.

  • Прозрачное сжатие, предназначенное для повышения эффективности затрат путем сведения к минимуму объема пространства, занимаемого большими сообщениями.

  • Повышенная надежность, реализуемая за счет широкого использования рекомендаций по обработке временных состояний при выполнении операции с очередями.

Основа для поддержки больших сообщений в очередях с ограничением по размеру будет иметь следующий вид. Сначала определяю, может ли данное сообщение быть размещено в очереди Azure без выполнения какой-либо дополнительной работы. Метод определения того, может ли сообщение быть спокойно размещено в очереди без нарушения ограничения по размеру, будет реализован посредством формулы, которая будет заключена во вспомогательную функцию следующим образом.

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Если размер сообщения меньше ограничения, я просто вызываю API-интерфейс службы очередей, чтобы поставить сообщение в очередь без изменений. Если размер сообщения выходит за установленные границы, поток данных становится интереснее. На следующей блок-схеме показываются последующие действия.

Сохранение-потока-сообщений1

Говоря вкратце, если сообщение не может быть помещено в очередь из-за его размера, оно перенаправляется в хранилище сообщений, способное хранить большие сообщения. Затем создается крошечное сообщение с метаданными, содержащее ссылку на элемент, находящийся в хранилище переполнения. И наконец, сообщение с метаданными помещается в очередь. Перед проверкой сообщения на пригодность для хранения в очереди я всегда выполняю его сжатие. Это увеличивает совокупность сообщений, которые могут быть поставлены в очередь без необходимости помещения в хранилище переполнения. Хорошим примером будет XML-документ размером немного больше 64 КБ, который после сериализации и сжатия становится идеальным кандидатом для включения в очередь. Это поведение можно изменить в случае, если использовать сжатие по умолчанию нежелательно. Достичь этого можно путем предоставления пользовательского компонента сериализации, описанного в следующем разделе.

Здесь необходимо рассмотреть ряд вопросов, главным образом с точки зрения затрат. Как можно отметить на блок-схеме выше, я пытаюсь определить, сможет ли большое сообщение сначала переполнить кэш Azure. Поскольку за использование распределенной облачной службы кэша взимается плата, размещение сообщений в кэше не должно быть обязательным. Это отражено на блок-схеме.

Кроме того, возможны случаи, когда сообщение достаточно большое, что делает его непригодным для хранения в распределенном кэше с ограничением по размерам. На момент написания данной статьи максимальный размер кэша был 4 ГБ. Это необходимо учесть и предоставить путь отработки отказа на случай превышения объема или квот кэша. Квоты имеют поведение вытеснения, которое также следует учесть.

ImportantВажно!
Использование кэша Azure в качестве хранилища переполнения помогает уменьшить задержку и исключает избыточные транзакции хранилища при обмене большим числом сообщений. Она обеспечивает инфраструктуру распределенного кэширования с высоким уровнем доступности, способную выполнять репликацию и обслуживание кэшированных данных в памяти, размещенных на нескольких серверах кэширования для обеспечения надежности. Эти преимущества могут быть сведены на нет ограничением размера кэша и затратами, связанными с использованием службы. Поэтому важно проанализировать затраты для оценки преимуществ и недостатков использования кэша Azure в качестве хранилища переполнения в некоторых сценариях.

noteПримечание
Рекомендации по выбору подходящего предложения кэша Azure для приложения см. в разделе Какой кэш Azure подходит мне лучше всего?.

Вот вспомогательная функция, которая будет определять, можно ли считать указанное значение размера элемента оптимальным при сохранении элемента заданного размера в кэше:

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Теперь, рассмотрев некоторые начальные предпосылки, мы можем перейти на сторону потребителя и рассмотреть модель реализации получения больших сообщений из очереди. Для начала, чтобы облегчить понимание, представим технологическую схему визуально.

Переполнение-размера-сообщения

Резюмируем работу приведенной выше схемы. Из очереди извлекается сообщение неизвестного типа и сравнивается с типом сообщений с метаданными. Если это не сообщение с метаданными, поток продолжает работу, выполняя логику распаковки, чтобы правильно восстановить исходное сообщение перед его доставкой получателю. Если же это было сообщение с метаданными, то оно проверяется на предмет определения типа хранилища переполнения, используемого для хранения фактического сообщения. Если определяется, что сообщение хранится в кэше, вызывается соответствующий API-интерфейс службы Caching Service и возвращается реальное сообщение, после чего выполняется его распаковка и передача получателю. В случае если само сообщение было помещено в контейнер больших двоичных объектов, для его получения из сущности большого двоичного объекта, распаковки и передачи обратно вызывающему будет вызван API-интерфейс службы больших двоичных объектов.

В дополнение к обработке операций Enqueue и Dequeue для обработки больших сообщений необходимо убедиться, что все перенаправленные полезные данные удаляются из соответствующих хранилищ переполнения для сообщений по запросу пользователя. Одним из возможных методов реализации этого является привязка процесса удаления к операции Delete при ее вызове для данного сообщения. Визуально эту операцию можно представить следующим образом.

Переполнение-размера-сообщения

Перед тем как мы начнем реализовывать рассмотренные выше модели, стоит коснуться еще одного вопроса, а именно определения сообщения. Что может считаться сообщением и какие формы оно может принимать? Может ли оно быть байтовым массивом, потоком данных, простым типом, таким как строка, или сложным объектом определенного приложения, который разработчик реализует в составе объектной модели решения? Я считаю, что в этой области нам не стоит себя ограничивать. Предположим, что сообщение имеет универсальный тип <T>, что означает, что оно может являться чем угодно по желанию разработчика. Вы увидите, что конечная реализация естественным образом развернется в соответствии с этой концепцией.

С учетом вышесказанного на следующей диаграмме показаны все три возможных пути, предусмотренные в проекте.

Пути-прохождения-сообщений

На этом этапе, кажется, существует достаточно входных данных, чтобы начать реализацию технического проекта. С этого момента я сосредоточусь на описании исходного кода, необходимого для реализации описанных выше моделей.

Техническая реализация

Для самостоятельной работы загрузите полный образец кода из коллекции исходных кодов MSDN. Образец поставляется в составе более крупной комплексной справочной реализации, в основе которой лежат обсуждаемые в этой статье модели. После загрузки и распаковки перейдите к проекту Azure.Services.Framework в узле Contoso.Cloud.Integration и разверните папку Storage. В этой папке содержатся все основные артефакты кода, описанные ниже.

Как уже упоминалось вначале, первоначальная идея заключалась в том, чтобы абстрагировать взаимодействие облачного приложения с очередями Windows Azure. Я подхожу к решению этой задачи, предоставляя контракт, который определяет основные операции, поддерживаемые моим пользовательским уровнем абстракции хранилища. Программный интерфейс, с помощью которого потребителям предоставляется доступ к контракту, показан ниже. Я намеренно исключил из приведенного ниже фрагмента кода некоторые функции уровня инфраструктуры, такие как создание и удаление очередей, поскольку на данный момент они неважны.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

Также необходим еще один контракт (интерфейс), который абстрагирует доступ к хранилищу переполнения больших сообщений. Контракт реализуется двумя компонентами, по одному на каждое хранилище переполнения (хранилище больших двоичных объектов и распределенный кэш). Контракт состоит из следующих операций:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Оба контракта широко используют универсальный тип <T>. Он позволяет настроить тип сообщения под любой тип .NET по вашему выбору. Однако мне придется обрабатывать некоторые особые случаи, а именно типы, требующие специального обращения, например потоки. Более подробно об этом я расскажу позже.

Независимо от выбранного типа сообщений существует одно важное требование: тип объекта, представляющего сообщение в очереди, должен быть сериализуемым. Все объекты, проходящие через уровень абстракции хранилища, прежде чем поступить в очередь или хранилище переполнения, подвергаются сериализации. В моей реализации сериализация и десериализация также связаны со сжатием и распаковкой соответственно. Такой подход повышает эффективность с точки зрения затрат и пропускной способности. Преимущество в отношении затрат появляется за счет того, что сжатые большие сообщения занимают меньше места в хранилище, в результате чего снижаются затраты на хранение. Эффективность использования пропускной способности появляется за счет уменьшения размера полезных данных благодаря сжатию, которое сокращает размеры полезных данных при их проходе по сети в хранилище Azure или из него.

Требование для сериализации и десериализации объявляется в специализированном интерфейсе. Любой компонент, реализующий этот интерфейс, должен обеспечить определенные функции сжатия, сериализации, десериализации и распаковки. Покажем пример этого интерфейса.

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Для сжатия и распаковки я использую компонент DeflateStream платформы .NET Framework. Этот класс представляет стандартный в отрасли и соответствующий RFC алгоритм Deflate, предназначенный для сжатия и распаковки файлов без потерь. По сравнению с классом GZipStream он обеспечивает более оптимальное сжатие изображений и в целом лучшую производительность. В отличие от него класс GZipStream использует формат данных GZIP, который включает значение циклической проверки избыточности (CRC) для обнаружения повреждений данных. Внутри же формат данных GZIP использует тот же алгоритм сжатия, что и класс DeflateStream. В итоге GZipStream = DeflateStream + стоимость вычисления и хранения контрольных сумм CRC.

Моя реализация контракта приведена ниже. Обратите внимание, что алгоритмы сжатия можно легко переключать, заменяя класс DeflateStream на класс GZipStream и наоборот.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

Одной из наиболее мощных возможностей в реализации CloudStorageEntitySerializer является возможность применения специальной обработки для XML-документов обоих видов: XmlDocument и XDocument. Другой областью, на которую следует обратить внимание, является оптимальная сериализация и десериализация XML-данных. Здесь я решил воспользоваться классами XmlDictionaryReader и XmlDictionaryWriter, которые, как известно разработчикам .NET, прекрасно показывают себя в ситуациях, когда нужно эффективно выполнять сериализацию и десериализацию полезных данных XML с использованием двоичного формата XML .NET.

Решение относительно используемого типа хранилища переполнения сообщений входит в ответственность потребителя, который обращается к пользовательскому уровню абстракции хранилища. Руководствуясь этим принципом, я предоставлю возможность выбора необходимого типа хранилища сообщений, добавив следующие конструкторы в тип, реализующий интерфейс ICloudQueueStorage.

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

Указанные выше конструкторы не выполняют никаких сложных действий. Они просто инициализируют внутренние элементы и настраивают клиентский компонент, который будет осуществлять доступ к очереди Azure. Однако следует отметить, что я явно указываю клиенту очереди не реализовывать принудительно никакую политику повтора. Для формирования надежного уровня абстракции хранилища мне нужен более точный контроль над временными проблемами при выполнении операций с очередями Azure. Поэтому будет присутствовать отдельный компонент, который способен распознавать и обрабатывать намного больше различных ошибок, появляющихся нерегулярно.

Теперь рассмотрим изнутри класс ReliableCloudQueueStorage, который я очертил выше. В частности, рассмотрим его реализацию операции Put, поскольку именно в ней выполняется прозрачное перенаправление в хранилище переполнения больших сообщений.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Новый артефакт кода, появившийся во фрагменте кода выше, представляет собой класс LargeQueueMessageInfo. Этот пользовательский тип в конечном счете является нашим сообщением с метаданными, описывающим расположение большого сообщения. Этот класс помечается как внутренний, поскольку он не должен быть виден никому вне реализации уровня абстракции хранилища. Класс определяется следующим образом.

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

Двигаемся дальше. Теперь мне нужно реализовать хранилище переполнения для больших сообщений, которое будет использовать службу хранилища больших двоичных объектов Azure. Как я указал ранее, этот компонент должен поддерживать интерфейс ICloudBlobStorage, используемый компонентом ReliableCloudQueueStorage для передачи сообщений в реализацию ICloudBlobStorage, когда их нельзя поместить в очередь из-за ограничения по размеру сообщений. Чтобы подготовить почву для следующих действий, я включу только реализацию конструктора.

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Ранее в этой статье я показал реализацию операции Put, которая гарантирует, что небольшие сообщения всегда будут помещены в очередь, а большие сообщения будут прозрачно перенаправлены в хранилище переполнения. Ради последовательности рассмотрим теперь механику эквивалента операции Put, реализованного в хранилище переполнения.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

Итак, приведенный выше код принимает большой двоичный объект типа <T> и сначала проверяет, является ли он уже сериализованным образом сообщения в форме объекта Stream. Все большие сообщения, которые передаются в хранилище переполнения компонентом ReliableCloudQueueStorage, будут приходить в виде потоков, готовых к сохранению. В случае если не удается найти целевой контейнер большого двоичного объекта, код попытается создать отсутствующий контейнер. Он выполняет это действие в пределах области, учитывающей повторы, для улучшения надежности и повышения устойчивости к нерегулярным сбоям. Второй путь отработки отказа предназначен для обработки ситуации, когда большой двоичный объект с таким именем уже существует. Код удалит существующий большой двоичный объект при условии, что перезапись разрешена. После удаления будет выполнена повторная попытка передачи нового большого двоичного объекта. Опять же для повышения надежности эта операция выполняется в пределах области, учитывающей повторы.

Теперь, когда я могу сохранять крупные сообщения в контейнере больших двоичных объектов, следует разработать другую реализацию интерфейса ICloudBlobStorage, которая будет использовать кэш Azure. Будем последовательны и начнем с ее конструкторов.

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

Если вы вспомните рассмотренные ранее вопросы, то одним из ключевых технических проектных решений было использование для хранения больших сообщений как службы BLOB-объектов, так и кэша Azure. Вариант с кэшем главным образом подходит для временных объектов, размер полезных данных которых не превышает 8 МБ. Вариант с большими двоичными объектами предназначен для всего остального. Обычно для реализации такого решения возникает потребность в гибридном хранилище переполнения. Основа для создания гибридного хранилища уже находится в базе кода. Достаточно просто соединить существующие артефакты следующим образом.

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

Теперь я завершу рассказ, включив еще один фрагмент кода, показывающий реализацию операции Put в гибридном хранилище переполнения.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

Эту статью нельзя было бы считать полной, если бы я не привел несколько примеров того, как описанный выше уровень абстракции хранилища может использоваться из клиентского приложения. Эти примеры я совмещу с тестовым приложением, которое также проверит техническую реализацию.

Проверка

Для проверки того, что большие сообщения смогут успешно проходить через только что реализованный уровень абстракции хранилища, я создал очень простое консольное приложение. На первом шаге оно принимает образец XML-документа размером в 90 МБ и помещает его в очередь Azure. На втором шаге оно получает сообщение из очереди. Это сообщение должно быть исходным XML-документом, который будет записан обратно на диск под другим именем, чтобы иметь возможность сравнить размер файла и его содержимое. Между этими шагами приложение входит в режим паузы, во время которой можно просмотреть содержимое очереди и соответствующего хранилища переполнения сообщений (кэша или контейнера большого двоичного объекта). Исходный код тестового приложения приведен ниже.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Для полноты понимания ниже приведен файл конфигурации приложения, который использовался при проверке. Если вы планируете попробовать тестовое приложение в работе, измените свою копию файла app.config и добавьте реальные учетные данные учетной записи хранилища и сведения о конечной точке службы Caching Service.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Если тестовое приложение было успешно скомпилировано и выполнено, в окнах консоли должны отобразиться выходные данные, аналогичные следующим.

Вывод-в-окно-консоли

Если вы посмотрите в учетную запись хранилища, используемую тестовым приложением, в очереди появится следующее сообщение.

Сообщение-сериализации-метаданных

Поскольку тестовое сообщение было достаточно крупным для перенаправления непосредственно в хранилище больших двоичных объектов, на следующем снимке экрана отображается ожидаемое содержимое внутри соответствующего контейнера большого двоичного объекта на момент, когда работа тестового приложения будет приостановлена.

Контейнер больших двоичных объектов

Обратите внимание, что исходный XML-документ размером 90 МБ, использованный в тесте, стал большим двоичным объектом размером 11 МБ. Это отражает 87 % экономии затрат на хранение и пропускную способность, что является результатом применения двоичной сериализации XML. Исходя из целевого класса сценариев, двоичная сериализация XML + сжатие является первым и лучшим вариантом.

После того как тестовое приложение выполнит удаление сообщения из очереди, ожидается, что сообщение с метаданными будет удалено параллельно с большим двоичным объектом, содержащим данные сообщения, как показано на снимке экрана ниже.

Удаленный большой двоичный объект

В показанном выше примере отражен упрощенный вид жизненного цикла большого сообщения. Он предназначен для выделения основных элементов уровня абстракции хранилища, например маршрутизации больших сообщений в хранилище больших двоичных объектов, прозрачного сжатия сообщений и автоматического удаления обоих частей сообщения. Теперь, наверное, самое время перейти к заключению.

Заключение

Как мы увидели, с помощью кэша Azure и службы BLOB-объектов Azure использование очередей Azure можно расширить для поддержки сообщений, размер которых превышает 64 КБ, при этом на клиент не накладывается каких-либо дополнительных технических ограничений. В этой статье я показал, что, потратив немного усилий, можно повысить степень удобства обмена сообщениями для клиента, обеспечив следующие усовершенствования:

  • Прозрачное сжатие сообщений для уменьшения расходов на хранение и экономии пропускной способности при обмене данными с центром обработки данных.

  • Прозрачная, легко настраиваемая обработка переполнения, позволяющая перенаправлять большие сообщения в кэш или хранилище больших двоичных объектов.

  • Поддержка универсальных шаблонов, позволяющая легко хранить объекты любых типов.

  • Автоматическая обработка нерегулярных проблем для повышения надежности.

Как я упоминал ранее, хотя это решение может использовать в качестве хранилища переполнения как распределенный кэш, так и хранилище больших двоичных объектов, использование кэша Azure влечет за собой дополнительные затраты. Перед принятием решения об использовании кэша в качестве хранилища переполнения необходимо провести тщательную оценку требований к хранению для вашего проекта и выполнить анализ затрат на основе прогнозируемого количества и размеров сообщений.

Хотя это решение предоставляет легкий в использовании метод поддержки больших сообщений в очередях Azure, всегда есть возможность усовершенствования. Вот несколько примеров других ценных функций, не включенных в это решение, которые вам могут пригодиться.

  • Возможность настраивать тип хранилища переполнения больших сообщений в конфигурации приложения.

  • Дополнительные пользовательские сериализаторы в случае, если сериализатор по умолчанию не соответствуют конкретным целям производительности или функциональным потребностям (например, если вас не удовлетворяет сжатие по умолчанию).

  • Элемент навигации в метаданных большого двоичного объекта, позволяющий просматривать хранилище больших двоичных объектов и быстро определять наличие любых потерянных больших двоичных объектов больших сообщений (зомби).

  • Компонент «сборщик мусора», который обеспечит своевременное удаление любых потерянных больших двоичных объектов из хранилища переполнения сообщений (для ситуаций, когда доступ к очередям осуществляется и другими компонентами, а не только реализованным здесь уровнем абстракции хранилища).

Сопутствующий образец кода доступен для загрузки из коллекции исходных кодов MSDN. Обратите внимание, что открытая лицензия Microsoft распространяется не на все файлы исходного кода, как описано в соответствующей юридической информации.

Дополнительные ресурсы и ссылки

Дополнительные сведения о теме, обсуждаемой в этой статье, см. в следующих разделах:

Добавления сообщества

Показ:
© 2016 Microsoft