Best Practices for Handling Large Messages with Azure Queues

Atualizado: março de 2015

Autor: Valery Mizonov

Revisores: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson e Mark Simms

Este artigo tem como objetivo oferecer diretrizes a desenvolvedores sobre a implementação de uma camada de abstração de armazenamento com reconhecimento de genéricos para o serviço Fila do Azure. Para oferecer suporte a mensagens bem grandes em filas do Azure, as limitações de tamanho de mensagem de hoje devem ser ultrapassadas. Este artigo e o código associado ajudam você a usar as Filas do Azure sem precisar participar da contabilidade do tamanho de mensagem imposta pelo limite de 64 KB da fila.

Por que mensagens grandes?

Lembre-se de quando pensávamos que “640 K eram suficientes para qualquer pessoa!” Alguns quilobytes podiam comprar um espaço de armazenamento luxuoso no qual um desenvolvedor disciplinado de antigamente podia colocar todos os dados de seu aplicativo. Hoje, a quantidade de dados que os aplicativos modernos precisam trocar pode variar substancialmente. Seja uma mensagem HL7 minúscula ou um documento EDI de vários megabytes, os aplicativos modernos precisam lidar com todos os tipos de características volumétricas que avançam a uma velocidade imprevisível. Um objeto comercial que era expresso em uma estrutura multibyte no século passado pode facilmente apresentar-se hoje como um artefato que consome muito espaço de armazenamento e que é várias vezes maior que seu predecessor, graças às técnicas e aos formatos modernos de serialização e representação.

Manipular mensagens em determinada arquitetura de solução, sem impor limitações técnicas ao tamanho da mensagem, é fundamental para dar suporte a volumes de dados que crescem sem parar. Nem sempre é possível evitar mensagens grandes. Por exemplo, quando uma solução B2B é projetada para manipular tráfego EDI, a solução precisa estar preparada para receber documentos EDI de até vários megabytes. Cada camada, serviço e componente no fluxo ponta a ponta precisa acomodar o tamanho do documento que está sendo processado. Durante a realização de testes, seria extremamente desagradável descobrir que um documento EDI 846 Inventory Advice, de 20 MB, que foi aceito com êxito através de um serviço Web não pôde ser armazenado em uma fila para processamento, devido às restrições de tamanho das mensagens da fila.

Por que alguém escolheria usar uma fila para mensagens grandes na plataforma Azure? O que há de errado com outras alternativas, como blobs, tabelas, unidades em nuvem ou um Banco de dados SQL do Microsoft Azure, para dizer o mínimo? Veja, as filas permitem implementar certos tipos de cenários de mensagens que se caracterizam por comunicações assíncronas e frouxamente acopladas entre produtores e consumidores, realizadas de forma escalonável e confiável. O uso de filas do Azure separa diferentes partes de determinada solução e oferece uma semântica exclusiva, como o método PEPs (Primeiro a Entrar, Primeiro a Sair) e o protocolo de entrega At-Least-Once. Essa semântica pode ser um pouco difícil de implementar com o uso dos outros mecanismos alternativos de troca de dados. Além disso, as filas são ideais como repositório volátil para a troca de dados entre serviços, camadas e aplicativos, não como armazenamento de dados persistente. O respectivo requisito de troca de dados pode manifestar-se de muitas e diferentes formas, como a transmissão de mensagens entre componentes de maneira assíncrona, o nivelamento de carga ou a expansão de cargas de trabalho complexas de computação. Muitos desses padrões de troca de dados não podem ser implementados diretamente sem filas. Em resumo, as filas são um recurso crucial. O fato de não precisar se preocupar com aquilo que pode ou não entrar em uma fila é um forte argumento para criar soluções unificadas de mensagens baseadas em fila que possam manipular dados de qualquer tamanho.

Essa camada de abstração tornará mais fácil publicar e consumir instâncias das entidades específicas ao aplicativo, em vez de precisar negociar com matrizes de bytes ou cadeias de caracteres, que são, atualmente, os únicos tipos com suporte pela API do serviço Fila. Usarei amplamente os genéricos do .NET, aproveitarei as vantagens de alguns recursos com valor agregado, como a compactação e a descompactação de fluxos transparentes, e aplicarei algumas práticas recomendadas conhecidas, como o tratamento de falhas intermitentes, para melhorar o nível de tolerância a falhas de operações de armazenamento.

Considerações sobre design

No cenário atual, uma mensagem maior que 64 KB (depois de serializada e codificada) não pode ser armazenada em uma fila do Azure. A API do lado do cliente retornará uma exceção se você tentar colocar uma mensagem maior que 64 KB em uma fila. No momento de escrever esta postagem, o limite de tamanho de mensagens retornado por essa propriedade é de 65536.

ImportantImportante
As mensagens são sujeitas à codificação Base64 quando transmitidas para uma fila. As cargas codificadas são sempre maiores que seus dados brutos. A codificação Base64 adiciona 25% de sobrecarga em média. Como resultado, o limite de tamanho de 64 KB proíbe, efetivamente, o armazenamento de qualquer mensagem com carga maior que 48 KB (75% de 64 KB).

Embora se trate do limite para um único item da fila, ele pode ser considerado proibitivo para certos tipos de mensagens, especialmente para aquelas que não podem ser divididas em partes menores. Da perspectiva de um desenvolvedor, preocupar-se com o fato de determinada mensagem poder ser acomodada em uma fila não ajuda na produtividade. Ao final do dia, a meta é que os dados do meu aplicativo fluam entre produtores e consumidores da forma mais eficiente possível, independentemente do tamanho dos dados. Enquanto um lado chama Put (ou Enqueue), e o outro lado invoca Get (ou Dequeue) em uma fila, o resto deve, teoricamente, ocorrer de forma mágica.

A principal premissa para o desafio técnico elaborado neste artigo é superar o limite do tamanho de mensagens em filas do Azure, utilizando uma maneira inteligente de lidar com mensagens grandes. Isso custará algumas habilidades adicionais. No mundo moderno de desenvolvimento de softwares comerciais, todo esforço extra de desenvolvimento precisa ser criteriosamente justificado. Justificarei os investimentos adicionais com as seguintes metas de design:

  • Suporte a mensagens muito grandes por meio da eliminação de quaisquer restrições impostas pela API do serviço Fila, pois ela pertence ao tamanho da mensagem.

  • Suporte a objetos genéricos definidos pelo usuário ao publicar e consumir mensagens de uma fila do Azure.

  • Estouro transparente em um repositório de mensagens configurável, seja um contêiner de blob, um cache distribuído ou outro tipo de repositório capaz de armazenar mensagens grandes.

  • Compactação transparente, destinada a aumentar a eficiência de custos minimizando o espaço de armazenamento consumido por mensagens grandes.

  • Maior confiabilidade, na forma de uso extensivo da condição transitória, com o emprego das práticas recomendadas durante operações de enfileiramento.

O suporte a mensagens grandes em filas com restrição de tamanho terá como base o padrão a seguir. Primeiro, verifico se determinada mensagem pode ser acomodada em uma fila do Azure sem realizar trabalho extra. A maneira de determinar se uma mensagem pode ser armazenada com segurança em uma fila, sem violar as restrições de tamanho, será por meio de uma fórmula, encapsulada em uma função auxiliar, como a seguir:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Se o tamanho da mensagem for menor que o limite em vigor, basta invocar a API do serviço Fila para enfileirar a mensagem "como está". Se o tamanho da mensagem exceder o limite em questão, o fluxo de dados ficará muito interessante. O seguinte fluxograma visualiza as etapas subsequentes:

Store-Message-Flow1

Em resumo, se uma mensagem não puder ser acomodada em uma fila devido ao seu tamanho, ela será colocada como excedente em um repositório de mensagens capaz de armazenar mensagens grandes. Em seguida, uma mensagem minúscula de metadados será criada, consistindo em uma referência ao item no repositório de estouro. Finalmente, a mensagem de metadados será colocada em uma fila. Sempre escolho compactar uma mensagem antes de declarar sua adequação à persistência em uma fila. Isso, efetivamente, expande a população de mensagens que podem ser enfileiradas sem precisar recorrer ao repositório de estouro. Um bom exemplo é um documento XML um pouco maior que 64 KB que, após sua serialização e compactação, se torna um candidato perfeito a ser colocado simplesmente em uma fila. É possível modificar esse comportamento quando a compactação padrão não é o desejável. Isso pode ser feito com o fornecimento de um componente serializador personalizado, elaborado na próxima seção.

Existem várias considerações aplicáveis aqui, principalmente de uma perspectiva de custos. Como pode ser observado no fluxograma acima, tento determinar se uma mensagem grande pode primeiro estourar no Cache do Azure. Como a utilização do caching service distribuído, baseado em nuvem, está sujeita a um encargo, o caminho de estouro de cache deve ser opcional. Isso se reflete no fluxograma.

Além do mais, pode haver situações em que a mensagem é muito grande e, portanto, não é adequada para armazenamento em um cache distribuído com restrição de tamanho. No momento de escrever este artigo, o tamanho máximo de cache é de 4 GB. Portanto, devemos levar isso em conta e fornecer um caminho de failover quando ultrapassamos o cache de capacidade ou as cotas. As cotas vêm com comportamento de remoção, que também precisa ser levado em consideração.

ImportantImportante
O uso do Cache do Azure como um repositório de estouro ajuda a reduzir a latência e a eliminar as transações excessivas de armazenamento durante a troca de um grande número de mensagens. Ele oferece uma infraestrutura de cache distribuído altamente disponível, capaz de replicar e manter os dados armazenados em cache na memória em vários servidores de cache para durabilidade. Esses benefícios podem ser suplantados pela limitação de tamanho de cache e pelos custos associados ao uso do serviço. Portanto, é importante realizar uma análise de custo-benefício para avaliar os prós e os contras de introduzir o Cache do Azure como um repositório de estouro em determinados cenários.

noteObservação
Para obter orientação sobre como escolher a oferta certa de Cache do Azure para seu aplicativo, consulte oferta de Cache do Azure que é certa para mim?

Aqui está uma função auxiliar que determinará se o valor de tamanho do item especificado pode ser considerado como ideal ao armazenar um item de um determinado tamanho no cache:

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Agora que já vimos alguns pré-requisitos iniciais, é hora de passar para o lado do consumidor e verificar o padrão de implementação para recuperação de mensagens grandes em uma fila. Primeiro, vamos visualizar o fluxo do processo para facilitar o entendimento geral:

Estouro de mensagens grandes

Para resumir o fluxo acima, uma mensagem de tipo desconhecido é obtida de uma fila e comparada com um tipo de mensagem de metadados. Se ela não for uma mensagem de metadados, o fluxo continuará com a lógica de descompactação, de forma que a mensagem original possa ser reconstruída corretamente antes de ser apresentada ao consumidor. Em contrapartida, se ela for, de fato, uma mensagem de metadados, essa mensagem será inspecionada para determinar o tipo de repositório de estouro usado para armazenar a mensagem real. Se ela for identificada como uma mensagem armazenada no cache, a respectiva API do Caching Service será invocada e a mensagem real será obtida antes de ser descompactada e retornada ao consumidor. Se a mensagem real for colocada em um contêiner de blob, a API do serviço Blob será direcionada para recuperar a mensagem real da entidade de blob, descompactada e retornada ao chamador.

Além de manipular as operações Enqueue e Dequeue para mensagens grandes, há necessidade de verificar se todas as cargas estouradas foram removidas dos respectivos repositórios de mensagens de estouro mediante solicitação do consumidor. Para realizar isso, um dos padrões de implementação potencial é acoplar o processo de remoção à operação Delete quando ele estiver sendo invocado para determinada mensagem. A representação visual dessa operação pode ser representada da seguinte maneira:

Estouro de mensagens grandes

Antes de começarmos a implementar os padrões mencionados acima, uma última consideração valiosa é definir uma mensagem. O que seria considerado uma mensagem e de que formas ela se manifesta? Seria uma matriz de bytes, um fluxo de dados, um simples tipo, como uma cadeia de caracteres, ou um objeto complexo específico a um aplicativo que o desenvolvedor implementa como parte do modelo de objeto da solução? Acredito realmente que não devemos nos restringir a essa área. Vamos apenas supor que uma mensagem seja do tipo genérico <T>, o que significa que seja qualquer coisa que o desenvolvedor deseje usar. Você verá que a implementação final será desdobrada naturalmente em torno dessa ideia.

Considerando todos os aspectos juntos, o diagrama a seguir resume todos os três caminhos possíveis de percurso explicados no design acima:

Message-Travel-Paths

Nesse ponto, parece haver entrada suficiente para começar dar vida ao design técnico. A partir daqui, mudarei o foco para o código-fonte necessário para implementar os padrões discutidos acima.

Implementação técnica

Para prosseguirmos, baixe o código de exemplo completo na galeria de códigos do MSDN. O exemplo é enviado como parte de uma implementação de referência ponta a ponta maior, que é apoiada pelos padrões discutidos neste artigo. Depois de baixar e descompactar o exemplo, navegue até o projeto Azure.Services.Framework em Contoso.Cloud.Integration e expanda a pasta Armazenamento. Esse local contém todos os principais artefatos de código discutidos a seguir.

Como observado no início, a ideia original era abstrair como um aplicativo em nuvem interage com filas do Windows Azure. Para abordar esse requisito, forneço um contrato que rege as principais operações aceitas em minha camada de abstração de armazenamento personalizado. A interface de programação através da qual o contrato é visto pelos consumidores é mostrada abaixo. Omiti, intencionalmente, algumas funções em nível de infraestrutura do trecho de código a seguir, como a criação e a exclusão de filas, pois essas funções não agregam valor significativo neste momento.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

Existe também a necessidade de um contrato (interface) extra, que removerá o acesso ao repositório de estouro de mensagens grandes. Dois componentes implementam o contrato, um para cada repositório de estouro (armazenamento de blob e cache distribuído). O contrato abrange as seguintes operações:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Ambos os contratos dependem fortemente do tipo genérico <T>. Ele permite adequar o tipo de mensagem a qualquer tipo .NET de sua escolha. Entretanto, precisarei tratar de alguns casos de uso extremo, a saber, os tipos que exigem tratamento especial, como os fluxos. Falarei mais a respeito posteriormente.

Independentemente do tipo de mensagem escolhido, um requisito importante é aplicado: o tipo de objeto que representa uma mensagem em uma fila deve ser serializado. Todos os objetos que passam pela camada de abstração de armazenamento estão sujeitos à serialização antes de entrarem em uma fila ou em um repositório de estouro. Em minha implementação, a serialização e a desserialização também são acopladas à compactação e à descompactação, respectivamente. Essa abordagem aumenta a eficiência da perspectiva de custo e de largura de banda. O benefício relacionado ao custo deriva do fato de que mensagens grandes compactadas consomem, inerentemente, menos armazenamento, resultando na redução dos custos de armazenamento. A eficiência da largura de banda advém da economia no tamanho da carga graças à compactação, que, por sua vez, torna as cargas menores na rede à medida que circulam pelo armazenamento do Azure.

O requisito para serialização e desserialização é declarado em uma interface especializada. Qualquer componente que implemente essa interface deve fornecer a funcionalidade específica de compactação, serialização, desserialização e descompactação. Um exemplo dessa interface é mostrado:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Para compactação e descompactação, eu uso o componente DeflateStream no .NET Framework. Essa classe representa o algoritmo Deflate, um algoritmo compatível com RFC padrão da indústria para compactação e descompactação de arquivo sem perdas. Comparada à classe GZipStream, a anterior produz imagens compactadas mais perfeitas e geralmente proporciona melhor desempenho. Em contrapartida, a classe GZipStream usa o formato de dados GZIP, que inclui um valor CRC (verificação de redundância cíclica) para detectar se há dados corrompidos. Nos bastidores, o formato de dados GZIP usa o mesmo algoritmo de compactação que a classe DeflateStream. Em resumo, GZipStream = DeflateStream + o custo de calcular e armazenar somas de verificação CRC.

Minha implementação do contrato é incluída a seguir. Observe que os algoritmos de compactação podem ser facilmente alternados com a substituição da classe DeflateStream pela classe GZipStream e vice-versa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

Um dos recursos mais eficientes na implementação CloudStorageEntitySerializer é a capacidade de aplicar tratamento especial ao manipular documentos XML de ambos os tipos: XmlDocument e XDocument. A outra área que vale a pena destacar é o processo de serialização e desserialização ideal dos dados XML. Aqui, decidi aproveitar as vantagens das classes XmlDictionaryReader e XmlDictionaryWriter, conhecidas pelos desenvolvedores de .NET como uma excelente escolha quando se trata de executar a serialização e a desserialização eficientes de cargas XML usando o formato XML binário de .NET.

A decisão quanto ao tipo de repositório de mensagens de estouro cabe ao consumidor, que acessa a camada de abstração de armazenamento personalizado. Ao longo destas linhas, fornecerei uma opção para selecionar o tipo desejado de repositório de mensagens adicionando os construtores a seguir ao tipo que implementa a interface ICloudQueueStorage:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

Os construtores acima não estão executando nenhum trabalho complexo; eles simplesmente inicializam os membros internos e configuram o componente do cliente que acessará uma fila do Azure. Entretanto, convém lembrar que informo o cliente da fila, explicitamente, de que ele não deve impor nenhuma política de repetição. Para fornecer uma camada de abstração de armazenamento robusta e confiável, preciso ter maior controle granular sobre problemas transitórios ao executar operações em filas do Azure. Portanto, haverá um componente separado que reconhece e é capaz de tratar de uma variedade muito maior de falhas intermitentes.

Examinaremos agora os componentes internos da classe ReliableCloudQueueStorage que esbocei acima. Especificamente, examinaremos sua implementação da operação Put, pois trata-se do local do estouro transparente em um repositório de mensagens grandes.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Um novo artefato de código que acabou de se manifestar no trecho acima é a classe LargeQueueMessageInfo. Esse tipo personalizado é, em última análise, nossa mensagem de metadados que descreve o local de uma mensagem grande. Essa classe é marcada como interna porque não deve ser visível a ninguém fora da implementação da camada de abstração de armazenamento. A classe é definida da seguinte maneira:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

Indo mais adiante, preciso implementar um armazenamento de estouro de mensagens grandes que otimizará o serviço de armazenamento de blob do Azure. Como apontado anteriormente, esse componente deve dar suporte à interface ICloudBlobStorage, que será consumida pelo componente ReliableCloudQueueStorage, para retransmitir mensagens na implementação ICloudBlobStorage sempre que essas mensagens não puderem ser acomodadas em uma fila devido ao limite de tamanho de mensagem. Para preparar o ambiente para as próximas etapas, incluirei somente a implementação do construtor:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Já mostrei, neste artigo, a implementação da operação Put, que assegura que mensagens pequenas sempre serão dispostas em uma fila, enquanto mensagens grandes serão encaminhadas, de forma transparente, para o repositório de estouro. Por razões de continuidade, não examinaremos a mecânica subjacente à operação Put correspondente implementada pelo repositório de estouro.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

Em resumo, o código acima obtém um blob do tipo <T> e verifica primeiro se ele já é uma imagem serializada de uma mensagem na forma de um objeto Stream. Todas as mensagens grandes transmitidas para o armazenamento de estouro pelo componente ReliableCloudQueueStorage chegam como fluxos, prontas para persistência. Caso o contêiner de blob de destino não seja encontrado, o código tentará criar o contêiner ausente. Ele executará essa ação em um escopo com suporte à repetição para melhorar a confiabilidade e aumentar a resiliência a falhas transitórias. O segundo caminho de failover serve para lidar com uma situação em que já existe um blob com o mesmo nome. O código removerá o blob existente, contanto que o comportamento de substituição esteja habilitado. Após a remoção, será tentado novamente o carregamento do novo blob. Enfatizando, essa operação é executada no escopo de suporte à repetição para maior confiabilidade.

Agora que já posso armazenar mensagens grandes em um contêiner de blob, é hora de projetar outra implementação da interface ICloudBlobStorage que usa o Cache do Azure. Para consistência, vamos começar com seus construtores:

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

Se você se lembra das considerações anteriores, uma das principais decisões chave de design técnico era aproveitar as vantagens do serviço Blob e do Cache do Azure para o armazenamento de mensagens grandes. A opção de cache é ideal para objetos transitórios que não excedam o tamanho recomendado de carga de 8 MB. A opção de blob é essencialmente para tudo o mais. Em geral, essa decisão introduz a necessidade de um armazenamento de estouro híbrido. A base para criar um armazenamento híbrido já está no codebase. Trata-se apenas de fazer a correspondência dos artefatos existentes, como a seguir:

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

Nesse ponto, concluo a saga incluindo mais um trecho de código que mostra a implementação da operação Put no armazenamento de estouro híbrido.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

Este artigo seria considerado incompleto se eu não fornecesse alguns exemplos de como a camada de abstração de armazenamento discutida acima pode ser consumida de um aplicativo cliente. Combinarei esses exemplos com um aplicativo de teste, que também validará a implementação técnica.

Validação

Para comprovar que mensagens grandes podem ir e voltar através da camada de abstração de armazenamento recém-implementada, um aplicativo de console muito simples foi incluído. Na primeira etapa, o aplicativo seleciona um documento XML de exemplo, de 90 MB, e o coloca em uma fila do Azure. Na segunda etapa, ele consome uma mensagem da fila. A mensagem deve, na verdade, ser o documento XML original, que é gravado no disco com outro nome, para poder comparar o tamanho do arquivo e o respectivo conteúdo. Entre essas etapas, o aplicativo entra em pausa, durante a qual é possível explorar o conteúdo da fila e do respectivo armazenamento de estouro de mensagens, como um cache ou contêiner de blob. O código-fonte do aplicativo de teste é fornecido abaixo.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Para integridade, o arquivo de configuração do aplicativo que foi usado durante o teste encontra-se abaixo. Se você pretende avaliar o aplicativo de teste, modifique sua cópia de app.config e adicione as credenciais da conta de armazenamento real e as informações de ponto de extremidade do Caching Service.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Desde que o aplicativo de teste tenha sido compilado e executado com êxito, espera-se uma saída semelhante ao seguinte nas janelas do console:

Console-Window-Output

Se você examinar a conta de armazenamento usada pelo aplicativo de teste, a seguinte mensagem aparecerá na fila:

Serialized-Metadata-Message

Como a mensagem de teste era grande o suficiente para estourar diretamente no armazenamento de blob, a captura de tela a seguir mostra o conteúdo esperado no respectivo contêiner de blob enquanto o aplicativo de teste está em pausa:

Contêiner de blobs

Observe como o documento XML original de 90 MB usado em meu teste se tornou um blob de 11 MB. Isso reflete uma economia de 87% em armazenamento e largura de banda, resultante da aplicação de serialização binária de XML. Dada a classe de destino dos cenários, a serialização binária de XML + a compactação é a primeira e melhor escolha.

Uma vez que o aplicativo de teste prossiga com a exclusão da mensagem da fila, espera-se que a mensagem de metadados seja removida com o blob que armazena os dados da mensagem, como mostrado na captura de tela a seguir:

Blob-Removed

O exemplo mostrado acima reflete uma exibição simplificada do ciclo de vida de uma mensagem grande. Seu objetivo é realçar os fundamentos da camada de abstração de armazenamento, como o roteamento de mensagens grandes para o repositório de blob, a compactação transparente e a remoção automática de ambas as partes da mensagem. Acho que já está na hora de concluir.

Conclusão

Como vimos, o uso de filas do Azure pode ser estendido para dar suporte a mensagens maiores que 64 KB, aproveitando o Cache do Azure e o serviço Blob do Azure, sem impor outras restrições técnicas ao cliente. Na realidade, mostrei que, com um pouco de trabalho extra, é possível aperfeiçoar a experiência do cliente com o uso de mensagens fornecendo melhorias de qualidade, como:

  • Compactação transparente de mensagens para reduzir os custos de armazenamento e economizar em largura de banda no/fora do datacenter.

  • Estouro transparente, facilmente personalizável de mensagens grandes no armazenamento de Cache ou de Blob.

  • Suporte a genéricos que permite armazenar facilmente qualquer tipo de objeto.

  • Tratamento automático de condições transitórias para maior confiabilidade.

Como mencionei anteriormente, embora essa solução possa usar tanto o cache distribuído quanto o repositório de blob para o armazenamento de estouro, o uso do Cache do Azure implica em custos adicionais. Antes de decidir habilitar o estouro usando o cache, você deve avaliar cuidadosamente os requisitos de armazenamento do projeto e executar uma análise de custo com base no número de mensagens projetadas e no tamanho das mensagens.

Embora essa solução forneça uma maneira fácil de dar suporte a mensagens grandes em filas do Azure, sempre há espaço para melhorias. Alguns exemplos de recursos com valor agregado não incorporados a essa solução e que talvez você deseje adicionar:

  • A capacidade de configurar o tipo de repositório de estouro de mensagens grandes na configuração do aplicativo.

  • Os serializadores personalizados adicionais, quando os serializadores padrão não atendem às suas metas de desempenho ou às suas necessidades funcionais (por exemplo, você não precisa da compactação padrão).

  • Um item nos metadados do blob que atua como navegação estrutural, permitindo que você examine o armazenamento de blob e descubra rapidamente se existem blobs órfãos de mensagens grandes (zumbis).

  • Um componente de “coletor de lixo” que assegura a remoção oportuna de todos os blobs órfãos do repositório de mensagens de estouro (quando as filas também são acessadas por outros componentes que não a camada de abstração de armazenamento implementada aqui).

O exemplo de código associado está disponível para download na galeria de códigos do MSDN. Observe que todos os arquivos de código-fonte são regidos pela licença pública da Microsoft, como explicado nos avisos legais correspondentes.

Referências e recursos adicionais

Para obter mais informações sobre o tópico discutido neste artigo, consulte o seguinte:

Contribuições da comunidade

Mostrar: