VENTAS: 1-800-867-1389

Best Practices for Handling Large Messages with Azure Queues

Actualizado: marzo de 2015

Autor: Valery Mizonov

Revisores: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson y Mark Simms

El fin de este artículo es ofrecer a los desarrolladores una orientación para la implementación de una capa de abstracción del almacenamiento con elementos genéricos para el servicio Cola de Azure. Para admitir mensajes muy grandes en las colas de Azure, deben superarse las limitaciones de tamaño de mensaje actuales. Este artículo y el código asociado le ayudan a usar Colas de Azure sin tener que entrar en la contabilidad de tamaño de mensaje impuesta por el límite de 64 KB de la cola.

Recuerde cuando pensamos: "¡640 KB deberían ser suficiente para cualquiera! " En el pasado, algunos kilobytes suponían un lujoso espacio de almacenamiento en el que un desarrollador disciplinado podía, para su dicha, poner todos los datos de su aplicación. Hoy, la cantidad de datos que las aplicaciones modernas tienen que poder intercambiar puede variar considerablemente. Tanto si se trata de un minúsculo documento HL7 como si es un documento EDI de muchos megabytes, las aplicaciones modernas tienen que tratar con todos los tipos de características volumétricas que evolucionan a una velocidad imprevisible. Un objeto de negocio que se expresaba en una estructura de varios bytes en el siglo pasado fácilmente puede ahora presentarse como un artefacto ávido de almacenamiento varias veces mayor que sus antecesores gracias a las modernas técnicas y formatos de representación y serialización.

El tratamiento de los mensajes en una arquitectura de una solución dada sin la imposición de limitaciones técnicas en el tamaño de los mensajes es la clave para admitir volúmenes de datos en continuo cambio. Los mensajes grandes no siempre pueden evitarse. Por ejemplo, si una solución B2B está diseñada para controlar el tráfico EDI, tiene que estar preparada para recibir documentos EDI de hasta varios megabytes. Cada capa, servicio y componente del flujo global necesita adaptarse al tamaño del documento que se va a procesar. Aceptar sin problemas un documento Inventory Advice EDI 846 de 20 MB a través de un servicio web, pero no poder almacenarlo en una cola para procesarlo debido a las restricciones de tamaño de los mensajes de esta se consideraría un desagradable descubrimiento durante las pruebas.

¿Por qué elegiría alguien usar una cola para mensajes grandes en la plataforma de Azure? ¿Qué va mal con las otras alternativas como los blobs, las tablas, las unidades de la nube o Base de datos SQL de Microsoft Azure, por citar solo algunos? Tenga en cuenta que las colas permiten implementar determinados tipos de escenarios de mensajería que se caracterizan por las comunicaciones asincrónicas, de acoplamiento flexible entre los productores y los consumidores de forma escalable y confiable. El uso de las colas de Azure desvincula las diferentes partes de una solución proporcionada y ofrece una semántica única como FIFO y la entrega del tipo "al menos una vez". Esta semántica puede ser algo difícil de implementar con otros mecanismos de intercambio de datos alternativos. Además, las colas son más adecuadas como almacén volátil para intercambiar datos entre servicios, niveles y aplicaciones, no como almacenamiento de datos persistente. El requisito de intercambio de datos respectivo puede manifestarse de muchas formas diferentes, por ejemplo, al pasar mensajes entre los componentes de forma asincrónica, al equilibrar la carga o al escalar cargas de trabajo de proceso complejas. Muchos de estos patrones de intercambio de datos no son algo que pueda implementarse fácilmente sin colas. En resumen, las colas son una capacidad fundamental. No tener que preocuparse de qué se puede y no se puede poner en una cola es un argumento sólido para crear soluciones unificadas de mensajería basadas en cola que puedan tratar datos de cualquier tamaño.

Esta capa de abstracción facilitará la publicación y el uso de instancias de las entidades específicas de la aplicación en lugar de tener que tratar con matrices o cadenas de bytes que son los únicos tipos que admite actualmente la API del servicio Cola. Voy usar mucho los elementos genéricos de .NET, sacaré provecho de algunas funciones de valor agregado como la compresión y descompresión transparente de flujos además de aplicar algunas prácticas recomendadas conocidas, como el control de los errores intermitentes para mejorar la tolerancia a errores de las operaciones de almacenamiento.

En la actualidad, un mensaje mayor de 64 KB (después de serializarse y codificarse) no puede almacenarse en una cola de Azure. La API de cliente devolverá una excepción si intenta colocar un mensaje mayor de 64 KB en una cola. Al escribir este artículo, el límite de tamaño del mensaje que devuelve esta propiedad es 65536.

ImportantImportante
Cuando se transmiten a una cola, los mensajes están sujetos a la codificación Base64. Las cargas codificadas siempre son mayores que los datos sin procesar. La codificación Base64 agrega una sobrecarga del 25 % de promedio. Como resultado, el límite de tamaño de 64 KB prohíbe en efecto almacenar mensajes con una carga mayor de 48 KB (75 % de 64 KB).

Aunque es el límite para un único elemento de la cola, puede parecer prohibitivo para ciertos tipos de mensajes, especialmente los que no se pueden dividir en fragmentos menores. Desde la perspectiva del desarrollador, preocuparse de si un mensaje determinado se puede incluir en una cola no contribuye a la productividad. Al final del día, el objetivo es hacer que los datos de mi aplicación fluyan entre los productores y los consumidores de la manera más eficaz, independientemente del tamaño de los datos. Mientras un lado llama a Put (o a Enqueue) y el otro lado invoca a Get (o a Dequeue) en una cola, el resto debe ocurrir en teoría de modo automático.

Superar la limitación de tamaño de los mensajes en las colas de Azure tratando de forma inteligente los mensajes grandes es la premisa clave del problema técnico que se trata en este artículo. Requerirá cierta maña adicional. En el mundo moderno del desarrollo de software comercial, cualquier esfuerzo de desarrollo extra tiene que justificarse con sabiduría. Voy a justificar las inversiones adicionales con los siguientes objetivos de diseño:

  • Compatibilidad con los mensajes muy grandes al eliminar las restricciones impuestas por la API del servicio Cola en lo referente al tamaño de los mensajes.

  • Compatibilidad con los objetos genéricos definidos por el usuario al publicar y usar mensajes de una cola de Azure.

  • Desbordamiento transparente en un almacén de mensajes configurable, ya sea un contenedor de blobs, una caché distribuida u otro tipo de repositorio capaz de almacenar mensajes grandes.

  • Compresión transparente diseñada para aumentar la rentabilidad minimizando la cantidad de espacio de almacenamiento utilizado por los mensajes grandes.

  • Mayor confiabilidad a través de la puesta en práctica generalizada de prácticas recomendadas de condiciones transitorias al realizar operaciones con las colas.

La base para admitir mensajes grandes en colas con limitación de tamaño será el modelo siguiente. Primero, compruebo si un mensaje determinado se puede incluir en una cola de Azure sin realizar ningún trabajo adicional. Para determinar si un mensaje puede almacenarse con seguridad en una cola sin infringir las restricciones de tamaño, se usa una fórmula que incluyo en una función auxiliar como se explica a continuación:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Si el tamaño del mensaje es inferior al límite aplicado, simplemente debo invocar la API del servicio Cola para poner en cola el mensaje “tal cual”. Si el tamaño del mensaje es superior a la limitación en cuestión, el flujo de datos adquiere bastante interés. En el diagrama de flujo siguiente se visualizan los pasos posteriores:

Almacenar-flujo-de-mensajes1

En resumen, si un mensaje no se puede incluir en una cola debido a su tamaño, se desborda en un almacén de mensajes capaz de almacenar mensajes grandes. Entonces se crea un mensaje minúsculo de metadatos formado por una referencia al elemento en el almacén de desbordamiento. Finalmente, el mensaje de metadatos se coloca en una cola. Opto siempre por comprimir un mensaje antes de afirmar que es adecuado para persistir en una cola. Así se expande de forma efectiva el rellenado de los mensajes que pueden ponerse en la cola sin incurrir en la necesidad de pasar al almacén de desbordamiento. Un buen ejemplo es un documento XML algo mayor de 64 KB que, tras la serialización y la compresión, se convierte en un candidato perfecto para colocarse simplemente en una cola. Puede modificar este comportamiento en caso de que la compresión predeterminada no sea conveniente. Para ello, puede proporcionar un componente de serializador personalizado elaborado en la sección siguiente.

En este caso se aplican varias consideraciones, principalmente desde la perspectiva del costo. Como puede observarse en el diagrama de flujo anterior, intento determinar si un mensaje grande puede desbordarse en Caché de Azure. Puesto que el uso del servicio Caching basado en la nube distribuido está sujeto a una tarifa, la ruta de acceso de desbordamiento de la memoria caché debe ser opcional. Esto se refleja en el diagrama de flujo.

Además, puede haber situaciones en las que el mensaje sea muy grande y, por tanto, no sea adecuado para almacenarlo en una memoria caché distribuida con limitaciones de tamaño. Al redactar este artículo, el tamaño de caché máximo es de 4 GB. Por consiguiente, debemos tener esto en cuenta y proporcionar una ruta de acceso de conmutación por error por si superamos la capacidad o las cuotas de la memoria caché. Las cuotas suponen un comportamiento de desalojo que también tiene que considerarse.

ImportantImportante
El uso del servicio Caché de Azure como almacén de desbordamiento reduce la latencia y elimina las transacciones excesivas de almacenamiento al intercambiar un gran número de mensajes. Proporciona una infraestructura de almacenamiento en caché distribuida y de alta disponibilidad, capaz de llevar a cabo la replicación y de mantener los datos de caché en memoria en varios servidores de caché para conseguir su durabilidad. Estas ventajas pueden verse contrarrestadas por los costos y la limitación del tamaño de caché asociados al uso del servicio. Por lo tanto, es importante realizar un análisis de los costos y los beneficios para evaluar las ventajas y los inconvenientes de introducir el servicio Caché de Azure como almacén de desbordamiento en ciertos escenarios.

noteNota
Para obtener instrucciones sobre cómo elegir la oferta de Caché de Azure adecuada para su aplicación, vea ¿Cuál es la oferta de Caché de Azure más adecuada para mí?

Esta es una aplicación auxiliar que determina si el valor de tamaño del elemento especificado puede considerarse como óptimo al almacenar un elemento de un tamaño determinado en la memoria caché:

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Una vez considerados algunos requisitos previos iniciales, es hora de cambiar al lado del consumidor y echar un vistazo al modelo de implementación para recuperar mensajes grandes de una cola. Primero, vamos a visualizar el flujo del proceso con el fin de facilitar la comprensión general:

Desbordamiento-de-mensajes-grandes

Para resumir el flujo anterior, un mensaje de un tipo desconocido se captura de una cola y se compara con un tipo de mensaje de metadatos. Si no es un mensaje de metadatos, el flujo continúa con la lógica de descompresión, para poder crear correctamente el mensaje original antes de mostrarlo al consumidor. Por el contrario, si fuera de hecho un mensaje de metadatos, se inspecciona para determinar el tipo de almacén de desbordamiento usado para almacenar el mensaje real. Si se identifica como mensaje almacenado en caché, se invoca a la API del servicio Caching respectivo y el mensaje real se captura antes de descomprimirse y devolverse al consumidor. En caso de que el mensaje real se hubiera puesto en un contenedor de blobs, la API del servicio Blob se usará para recuperar el mensaje real de la entidad de blobs, que se descomprimirá y se devolverá al autor de la llamada.

Además de controlar las operaciones Enqueue y Dequeue para los mensajes grandes, hay que asegurarse de que todas las cargas desbordadas se quitarán de sus almacenes de mensajes de desbordamiento respectivos tras la solicitud del consumidor. Para lograr esto, uno de los posibles modelos de implementación es acoplar el proceso de eliminación con la operación Delete cuando se invoca para un mensaje determinado. La representación visual de esta operación puede ser descrita como sigue:

Desbordamiento-de-mensajes-grandes

Antes de empezar a implementar los modelos mencionados anteriormente, una última consideración que conviene realizar es la definición de un mensaje. ¿Qué se consideraría un mensaje y en qué formas se manifestará? ¿Sería una matriz de bytes, un flujo de datos, un tipo simple como una cadena o un objeto complejo específico de la aplicación que el programador implementa como parte del modelo de objetos de la solución? Creo firmemente que este es el área donde no debemos restringirnos. Supongamos que un mensaje es de un tipo genérico <T>, que significa que es algo que el desarrollador desea utilizar. Verá que la implementación final se desarrollará de forma natural alrededor de esta idea.

En conjunto, el diagrama siguiente resume las tres formas posibles que se tienen en cuenta en el diseño anterior:

Rutas-de-transmisión-de-los-mensajes

En este punto, parece haber suficiente información para empezar a llevar a la práctica el diseño técnico. A partir de aquí, cambiaré el enfoque al código fuente necesario para implementar los modelos descritos anteriormente.

Para seguir adelante, descargue el código de ejemplo completo de la galería de código de MSDN. El ejemplo se envía como parte de una implementación de referencia global que usa los modelos tratados en este artículo. Una vez descargado y descomprimido, navegue al proyecto Azure.Services.Framework debajo de Contoso.Cloud.Integration y expanda la carpeta Almacenamiento. Esta ubicación contiene todos los artefactos de código principales que se describen a continuación.

Como se indicó al principio, la idea original era resumir la forma en que una aplicación de nube interactúa con las colas de Windows Azure. Yo abordo este requisito proporcionando un contrato que gobierna las operaciones principales admitidas por mi capa de abstracción de almacenamiento personalizada. La interfaz de programación a través de la cual el contrato se presenta a los consumidores se muestra a continuación. Omití intencionadamente algunas características de infraestructura del fragmento de código siguiente como son la creación y la eliminación de las colas dado que estas no agregan ningún valor significativo en este momento.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

También se requiere un contrato adicional (interfaz) que resuma el acceso al almacén de desbordamiento de mensajes grandes. Dos componentes implementan el contrato, uno para cada almacén de desbordamiento (almacenamiento de blob y caché distribuida). El contrato consta de las siguientes operaciones:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Ambos contratos dependen en gran medida del tipo genérico <T>. Permite personalizar el tipo de mensaje a cualquier tipo .NET que prefiera. Sin embargo, tendré que controlar algunos casos extremos de uso, como son los tipos que requieren un trato especial, como los flujos. Profundizaré en esto después.

Independientemente del tipo de mensaje elegido, se aplicará un requisito importante: el tipo de objeto que representa un mensaje de una cola debe ser serializable. Todos los objetos que se pasan a través de la capa de abstracción de almacenamiento están sujetos a la serialización antes de que aterricen en una cola o en un almacén de desbordamiento. En mi implementación, la serialización y la deserialización también están acoplados con la compresión y la descompresión, respectivamente. Este enfoque aumenta la eficacia desde la perspectiva del ancho de banda y del costo. La ventaja en relación al costo se deriva del hecho de que los mensajes grandes comprimidos utilizan intrínsecamente menos almacenamiento, lo que resulta en una disminución en los costos de almacenamiento. La eficacia del ancho banda surge del ahorro en el tamaño de la carga debida a la compresión, que a su vez conlleva cargas mayores en el hilo a medida que fluyen por el almacenamiento de Azure.

El requisito de serialización y deserialización se declara en una interfaz especializada. Cualquier componente que implemente esta interfaz debe proporcionar una funcionalidad específica de compresión, serialización, deserialización y descompresión. Se muestra un ejemplo de esta interfaz:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Para la compresión y la descompresión, utilizo el componente DeflateStream de .NET Framework. Esta clase representa el algoritmo Deflate, un algoritmo compatible con RFC estándar del sector para la compresión y la descompresión de archivos sin pérdida. En comparación con la clase GZipStream, lo anterior genera imágenes comprimidas óptimas y suele ofrecer un mejor rendimiento. En cambio, la clase GZipStream utiliza el formato de datos GZIP, que incluye un valor de prueba de redundancia cíclica (CRC) para detectar daños en los datos. En segundo plano, el formato de datos GZIP utiliza el mismo algoritmo de compresión que la clase DeflateStream. En resumen, GZipStream = DeflateStream más el costo de calcular y almacenar las sumas de comprobación CRC.

Mi implementación del contrato se incluye a continuación. Observe que los algoritmos de compresión pueden alternarse fácilmente reemplazando la clase DeflateStream con GZipStream y viceversa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

Una de las capacidades eficaces en la implementación de CloudStorageEntitySerializer es la capacidad de aplicar un trato especial al administrar los documentos XML de ambos modos: XmlDocument y XDocument. La otra área que conviene destacar es la serialización y la deserialización óptimas de los datos XML. Aquí, decidí sacar provecho de las clases XmlDictionaryReader y XmlDictionaryWriter que ya conocen los desarrolladores de software de .NET como una opción magnífica cuando se trata de realizar la serialización y la deserialización eficaces de las cargas XML mediante el formato XML binario de .NET.

La decisión con respecto al tipo de almacén de mensajes de desbordamiento es responsabilidad del consumidor que llama a la capa de abstracción de almacenamiento personalizada. En estas líneas, voy a proporcionar una opción para seleccionar el tipo de almacén de mensajes deseado agregando los constructores siguientes en el tipo que implementa la interfaz ICloudQueueStorage:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

Los constructores anteriores no realizan un trabajo complejo, simplemente inicializan los miembros internos y configuran el componente cliente que tendrá acceso a una cola de Azure. Conviene tener en cuenta que indico explícitamente al cliente de la cola que no aplique ninguna directiva de reintento. Para proporcionar una capa de abstracción sólida y confiable de almacenamiento, necesito un control más exhaustivo de los errores transitorios al realizar las operaciones en las colas de Azure. Por consiguiente, habrá un componente independiente que reconoce y puede controlar una variedad mucho mayor de errores intermitentes.

Ahora echemos un vistazo a las características internas de la clase ReliableCloudQueueStorage que elaboré anteriormente. En concreto, revisemos la implementación de una operación Put puesto que es la ubicación del desbordamiento transparente en un almacén de mensaje grande.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Un nuevo artefacto de código recién manifestado en el fragmento anterior es la clase LargeQueueMessageInfo. Este tipo personalizado es a la larga nuestro mensaje de metadatos que describe la ubicación de un mensaje grande. Esta clase se marca como interna ya que no está concebida para ser visible para cualquiera fuera de la implementación de la capa de abstracción de almacenamiento. La clase se define del modo siguiente:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

A continuación, necesito implementar un almacén de desbordamiento de mensajes grandes que saque provecho del servicio Blob Storage de Azure. Como señalé antes, este componente debe admitir la interfaz ICloudBlobStorage que utilizará el componente ReliableCloudQueueStorage para retransmitir los mensajes en la implementación ICloudBlobStorage cuando estos no se puedan incluir en una cola debido a la limitación del tamaño de los mensajes. Para establecer el escenario de los pasos siguientes, incluiré solo la implementación del constructor:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Anteriormente en este artículo, he mostrado la implementación de la operación Put que garantiza que los mensajes pequeños se coloquen siempre en una cola mientras que los mensajes grandes se enrutarán de forma transparente al almacén de desbordamiento. Para continuar, ahora revisaremos los mecanismos que subyacen a la operación Put equivalente implementada por el almacén de desbordamiento.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

En resumen, el código anterior usa un blob de tipo <T> y en primer lugar comprueba si ya es una imagen serializada de un mensaje en forma de objeto Stream. Todos los mensajes grandes que el componente ReliableCloudQueueStorage retransmita al almacenamiento de desbordamiento llegarán como flujos, listos para la persistencia. En caso de que el contenedor de blobs de destino no se encuentre, el código intentará crear el contenedor que falta. Realiza esta acción dentro de un ámbito con reintentos para mejorar la confiabilidad y aumentar la resistencia a los errores transitorios. La segunda forma de conmutación por error está pensada para controlar una situación en la que ya exista un blob con el mismo nombre. El código quitará el blob existente, siempre que se habilite el comportamiento de sobrescritura. Después de la eliminación, volverá a intentarse la carga del nuevo blob. Una vez más, esta operación se realiza en un ámbito con reintentos para que haya una mayor confiabilidad.

Ahora que puedo almacenar mensajes grandes en un contenedor de blobs, es hora de diseñar otra implementación de la interfaz ICloudBlobStorage que utilizará Caché de Azure. Por coherencia, vamos a empezar con sus constructores:

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

Si recuerda las consideraciones anteriores, una de las decisiones clave del diseño técnico era sacar provecho tanto del servicio Blob como del servicio Caché de Azure para almacenar los mensajes grandes. La opción de caché es más apropiada para los objetos transitorios que no superan el tamaño recomendado de carga de 8 MB. La opción de los blobs es básicamente para todo lo demás. En conjunto, esta decisión introduce la necesidad de un almacén híbrido de desbordamiento. La base para crear un almacén híbrido ya está en la base de código. Solo es cuestión de aunar los artefactos existentes de la forma siguiente:

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

En este punto, concluyo la saga incluyendo un fragmento más de código que muestra la implementación de la operación Put en el almacén híbrido de desbordamiento.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

Este artículo se consideraría incompleto si no pudiera proporcionar algunos ejemplos de cómo se puede utilizar la capa de abstracción de almacenamiento explicada anteriormente en una aplicación cliente. Combinaré estos ejemplos con una aplicación de prueba que también validará la implementación técnica.

Para comprobar que los mensajes grandes pueden pasar correctamente de un lado a otro a través de la capa de abstracción de almacenamiento recién implementada, se colocó a su lado una aplicación de consola muy simple. En el primer paso, toma un documento XML de ejemplo de 90 MB de tamaño y lo coloca en una cola de Azure. En el segundo paso, utiliza un mensaje de la cola. El mensaje debe de hecho ser el documento XML original que se escribe en el disco con un nombre diferente para poder comparar el tamaño del archivo y su contenido. Entre estos pasos, la aplicación entra en un modo de pausa durante el que puede explorar el contenido de la cola y el almacén de desbordamiento de mensajes respectivo como contenedor de blobs o caché. El código fuente para la aplicación de prueba se proporciona a continuación.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Para completar el ejemplo, a continuación se muestra el archivo de configuración de la aplicación que se usó durante las pruebas. Si va a probar la aplicación de prueba, asegúrese de modificar su copia de app.config y agregue las credenciales de la cuenta de almacenamiento real y la información de extremo del servicio Caching.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Siempre que la aplicación de prueba se haya compilado y ejecutado correctamente, se espera que aparezca una salida similar a la siguiente en la ventana de la consola:

Salida-de-la-ventana-de-la-consola

Si entra en la cuenta de almacenamiento que utiliza la aplicación de prueba, el siguiente mensaje aparecerá en la cola:

Mensaje-de-metadatos-serializado

Puesto que el mensaje de prueba era suficientemente grande como para desbordarse directamente en el almacenamiento blob, la captura de pantalla siguiente describe el contenido esperado en el contenedor del blob respectivo cuando se pausa la aplicación de prueba:

Contenedor-de-blob

Observe cómo el documento XML original de 90 MB utilizado en mi prueba se convirtió en un blob de 11 MB. Esto refleja el ahorro del 87 % en almacenamiento y ancho de banda que resultó de aplicar una serialización binaria XML. Dado el tipo de destino de los escenarios, la serialización binaria XML con compresión es la primera y la mejor opción.

Una vez que la aplicación de prueba continúa con la eliminación del mensaje de la cola, se espera que el mensaje de metadatos se quite junto con el blob que contiene los datos del mensaje, como se muestra en la captura de pantalla siguiente:

Blob-quitado

El ejemplo mostrado anteriormente refleja una vista simplista del ciclo de vida de un mensaje grande. Se ha diseñado para resaltar los aspectos básicos de la capa de abstracción de almacenamiento como el enrutamiento de los mensajes grandes en un almacén de blobs, la compresión transparente y la eliminación automática de ambas partes del mensaje. Intuyo que ahora es el momento adecuado para pasar a la conclusión.

Como hemos visto, el uso de las colas de Azure puede ampliarse para admitir los mensajes mayores de 64 KB aprovechando Caché de Azure y el servicio Blob de Azure sin agregar ninguna restricción técnica adicional en el cliente. De hecho, he mostrado que con un poco de trabajo adicional, puede mejorar la experiencia de mensajería del cliente proporcionando mejoras como son:

  • La compresión transparente de los mensajes para reducir los costos de almacenamiento y ahorrar ancho banda en el centro de datos.

  • Un desbordamiento transparente y fácilmente personalizable de los mensajes grandes en el almacenamiento en caché o en blobs.

  • Compatibilidad con elementos genéricos que permite almacenar con facilidad cualquier tipo de objeto.

  • El control automático de las condiciones transitorias para mejorar la confiabilidad.

Como mencioné anteriormente, aunque esta solución puede utilizar tanto el almacén en blobs como en caché distribuida para el almacenamiento del desbordamiento, el uso de Caché de Azure supone costos adicionales. Debería evaluar cuidadosamente los requisitos de almacenamiento del proyecto y realizar un análisis de los costos según el número previsto de mensajes y el tamaño de los mismos antes de decidir habilitar el desbordamiento en caché.

Aunque esta solución aporta una forma sencilla de permitir los mensajes grandes en las colas de Azure, siempre existe posibilidad de mejorarla. Algunos ejemplos de características de valor agregado que no se incorporan en esta solución y que puede ser conveniente agregar son:

  • La capacidad de configurar el tipo de almacén de desbordamiento de mensajes grandes en la configuración de la aplicación.

  • Los serializadores personalizados adicionales en caso de que el predeterminado no satisfaga los objetivos de rendimiento o las necesidades funcionales (por ejemplo, si no necesita la compresión predeterminada).

  • Un elemento en los metadatos de los blobs que actúa como ruta de navegación que permite recorrer el almacenamiento de blob y averiguar rápidamente si tiene blobs de mensajes grandes huérfanos (zombis).

  • Un componente recolector de elementos no utilizados que asegure la eliminación oportuna de los blobs huérfanos del almacén de mensajes de desbordamiento (en caso de que otros componentes aparte de la capa de abstracción de almacenamiento implementada aquí también obtengan acceso a las colas).

El código de ejemplo adjunto puede descargarse de la galería de código de MSDN. Tenga en cuenta que la licencia pública de Microsoft rige todos los archivos de código fuente, según se explica en los avisos legales correspondientes.

Para obtener más información sobre el tema descrito en este artículo, consulte lo siguiente:

¿Te ha resultado útil?
(Caracteres restantes: 1500)
Gracias por sus comentarios
Mostrar:
© 2015 Microsoft