Best Practices for Handling Large Messages with Azure Queues

Mis à jour: mars 2015

Auteur : Valery Mizonov

Réviseurs : Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson et Mark Simms

Cet article propose une aide ciblée aux développeurs pour l'implémentation d'une couche d'abstraction de stockage compatible avec les génériques sur le service File d'attente de Azure. Pour prendre en charge des messages très volumineux dans des files d'attente Azure, il convient de surmonter les limitations actuelles de taille du message. Cet article et le code associé vous permettent d'utiliser les files d'attente Azure sans devoir calculer la taille des messages en relation avec la limite imposée de taille de file d'attente fixée à 64 Ko.

Pourquoi utiliser des messages volumineux ?

N'oubliez pas qu'à une certaine époque nous pensions que « 640 Ko devraient être largement suffisants tout le monde ! ». Quelques kilo-octets suffisaient pour créer un espace de stockage luxueux, dans lequel un développeur discipliné pouvait faire tenir l'ensemble de ses données d'application. Aujourd'hui, la quantité de données brassées par les applications modernes peut varier considérablement. Qu'il s'agisse d'un minuscule message HL7 ou d'un document EDI de plusieurs mégaoctets, les applications modernes doivent gérer toutes sortes de caractéristiques volumétriques évoluant à une vitesse imprévisible. Un objet métier qui, au siècle dernier, se contentait d'une structure à plusieurs octets, peut aujourd'hui se transformer en un artefact gourmand en stockage plusieurs fois plus gros que son prédécesseur grâce à la sérialisation et aux techniques et aux formats de représentation modernes.

Gérer les messages dans une architecture de solution spécifique sans imposer de limitations techniques à leur taille, est aujourd'hui la clé pour prendre en charge des volumes de données en perpétuelle évolution. Il n'est pas toujours possible d'éviter les messages volumineux. Par exemple, si une solution B2B est conçue pour gérer le trafic EDI, la solution doit être prête à recevoir des documents EDI d'une taille pouvant atteindre plusieurs mégaoctets. Chaque couche, chaque service et chaque composant du flux de bout en bout doit s'adapter à la taille du document traité. Réceptionner un document EDI « 846 Inventory Advice » de 20 Mo via un service Web mais ne pas arriver à l'enregistrer dans une file d'attente de traitement en raison des contraintes de taille des messages de la file d'attente, c'est une très mauvaise surprise pendant un test.

Pourquoi choisit-on d'utiliser une file d'attente pour les messages de grande taille sur la plateforme Azure ? Quel est l'inconvénient des autres méthodes telles que les objets blob, les tables, les disques sur le cloud ou les base de données SQL Microsoft Azure, pour n'en citer que quelques-unes ? Sachez que les files d'attente permettent d'implémenter certains types de scénarios de messagerie qui se caractérisent par des communications asynchrones et faiblement couplées entre les producteurs et les consommateurs, de manière fiable et évolutive. L'utilisation des files d'attente Azure découple différentes parties d'une solution donnée et offre des sémantiques uniques, telles que FIFO (premier entré, premier sorti) et la remise « Une fois au minimum ». Ces sémantiques peuvent être difficiles à implémenter avec d'autres mécanismes d'échange de données. En outre, les files d'attente sont mieux adaptées au stockage volatile pour échanger des données entre les services, les couches et les applications, sous la forme d'un stockage des données non persistant. La spécification d'échange de données correspondante peut prendre de nombreuses formes telles que la transmission des messages entre les composants de façon asynchrone, le nivellement de la charge, ou la mise à l'échelle de charges de travail complexes. La majorité de ces modèles d'échange de données ne sont pas faciles à mettre en œuvre sans recourir aux files d'attente. En résumé, les files d'attente sont une fonction cruciale. Le fait de ne pas avoir à vous préoccuper des éléments qui peuvent entrer ou non dans une file d'attente est un grand avantage lorsque l'on crée des solutions de messagerie basée sur les files d'attente unifiées en mesure de gérer toutes sortes de données, de toutes tailles.

Cette couche d'abstraction simplifiera la publication et la consommation des instances des entités spécifiques à l'application, et évitera de traiter des tableaux d'octets ou des chaînes, qui sont actuellement les seuls types pris en charge par l'API du service File d'attente. Je vais utiliser amplement les génériques .NET, certaines fonctions à valeur ajoutée telles que la compression et la décompression transparente du flux de données, et je vais également appliquer un certain nombre de meilleures pratiques connues, telles que la gestion des erreurs intermittentes, pour améliorer la tolérance de panne des opérations de stockage.

Remarques sur la conception

Aujourd'hui, un message d'une taille supérieure à 64 Ko (après sérialisation et encodage) ne peut pas être stocké dans une file d'attente Azure. L'API côté client renvoie une exception si vous tentez de placer un message d'une taille supérieure à 64 Ko dans une file d'attente. Au moment de la rédaction de cet article, la limite de taille des messages renvoyée par cette propriété est 65536.

ImportantImportant
Les messages sont codés en Base64 lorsqu'ils sont transmis à une file d'attente. Les charges utiles encodées sont toujours supérieures à leurs données brutes. L'encodage en Base64 ajoute une surcharge de 25 % en moyenne. Par conséquent, la limite de taille de 64 Ko empêche effectivement de stocker des messages dont la charge utile est supérieure à 48 Ko (75 % de 64 Ko).

Bien qu'il s'agisse de la limite pour un seul élément de la file d'attente, cela est très contraignant pour certains types de messages, en particulier ceux qui ne peuvent pas être décomposés en plusieurs petits segments. Du point de vue du développeur, se préoccuper de savoir si un message donné peut tenir dans une file d'attente n'aide pas à améliorer la productivité. En définitive, mon objectif est de faire en sorte que les données de mon application s'écoulent entre les producteurs et les consommateurs de la manière la plus efficace qui soit, quelle que soit leur taille. Tant qu'un côté appelle Put (ou Enqueue) et l'autre appelle Get (ou Dequeue) sur une file d'attente, le reste doit théoriquement se produire comme par magie.

Surmonter la limite des messages dans les files d'attente Azure en utilisant une méthode de traitement intelligente, est le défi technique principal que nous tenterons de relever dans cet article. Cela se fera au prix d'efforts supplémentaires. Dans le monde moderne du développement de logiciels professionnels, tout effort de développement supplémentaire doit être dûment justifié. Je vais justifier les investissements supplémentaires avec les objectifs de conception suivants :

  • Prise en charge de messages très volumineux en éliminant toutes les restrictions imposées par l'API de file d'attente en ce qui concerne la taille du message.

  • Prise en charge d'objets génériques définis par l'utilisateur lors de la publication et de la consommation de messages d'une file d'attente Azure.

  • Dépassement de capacité transparent dans une banque de données configurable soit un conteneur d'objets blob, soit un cache distribué, soit tout autre type de référentiel capable de stocker des messages.

  • Compression transparente visant à augmenter la rentabilité en réduisant la quantité d'espace de stockage utilisée par les messages volumineux.

  • Fiabilité accrue grâce à une utilisation extensive des meilleures pratiques de gestion des conditions transitoires lors des opérations de file d'attente.

La prise en charge de messages volumineux dans des files d'attente limitées en taille suivra le modèle de base suivant. D'abord, je vérifie si un message donné peut être placé dans une file d'attente Azure sans que cela nécessite un travail supplémentaire. Pour déterminer si un message peut être stocké en toute sécurité dans une file d'attente sans violer les contraintes de taille, je vais utiliser une formule encapsulée dans une fonction auxiliaire, comme suit :

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Si la taille du message est inférieure à la limite imposée, je vais simplement appeler l'API de file d'attente pour placer le message dans la file d'attente « tel quel ». Si la taille du message dépasse la limite en question, le flux de données devient plus intéressant. Le diagramme de flux suivant visualise les étapes suivantes :

Stocker-Flux de messages1

En résumé, si un message ne peut pas tenir dans une file d'attente en raison de sa taille, il passe à une banque de données capable de stocker des messages volumineux. Un message de métadonnées minuscule est ensuite créé, comprenant une référence à l'élément dans la banque de dépassement de capacité. Enfin, le message de métadonnées est mis dans une file d'attente. Je choisis toujours de compresser un message avant de déterminer qu'il doit faire l'objet d'une persistance dans une file d'attente. Cela permet de placer davantage de messages dans la file d'attente et évite de devoir passer par la banque de dépassement de capacité. Un bon exemple est celui d'un document XML d'une taille légèrement supérieure à 64 Ko qui, après sérialisation et compression, devient un candidat parfait pour une simple mise en file d'attente. Vous pouvez modifier ce comportement au cas où la compression par défaut ne serait pas souhaitable. Il peut être obtenu en fournissant un composant sérialiseur personnalisé, élaboré dans la section suivante.

Il existe plusieurs règles qui s'appliquent à ce stade, principalement du point de vue du coût. Comme vous pouvez le remarquer dans le diagramme ci-dessus, je tente de déterminer si un message volumineux peut d'abord tenir dans le cache Microsoft Azure. Étant donné que l'utilisation du service Caching basé sur le cloud a un coût, cette solution doit rester une option. Ceci est illustré dans le diagramme de flux.

En outre, il peut y avoir des cas où le message est très volumineux et ne peut donc pas être stocké dans un cache distribué limité en taille. Au moment de la rédaction de cet article, la taille maximale du cache est de 4 Go. Par conséquent, nous devons prendre cela en compte et prévoir une solution si la capacité du cache ou les quotas sont dépassés. Les quotas dérivent du comportement d'éviction, qui doit également être pris en compte.

ImportantImportant
L'utilisation du cache Microsoft Azure comme banque de dépassement de capacité réduit la latence et supprime les transactions de stockage en excès en cas d'échange d'un grand nombre de messages. Le service offre une infrastructure de mise en cache hautement disponible et distribuée, capable de répliquer et de conserver les données en mémoire dans plusieurs serveurs de cache pour garantir le durabilité. Ces avantages peuvent s'avérer insuffisants si la limite de taille du cache et les coûts associés au service sont trop contraignants. Il est donc important d'effectuer une analyse du coût/bénéfice pour évaluer les avantages et les inconvénients de l'introduction du cache Microsoft Azure en tant que banque de dépassement de capacité, dans certains scénarios.

noteRemarque
Pour obtenir des conseils sur le choix de l'offre de cache Microsoft Azure appropriée à votre application, voir Quelle offre de cache Microsoft Azure me convient ?

Voici une fonction d'assistance qui détermine si la valeur de la taille d'élément spécifiée peut être considérée comme optimal lors du stockage d'un élément d'une taille donnée dans le cache :

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Maintenant que nous avons abordé un certain nombre de prérequis, il est temps de nous tourner du côté du consommateur et de jeter un coup d'œil au modèle d'implémentation pour la récupération des messages d'une file d'attente. D'abord, visualisons le flux de traitement afin de faciliter la compréhension globale :

Flux de messages important

Pour résumer le flux ci-dessus, un message d'un type inconnu est extrait d'une file d'attente et comparé à un message de métadonnées. Si ce n'est pas un message de métadonnées, le flux reprend la logique de décompression, afin que le message d'origine puisse être correctement reconstruit avant d'être présenté au consommateur. Par contre, s'il s'agit d'un message de métadonnées, il est examiné pour déterminer le type de banque de dépassement de capacité utilisé pour stocker le message réel. S'il est identifié en tant que message stocké dans le cache, l'API du service Caching correspondante est appelée et le message réel est extrait avant d'être décompressé et renvoyé au client. Dans le cas où le message réel est placé dans un conteneur d'objets blob, l'API du service Blob sera ciblée pour extraire le message réel de l'entité blob, le décompresser et le remettre à l'appelant.

Outre la gestion des opérations Enqueue et Dequeue pour les messages volumineux, vous devez vous assurer que toutes les charges en dépassement sont supprimées des banques de dépassement respectives après la demande du consommateur. Pour ce faire, l'un des modèles d'implémentation possibles consiste à coupler le processus de retrait à l'opération Delete, lorsqu'il est appelé pour un message donné. La représentation visuelle de cette opération peut être illustrée comme suit :

Flux de messages important

Avant que nous commencions à implémenter les modèles mentionnés ci-dessus, une dernière remarque concernant la définition d'un message mérite d'être formulée. Que considère-t-on comme un message, et sous quelles formes se manifeste-t-il ? Serait-ce un tableau d'octets, un flux de données, un type simple comme une chaîne, ou un objet complexe spécifique à l'application que le développeur met en œuvre dans le modèle objet de la solution ? Je suis persuadé que nous ne devons en aucun cas limiter la définition d'un message. Supposons que le message est de type <T> générique, autrement dit qu'il peut être n'importe quel élément qu'un développeur souhaite utiliser. Vous constaterez que l'implémentation finale reflètera naturellement cette idée.

Le schéma suivant récapitule les trois solutions possibles prises en compte pour la conception ci-dessus :

Message-Itinéraire-Chemins d'accès

À ce stade, nous disposons de suffisamment d'éléments pour donner vie à la conception technique. À partir de ce paragraphe, je vais me concentrer sur le code source requis pour implémenter les modèles décrits ci-dessus.

Implémentation technique

Pour suivre la procédure, téléchargez l'exemple de code complet dans MSDN Code Gallery. L'exemple est fourni dans le cadre d'une implémentation de bout en bout de référence plus vaste qui repose sur les modèles décrits dans l'article. Après avoir décompressé le package, recherchez le projet Azure.Services.Framework sous Contoso.Cloud.Integration et développez le dossier Storage. Cet emplacement contient tous les principaux artefacts de code présentés ci-dessous.

Comme indiqué plus haut, l'idée d'origine était d'abstraire la façon dont une application dans le cloud interagit avec les files d'attente Windows Azure. Mon approche consiste à utiliser un contrat qui détermine les opérations principales prises en charge par ma couche d'abstraction de stockage personnalisée. L'interface de programmation via laquelle le contrat apparaît aux consommateurs est illustrée ci-dessous. J'ai intentionnellement supprimé certaines fonctions de niveau infrastructure dans l'extrait de code ci-dessous, notamment, la création et la suppression des files d'attente, car elles ne sont pas de grand intérêt à ce stade.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

Nous avons également besoin d'un contrat supplémentaire (interface) qui abstrait l'accès à la banque de dépassement de capacités contenant les messages volumineux. Deux composants implémentent le contrat, un pour chaque banque de dépassement (stockage d'objets blob et cache distribué). Le contrat comporte les opérations suivantes :

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Les deux contrats reposent fortement sur le type générique <T>. Celui-ci vous permet d'adapter le type de message à n'importe quel type .NET de votre choix. Je devrais toutefois aborder certains cas d'usage extrêmes, à savoir, les types qui requièrent un traitement spécial, comme les flux de données. Nous aborderons ce point par la suite.

Quel que soit le type de message choisi, il est essentiel que le type d'objet qui le représente dans une file d'attente soit sérialisable. Tous les objets traités par la couche d'abstraction du stockage sont sérialisés avant d'être mis dans la file d'attente ou dans la banque de dépassement. Dans mon implémentation, la sérialisation et la désérialisation sont également couplées à la compression et la décompression, respectivement. Cette approche améliore l'efficacité du point de vue financier et de la bande passante. L'avantage financier est obtenu parce que les messages volumineux compressés consomment naturellement moins de stockage, ce qui entraîne une diminution des coûts. L'efficacité de la bande passante résulte de la taille réduite de la charge utile, obtenue grâce à la compression, ce qui se traduit par des charges plus petites transitant vers ou depuis le stockage Azure.

La spécification de sérialisation et de désérialisation est déclarée dans une interface spéciale. Tout composant qui implémente cette interface doit fournir les fonctionnalités de compression, sérialisation, désérialisation et décompression spécifiques. Voici un exemple de cette interface :

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Pour la compression et la décompression, j'utilise le composant DeflateStream dans le .NET Framework. Cette classe représente l'algorithme Deflate, un algorithme conforme RFC standard pour la compression et la décompression de fichiers sans perte. Par rapport à la classe GZipStream, l'algorithme précédent produit des images compressées optimales, et est généralement plus performant. En revanche, la classe GZipStream utilise le format de données GZIP, incluant une valeur de contrôle de redondance cyclique (CRC) pour détecter les données endommagées. En arrière-plan, le format de données GZIP utilise le même algorithme de compression que la classe DeflateStream. En résumé, GZipStream = DeflateStream + le coût de calcul et de stockage des sommes de contrôle CRC.

Mon implémentation de l'accord est incluse ci-dessous. Notez que les algorithmes de compression peuvent facilement être permutés en remplaçant la classe DeflateStream par GZipStream et vice versa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

L'une des fonctions puissantes de CloudStorageEntitySerializer est la possibilité d'appliquer un traitement spécial lors de la gestion de documents XML des deux versions : XmlDocument et XDocument. La sérialisation et la désérialisation optimales des données XML sont l'autre aspect intéressant. J'ai choisi d'utiliser les classes XmlDictionaryReader et XmlDictionaryWriter que les développeurs .NET jugent parfaites quand il s'agit de sérialiser et désérialiser des charges XML efficacement, avec le format XML binaire .NET.

La décision du type de banque de dépassement de capacité à utiliser revient au consommateur qui appelle dans la couche d'abstraction de stockage personnalisée. Dans ces lignes, je vais fournir une option pour sélectionner le type de banque de dépassement de capacité approprié, en ajoutant les constructeurs suivants dans le type qui implémente l'interface ICloudQueueStorage :

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

Les constructeurs ci-dessus n'effectuent aucune tâche complexe ; ils initialisent simplement les membres internes, et configurent le composant client qui accédera à une file d'attente Azure. Notez toutefois que j'indique explicitement au client de la file d'attente de n'appliquer aucune stratégie de reprise. Pour fournir une couche d'abstraction de stockage robuste et fiable, j'ai besoin de contrôler plus précisément les erreurs temporaires lors des opérations sur les files d'attente Azure. Par conséquent, il y aura un composant à part en mesure de reconnaître et de gérer un panel d'erreurs intermittentes beaucoup plus important.

Jetons un coup d'œil maintenant aux mécanismes internes de la classe ReliableCloudQueueStorage esquissés ci-dessus. Examinons notamment son implémentation de l'opération Put, car il s'agit de l'emplacement du dépassement transparent dans une banque de messages volumineux.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Un nouvel artefact de code, qui vient d'apparaître dans l'extrait de code ci-dessus, est la classe LargeQueueMessageInfo. Ce type personnalisé est notre message de métadonnées qui décrit l'emplacement d'un message volumineux. Cette classe est marquée comme étant interne car elle n'est pas destinée à être visible à l'extérieur de l'implémentation de la couche d'abstraction de stockage. La classe est définie comme suit :

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

Ensuite, je dois implémenter une banque de dépassement de capacité qui reposera sur le service de stockage d'objets blob Azure. Comme je l'ai précisé plus haut, ce composant doit prendre en charge l'interface ICloudBlobStorage qui sera consommée par le composant ReliableCloudQueueStorage pour relayer les messages dans l'implémentation ICloudBlobStorage chaque fois qu'ils ne peuvent pas être mis dans une file d'attente en raison de la limite de taille. Pour préparer les étapes suivantes, je vais inclure uniquement l'implémentation du constructeur :

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Plus haut dans cet article, j'ai illustré l'implémentation de l'opération Put qui garantit que les messages de petite taille sont toujours placés dans une file d'attente, tandis que les messages volumineux sont routés de façon transparente dans la banque de dépassement. Par souci de continuité, nous allons maintenant examiner le mécanisme qui sous-tendent l'opération Put homologue implémentée par la banque de dépassement.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

En résumé, le code ci-dessus prend un objet blob de type <T> et vérifie d'abord si c'est une image sérialisée d'un message, sous la forme d'un objet Stream. Tous les messages volumineux relayés à la banque de dépassement par le composant ReliableCloudQueueStorage arriveront sous la forme de flux, prêts à la persistance. Dans le cas où le conteneur cible d'objets blob est introuvable, le code essaie de créer le conteneur manquant. Il exécute cette action au sein d'une étendue de reprise, afin d'améliorer la fiabilité et d'augmenter la résilience aux erreurs temporaires. La deuxième solution de basculement est conçue pour gérer une situation dans laquelle il existe déjà un objet blob ayant le même nom. Le code supprime l'objet blob existant, si le comportement de remplacement est activé. Après la suppression, le chargement du nouvel objet blob est retenté. De nouveau, cette opération est effectuée au sein d'une étendue de reprise pour augmenter la fiabilité.

Maintenant que je peux stocker des messages volumineux dans un conteneur d'objets blob, le temps est venu de concevoir une autre implémentation de l'interface ICloudBlobStorage qu'utilise le cache Microsoft Azure. Par souci de cohérence, commençons par ses constructeurs :

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

Si vous n'avez pas oublié les remarques précédentes, vous savez que l'une des décisions de conception clés était de tirer parti du service Blob et du cache Azure pour stocker les messages volumineux. L'option de mise en cache convient mieux aux objets temporaires qui ne dépassent pas la taille recommandée de 8 Mo. L'option blob convient essentiellement pour tout le reste. D'un façon générale, cette décision requiert une banque de dépassement de capacité hybride. La base pour la création d'une banque hybride est déjà dans le code. Il s'agit simplement d'associer les artefacts existants comme suit :

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

À ce stade, je clos ce chapitre en ajoutant un ou plusieurs extraits de code indiquant l'implémentation de l'opération Put dans la banque de dépassement hybride.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

Cet article serait incomplet sans quelques exemples de consommation de la couche d'abstraction de stockage décrite ci-dessus à partir d'une application cliente. Je vais intercaler ces exemples avec une application de test qui validera également l'implémentation technique.

Validation

Afin de prouver que les messages volumineux transitent correctement dans les deux sens de la couche d'abstraction de stockage que nous venons d'implémenter, une application console très simple a été créée. Lors de la première étape, celle-ci prend un exemple de document XML de 90 Mo et le met dans une file d'attente Azure. Lors de la deuxième étape, elle consomme un message dans la file d'attente. Le message devrait être le document XML d'origine réécrit sur le disque sous un autre nom pour pouvoir comparer la taille et le contenu des deux fichiers. Entre ces étapes, l'application passe en mode veille, et vous pouvez parcourir le contenu de la file d'attente et de la banque de dépassement correspondante, soit du conteneur d'objets blob ou de cache. Le code source de l'application de test est fourni ci-dessous.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Par souci d'exhaustivité, vous trouverez ci-dessous le fichier de configuration de l'application utilisé pendant le test. Si vous prévoyez d'utiliser l'application de test, assurez-vous de modifier votre copie du fichier app.config et d'y ajouter les informations d'identification réelles du compte de stockage et les informations du point de terminaison du service de mise en cache.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Si l'application de test a été correctement compilée et exécutée, un résultat semblable au suivant devrait s'afficher dans la fenêtre de la console :

Console-Fenêtre-Sortie

Si vous jetez un coup d'œil au compte de stockage utilisé par l'application de test, le message suivant s'affiche dans la file d'attente :

Message de métadonnées sérialisé

Étant donné que le message du test est suffisamment grand pour passer directement dans le stockage d'objets blob, la capture d'écran suivante affiche le contenu attendu dans le conteneur d'objets blob respectif pendant que l'application de test est en veille :

Conteneur d'objets Blob

Notez que le document XML de 90 Mo d'origine utilisé dans mon test est maintenant un objet blob de 11 Mo. Cela reflète les économies de 87 % réalisées sur le stockage et la bande passante grâce à l'application de la sérialisation binaire XML. Compte tenu des scénarios cibles, la sérialisation XML binaire, associée à la compression, sont le meilleur choix possible.

Lorsque l'application de test passe à la suppression du message de la file d'attente, le message de métadonnées est supprimé avec l'objet blob contenant les données du message, comme indiqué dans la capture d'écran ci-dessous :

Objet Blob supprimé

L'exemple ci-dessus reflète une vision très simplifiée du cycle de vie d'un message volumineux. Il vise à souligner les caractéristiques principales de la couche d'abstraction de stockage, notamment le routage des messages volumineux dans un magasin d'objets blob, la compression transparente, la suppression automatique des deux parties du message. À ce stade, je suppose que le temps est venu de passer à la conclusion.

Conclusion

Comme nous l'avons vu, l'usage des files d'attente Azure peut être étendu pour prendre en charge des messages d'une taille supérieure à 64 Ko, en tirant parti du cache Azure et du service BLOB Azure, sans imposer de restrictions techniques supplémentaires au client. J'ai ici prouvé qu'à moindre effort, vous pouvez optimiser l'expérience de messagerie du client en implémentant des améliorations telles que :

  • La compression transparente des messages pour réduire les coûts de stockage et économiser la bande passante du centre de données (en entrée et en sortie).

  • Un dépassement de capacité transparent et facilement personnalisable pour stocker les messages volumineux dans un stockage d'objets blob ou de cache.

  • La prise en charge des génériques qui vous permet de stocker aisément tout type d'objet.

  • La gestion automatique des conditions temporaires pour une meilleure fiabilité.

Comme indiqué précédemment, si cette solution peut utiliser aussi bien le cache distribué qu'un magasin d'objets blob pour le dépassement de capacité, l'utilisation du cache Azure implique un coût supplémentaire. Il convient alors d'évaluer avec soin les besoins de stockage de votre projet, et d'effectuer une analyse des coûts basée sur le nombre de messages attendu et leur taille, avant de décider d'utiliser la mise en cache pour le dépassement de capacité.

Si cette solution propose un moyen simple de prendre en charge les messages volumineux dans les files d'attente Azure, il est toujours possible de l'améliorer. Voici quelques exemples de fonctionnalités de valeur, qui ne sont pas incorporées dans cette solution mais que vous pouvez souhaiter ajouter :

  • La possibilité de configurer le type de banque de dépassement de capacité dans la configuration de l'application.

  • Des sérialiseurs personnalisés supplémentaires, au cas où ceux utilisés ici ne répondraient pas à vos besoins de performances ou fonctionnels (par exemple, vous n'avez pas besoin de la compression par défaut).

  • Un élément dans les métadonnées de l'objet blob agissant comme un fil d'Ariane et vous permettant d'analyser votre stockage d'objets blob afin de détecter rapidement s'il existe des objets volumineux orphelins (des zombies).

  • Un composant « garbage collector » qui garantit la suppression en temps voulu de tous les objets blob orphelins de la banque de dépassement (lorsque les files d'attente sont également accessibles par des composants autres que la couche d'abstraction de stockage implémentée ici).

Les exemples de code contenus dans ce document sont disponibles au téléchargement dans MSDN Code Gallery. Notez que tous les fichiers de code source sont régis par la licence publique de Microsoft, comme expliqué dans les mentions légales correspondantes.

Références et ressources supplémentaires

Pour plus d'informations sur le sujet abordé dans cet article, voir la documentation suivante :

Ajouts de la communauté

Afficher: