Esporta (0) Stampa
Espandi tutto
Questo argomento non è stato ancora valutato - Valuta questo argomento

Procedure consigliate per la gestione di messaggi di grandi dimensioni con le code di Windows Azure

Autore: Valery Mizonov

Revisori: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson e Mark Simms

Nell'articolo vengono fornite indicazioni orientate agli sviluppatori sull'implementazione di un livello di astrazione dell'archiviazione basato su generics per il servizio di accodamento Windows Azure. Il problema centrale affrontato in questo articolo riguarda il supporto di messaggi di grandi dimensioni nelle code di Windows Azure e il superamento del limite di dimensioni dei messaggi esistente. In poche parole, questo blog e il codice associato consentiranno di utilizzare le code di Windows Azure senza doversi preoccupare del calcolo delle dimensioni dei messaggi imposte dal limite di 64 KB delle code.

Perché utilizzare messaggi di grandi dimensioni?

In tempi felici qualcuno ha detto che "640 K dovrebbero essere sufficienti per chiunque". Pochissimi kilobyte rappresentavano uno spazio di archiviazione di lusso in cui uno sviluppatore organizzato poteva tranquillamente salvare tutti i dati di applicazione. Oggi la quantità di dati che le applicazioni devono riuscire a scambiare può variare in modo significativo. Che si tratti di un semplice messaggio HL7 o di un documento EDI di vari megabyte, le applicazioni moderne devono gestire qualsiasi tipo di caratteristica volumetrica mutevole a una velocità imprevedibile. Un oggetto business espresso in una struttura multibyte nel secolo scorso potrebbe tranquillamente rappresentare oggi un elemento con esigenze particolarmente elevate in termini di archiviazione e di dimensioni notevolmente superiori rispetto al suo predecessore grazie a tecniche e formati moderni di serializzazione e rappresentazione.

La gestione dei messaggi in una determinata architettura di soluzione senza il vincolo rappresentato da limitazioni tecniche sulle dimensioni dei messaggi è essenziale per poter supportare volumi di dati in costante evoluzione. Non è sempre possibile evitare messaggi di grandi dimensioni. Se ad esempio una soluzione B2B è progettata per gestire il traffico EDI, la soluzione riceverà molto probabilmente documenti EDI di diversi megabyte. Ogni livello, servizio e componente nel flusso end-to-end deve essere in grado di gestire le dimensioni del documento in fase di elaborazione. Riuscire ad accettare un documento EDI 846 Inventory Advice di 20 MB tramite un servizio Web, ma non essere in grado di archiviarlo in una coda per l'elaborazione a causa dei vincoli imposti dalle dimensioni dei messaggi della coda potrebbe costituire una scoperta alquanto problematica in fase di test.

Perché si dovrebbe scegliere di utilizzare una coda per messaggi di grandi dimensioni nella piattaforma Windows Azure? Quali problemi si verificano, a dir poco, con altre alternative quali BLOB, tabelle, unità cloud o database SQL di Windows Azure? Sia chiaro, le code consentono di implementare determinati tipi di scenari di messaggistica caratterizzati da comunicazioni asincrone a regime di controllo libero tra producer e consumer eseguite in modo affidabile e scalabile. L'utilizzo delle code di Windows Azure decuplica parti diverse di una soluzione specifica e offre semantica univoca, ad esempio le modalità di recapito FIFO (First In, First Out) e At-Least-Once. Tale semantica può essere in qualche modo difficile da implementare utilizzando meccanismi alternativi di scambio dei dati. Inoltre, le code sono più idonee come archivio volatile per lo scambio di dati tra servizi, livelli e applicazioni e non come archivio dati permanente. Il rispettivo requisito di scambio di dati può manifestarsi in molte forme diverse, ad esempio attraverso il passaggio di messaggi tra componenti in modo asincrono, il livellamento del carico o la scalabilità orizzontale di carichi di lavoro di calcolo complessi. Molti di questi modelli per lo scambio di dati non sono implementabili in modo diretto senza code. In breve, le code rappresentano una caratteristica essenziale. Non doversi preoccupare degli elementi che è possibile o impossibile inserire in una coda è essenziale ai fini della compilazione di soluzioni di messaggistica unificata basata su coda in grado di gestire dati di qualsiasi tipo e dimensione.

In questo articolo verrà implementata una soluzione che consentirà di utilizzare una coda di Windows Azure per lo scambio di messaggi di grandi dimensioni. Verrà inoltre semplificato il metodo di interazione della soluzione con le code di Windows Azure fornendo un livello di astrazione compilato utilizzando le API disponibili nello spazio dei nomi StorageClient. Questo livello di astrazione consentirà di semplificare la pubblicazione e l'utilizzo di istanze delle entità specifiche dell'applicazione, anziché dover gestire le stringhe o le matrici di byte che rappresentano gli unici tipi supportati oggi dall'API del servizio di accodamento. Verranno impiegati in modo esteso i generics .NET e alcune funzionalità a valore aggiunto quali la compressione e la decompressione trasparente dei flussi, nonché applicate alcune procedure consigliate note quali la gestione degli errori intermittenti per migliorare la tolleranza degli errori delle operazioni di archiviazione.

Considerazioni sulla progettazione

Allo stato attuale, un messaggio con dimensioni superiori a 64 KB (una volta serializzato e codificato) non può essere archiviato in una coda di Windows Azure. L'API lato client restituirà un'eccezione se si tenta di inserire in una coda un messaggio con dimensioni superiori a 64 KB. La dimensione massima consentita dei messaggi può essere determinata controllando la proprietà MaxMessageSize della classe CloudQueueMessage. Al momento della creazione di questo post, il limite delle dimensioni dei messaggi restituito da questa proprietà è pari a 65536.

ImportantImportante
La dimensione massima dei messaggi definita nella proprietà CloudQueueMessage.MaxMessageSize non riflette la dimensione massima di payload consentita. I messaggi sono soggetti a codifica Base64 quando vengono trasmessi a una coda. I payload codificati sono sempre di dimensioni superiori rispetto ai dati non elaborati. La codifica Base64 aggiunge in media un overhead del 25%. Di conseguenza, il limite delle dimensioni di 64 KB impedisce l'archiviazione di messaggi con payload maggiore di 48 KB (75% di 64 KB).

Sebbene si tratti del limite per un singolo elemento della coda, può essere considerato proibitivo per determinati tipi di messaggi, in particolare quelli che non possono essere suddivisi in blocchi più piccoli. Dal punto di vista di uno sviluppatore, doversi preoccupare se sia possibile inserire un messaggio specifico in una coda incide sulla produttività. In fin dei conti, l'obiettivo consiste nel garantire un flusso dei dati dell'applicazione tra producer e consumer nel modo più efficiente, indipendentemente dalla dimensione dei dati. Mentre un lato chiama Put (o Enqueue) e l'altro richiama Get (o Dequeue) in una coda, in teoria le operazioni rimanenti dovrebbero verificarsi in modo automatico.

Il superamento del limite delle dimensioni dei messaggi nelle code di Windows Azure grazie a una gestione intelligente dei messaggi di grandi dimensioni è essenziale per risolvere la problematica tecnica elaborata in questo articolo, pur dovendo ricorrere ad alcuni espedienti. Nel mondo dello sviluppo di software commerciali di oggi, qualsiasi sforzo di sviluppo aggiuntivo deve essere ampiamente giustificato. In questo articolo verranno giustificati gli investimenti aggiuntivi con i seguenti obiettivi di progettazione:

  • Supporto di messaggi di grandi dimensioni eliminando eventuali restrizioni imposte dall'API del servizio di accodamento, in quanto fa riferimento alle dimensioni dei messaggi.

  • Supporto per oggetti generici definiti dall'utente quando si pubblicano e utilizzano messaggi da una coda di Windows Azure.

  • Overflow trasparente in un archivio messaggi configurabile: contenitore BLOB, cache distribuita o altro tipo di repository in grado di archiviare messaggi di grandi dimensioni.

  • Compressione trasparente atta a migliorare la redditività riducendo la quantità di spazio di archiviazione utilizzato da messaggi di grandi dimensioni.

  • Maggiore affidabilità attraverso l'utilizzo esteso delle procedure consigliate di gestione delle condizioni temporanee in fase di esecuzione di operazioni di coda.

Le basi per il supporto di messaggi di grandi dimensioni in code con vincoli nelle dimensioni verranno rappresentate dal modello indicato di seguito. In primo luogo controlleremo se è possibile inserire un determinato messaggio in una coda di Windows Azure senza eseguire ulteriori operazioni. Per determinare se un messaggio possa essere archiviato in modo sicuro in una coda senza violare i vincoli delle dimensioni, ricorreremo a una formula integrata in una funzione di supporto nel modo seguente:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in a Windows Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in a Windows Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Se la dimensione del messaggio è inferiore al limite imposto, sarà sufficiente richiamare l'API del servizio di accodamento per accodare il messaggio senza apportare alcuna modifica. Se la dimensione del messaggio supera la limitazione in questione, il flusso dei dati risulterà piuttosto interessante. Nel diagramma di flusso seguente vengono visualizzati i passaggi successivi:

Flusso-di-messaggi-di-archiviazione 1

In breve, se non è possibile inserire un messaggio in una coda a causa della dimensione, verrà riversato in un archivio in grado di ospitare messaggi di grandi dimensioni. Verrà quindi creato un messaggio di metadati di piccole dimensioni costituito da un riferimento all'elemento nell'archivio di overflow. Infine, il messaggio di metadati viene inserito in una coda. È consigliabile comprimere sempre un messaggio prima di considerarlo idoneo per la persistenza in una coda. In questo modo sarà possibile espandere in modo efficace il popolamento dei messaggi che possono essere inseriti in una coda senza dover ricorrere all'archivio di overflow. Un esempio calzante è costituito da un documento XML leggermente superiore a 64 KB che, in seguito alla serializzazione e alla compressione, rappresenta il candidato ideale da inserire in una coda. È possibile modificare questo comportamento qualora la compressione predefinita non sia soddisfacente. A tale scopo, fornire un componente del serializzatore personalizzato elaborato nella sezione successiva.

Sono diverse le considerazioni applicabili in questo caso, soprattutto dal punto di vista dei costi. Come si può evincere dal diagramma di flusso di cui sopra, tenteremo di determinare se un messaggio di grandi dimensioni possa innanzitutto essere riversato in Windows Azure Caching Service (di seguito indicato semplicemente come Caching Service). Poiché l'utilizzo del servizio Caching Service distribuito basato sul cloud è soggetto a un addebito, il percorso di overflow della cache deve essere reso facoltativo. Questa condizione viene riflessa nel diagramma di flusso.

Potrebbero inoltre verificarsi situazioni in cui il messaggio è piuttosto grande e pertanto non è adatto all'archiviazione in una cache distribuita con vincoli di dimensioni. Al momento della creazione di questo articolo, la dimensione massima della cache è pari a 4 GB. Alla luce di ciò, è pertanto necessario fornire un percorso di failover in caso di superamento di capacità o quote della cache. Le quote presentano inoltre un comportamento di eliminazione che è necessario tenere in considerazione.

ImportantImportante
L'utilizzo di Windows Azure Caching Service come archivio di overflow consente di ridurre la latenza ed eliminare le transazioni di archiviazione in eccesso durante lo scambio di un numero elevato di messaggi. Offre un'infrastruttura di memorizzazione nella cache distribuita e a elevata disponibilità in grado di replicare e gestire in memoria i dati memorizzati nella cache in più server di cache ai fini di durabilità. Questi vantaggi possono essere vanificati dalla limitazione nelle dimensioni della cache e dai costi associati all'utilizzo del servizio. È pertanto importante eseguire un'analisi del rapporto tra costi e benefici per valutare i pro e i contro relativi all'introduzione di Caching Service come archivio di overflow in determinati scenari.

Poiché l'archiviazione nella cache distribuita presenta limitazioni, è essenziale stabilire ulteriori regole che garantiranno un utilizzo efficace della cache. A questo proposito è necessario sottolineare un importante suggerimento:

ImportantImportante
A causa delle specifiche del comportamento di eliminazione, Caching Service non offre una durabilità garantita completa e definitiva rispetto al servizio BLOB di Windows Azure. Se utilizzato come archivio di overflow, Caching Service è particolarmente indicato per singoli messaggi volatili e di dimensioni inferiori a 8 MB. Si definiscono "volatili" i messaggi che vengono pubblicati e quindi utilizzati nel minor tempo possibile. L'indicazione di 8 MB è dovuta alla dimensione ottimale degli elementi nella cache configurata per impostazione predefinita in Caching Service.

L'indicazione di cui sopra verrà riflessa nel codice fornendo una funzione di supporto che determinerà se il valore delle dimensioni degli elementi specificato possa essere considerato ottimale per l'archiviazione di un elemento della dimensione specificata nella cache.

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Una volta considerati alcuni prerequisiti iniziali, passiamo al lato consumer ed esaminiamo il modello di implementazione per il recupero di messaggi di grandi dimensioni da una coda. Osserviamo innanzitutto il flusso di processi al fine di agevolare la comprensione generale:

Overflow-messaggio-di-grandi-dimensioni

Per riepilogare il flusso di cui sopra, da una coda viene recuperato un messaggio di tipo sconosciuto e confrontato con un tipo di messaggio di metadati. Se non si tratta di un messaggio di metadati, il flusso procede con la logica di decompressione, in modo che il messaggio originale possa essere ricostruito prima di essere visualizzato dall'utente. Al contrario, se si tratta di un messaggio di metadati, viene controllato per determinare il tipo di archivio di overflow utilizzato per l'archiviazione del messaggio effettivo. Se viene identificato come messaggio archiviato nella cache, viene richiamata la rispettiva API Caching Service e il messaggio reale verrà recuperato prima di essere decompresso e restituito al consumer. Nel caso in cui il messaggio reale venga inserito in un contenitore BLOB, l'API del servizio BLOB dovrà recuperare il messaggio reale dall'entità BLOB, verrà decompressa e quindi restituita al chiamante.

Oltre a gestire operazioni Enqueue e Dequeue per messaggi di grandi dimensioni, è necessario garantire che tutti i payload di overflow vengano rimossi dai rispettivi archivi di messaggi di overflow su richiesta del consumer. A tale scopo, uno dei potenziali modelli di implementazione consiste nell'associare il processo di rimozione con l'operazione Delete quando viene richiamata per un determinato messaggio. La rappresentazione visiva di questa operazione può essere illustrata come segue:

Overflow-messaggio-di-grandi-dimensioni

Prima di iniziare a implementare i modelli sopra citati, è opportuno esaminare un'ultima considerazione: la definizione di un messaggio. Che cosa viene considerato un messaggio e in quale forma si manifesta? Una matrice di byte, un flusso di dati, un tipo semplice come una stringa oppure un oggetto complesso specifico di un'applicazione implementato dallo sviluppatore nell'ambito del modello a oggetti della soluzione? Non è il caso di vincolarsi in quest'area. Presupponiamo semplicemente che il messaggio sia del tipo generico <T>, a indicare che si tratta di un elemento che lo sviluppatore desidera utilizzare. Risulterà evidente che l'implementazione finale si realizzerà in modo naturale attorno a questa idea.

Riassumendo, nel diagramma seguente vengono riepilogati i tre possibili percorsi presi in considerazione nella progettazione in alto:

Percorsi-di-trasmissione-del-messaggio

A questo punto avremo elementi a sufficienza per iniziare a realizzare la progettazione tecnica. Da questo punto in poi verrà analizzato il codice sorgente necessario per implementare i modelli citati in precedenza.

Implementazione tecnica

Per proseguire, scaricare il codice di esempio completo da MSDN Code Gallery. L'esempio viene reso disponibile nell'ambito di un'implementazione di riferimento end-to-end più estesa basata sui modelli descritti in questo articolo. Una volta scaricato e decompresso il codice, passare al progetto Azure.Services.Framework in Contoso.Cloud.Integration ed espandere la cartella Storage. Questo percorso contiene tutti gli elementi principali del codice illustrati di seguito.

Come indicato all'inizio, l'idea originale era quella di astrarre il metodo di interazione tra un'applicazione cloud e le code di Windows Azure. In merito a questo requisito verrà fornito un contratto che regola le operazioni principali supportate dal livello di astrazione dell'archiviazione personalizzato. Di seguito è indicata l'interfaccia di programmazione tramite cui il contratto viene visualizzato nei consumer. Dal frammento di codice in alto sono state volutamente omesse alcune funzioni a livello di infrastruttura, tra cui la creazione e l'eliminazione di code, dal momento che in questa fase non apportano alcun contributo significativo.

/// <summary>
/// Defines a generics-aware abstraction layer for Windows Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

È inoltre disponibile un contratto aggiuntivo (interfaccia) che astrarrà l'accesso all'archivio di overflow di messaggi di grandi dimensioni. Due componenti implementano il contratto, uno per ogni archivio di overflow (Archiviazione BLOB e cache distribuita). Il contratto è costituito dalle operazioni seguenti:

/// <summary>
/// Defines a generics-aware abstraction layer for Windows Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Entrambi i contratti si basano in modo significativo sul tipo generico <T>. Consente di personalizzare il tipo del messaggio in un tipo .NET qualsiasi desiderato. Sarà tuttavia necessario ricorrere ad alcuni casi di utilizzo estremi, ovvero tipi che richiedono un trattamento speciale, come i flussi. Verranno fornite maggiori informazioni più avanti.

Indipendentemente dal tipo di messaggio scelto, verrà applicato un requisito importante: il tipo di oggetto che rappresenta un messaggio in una coda deve essere serializzabile. Tutti gli oggetti che attraversano il livello di astrazione dell'archiviazione sono soggetti a serializzazione prima di essere posizionati in una coda o un archivio di overflow. In questa implementazione, anche la serializzazione e la deserializzazione vengono associate rispettivamente alla compressione e alla decompressione. Questo approccio aumenta l'efficienza dal punto di vista di costi e larghezza di banda. Il vantaggio correlato ai costi deriva dal fatto che i messaggi di grandi dimensioni compressi utilizzano in modo implicito meno spazio di archiviazione, con una conseguente riduzione dei costi di archiviazione. L'efficienza della larghezza di banda si manifesta in un risparmio sulla dimensione di payload grazie alla compressione, che a sua volta riduce i payload in transito man mano che confluiscono e defluiscono dallo spazio di archiviazione di Windows Azure.

Il requisito per la serializzazione e la deserializzazione viene dichiarato in un'interfaccia specifica. Qualsiasi componente che implementa questa interfaccia deve fornire le funzionalità specifiche di compressione, serializzazione, deserializzazione e decompressione. Di seguito viene indicato un esempio di questa interfaccia.

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Per la compressione e la decompressione viene utilizzato il componente DeflateStream in .NET Framework. Questa classe rappresenta l'algoritmo Deflate, un algoritmo standard di settore conforme a RFC per la compressione e la decompressione di file senza perdita di dati. Rispetto alla classe GZipStream, produce immagini compresse ottimali e in generale garantisce prestazioni di livello superiore. Al contrario, la classe GZipStream utilizza il formato dati GZIP, che include un valore del controllo di ridondanza ciclico (CRC) per rilevare il danneggiamento dei dati. Dietro le quinte, il formato dati GZIP utilizza lo stesso algoritmo di compressione della classe DeflateStream. In breve, GZipStream = DeflateStream + il costo di calcolo e dell'archiviazione dei checksum CRC.

Di seguito è indicata l'implementazione del contratto di questo esempio. È possibile attivare e disattivare agevolmente gli algoritmi di compressione sostituendo la classe DeflateStream con GZipStream e viceversa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

Una funzionalità particolarmente efficace nell'implementazione CloudStorageEntitySerializer è rappresentata dalla capacità di applicare un trattamento speciale in fase di gestione di documenti XML di entrambi i tipi: XmlDocument e XDocument. Un'altra area degna di nota è costituita dalla serializzazione e deserializzazione ottimale dei dati XML. In questo esempio verranno utilizzate le classi XmlDictionaryReader e XmlDictionaryWriter, indicate dagli sviluppatori .NET come scelte ideali nell'ambito di un'efficace esecuzione della serializzazione e della deserializzazione di payload XML tramite il formato XML binario .NET

La decisione relativa al tipo di archivio di messaggi di overflow ricade sul consumer, che coinvolge il livello di astrazione dell'archiviazione personalizzato. Su questa falsariga verrà fornita un'opzione per selezionare il tipo di archivio di messaggi desiderato aggiungendo i seguenti costruttori nel tipo che implementa l'interfaccia ICloudQueueStorage:

/// <summary>
/// Provides reliable generics-aware access to the Windows Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

I costruttori precedenti non eseguono operazioni complesse, ma si limitano a inizializzare i membri interni e configurare il componente client che accederà a una coda di Windows Azure. È tuttavia necessario indicare in modo esplicito al client della coda di non applicare alcun criterio di tentativo. Per poter fornire un livello di astrazione dell'archiviazione efficace e affidabile, sarà necessario disporre di un livello di controllo più granulare sui problemi temporanei durante l'esecuzione di operazioni nelle code di Windows Azure. Verrà pertanto impiegato un componente separato per il riconoscimento e la gestione di una gamma più ampia di errori intermittenti.

Esaminiamo ora gli elementi interni della classe ReliableCloudQueueStorage precedentemente preparata. Nello specifico, analizziamo l'implementazione dell'operazione Put, poiché si tratta della posizione di overflow trasparente in un archivio di messaggi di grandi dimensioni.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Un nuovo elemento di codice già presente nel frammento indicato in alto è la classe LargeQueueMessageInfo. Questo tipo personalizzato rappresenta il messaggio di metadati che descrive la posizione di un messaggio di grandi dimensioni. Questa classe è contrassegnata come interna, in quanto non verrà resa visibile ai membri esterni all'implementazione del livello di astrazione dell'archiviazione. La classe viene definita nel modo seguente:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Windows Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Windows Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

Procedendo, sarà necessario implementare un archivio di overflow di messaggi di grandi dimensioni che utilizzerà il servizio di Archiviazione BLOB di Windows Azure. Come già indicato, questo componente deve supportare l'interfaccia ICloudBlobStorage che verrà utilizzata dal componente ReliableCloudQueueStorage per inoltrare i messaggi nell'implementazione ICloudBlobStorage ogni volta che non potranno essere inseriti in una coda per via delle limitazioni in merito alle dimensioni. Per predisporre i passaggi successivi, verrà inclusa esclusivamente l'implementazione del costruttore:

/// <summary>
/// Implements reliable generics-aware layer for Windows Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Windows Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Precedentemente in questo articolo è stata illustrata l'implementazione dell'operazione Put, in base alla quale i messaggi di dimensioni ridotte verranno sempre inseriti in una coda, mentre quelli di grandi dimensioni verranno instradati in modo trasparente nell'archivio di overflow. Per garantire continuità, verranno ora esaminati i meccanismi alla base dell'operazione Put equivalente implementata dall'archivio di overflow.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

In breve, il codice precedente, in cui viene utilizzato un BLOB di tipo <T>, controlla in primo luogo se quest'ultimo è già un'immagine serializzata di un messaggio sotto forma di oggetto Stream. Tutti i messaggi di grandi dimensioni inoltrati all'archiviazione di overflow dal componente ReliableCloudQueueStorage giungeranno come flussi predisposti per la persistenza. Verrà quindi richiamata l'azione UploadBlob, che chiama a sua volta l'API client del servizio BLOB, in particolare l'operazione UploadFromStream. Se non è possibile caricare un BLOB di un messaggio di grandi dimensioni, il codice analizza l'errore restituito dal servizio BLOB e fornisce un percorso di failover per 2 condizioni: ContainerNotFound e BlobAlreadyExists. Se il contenitore del BLOB di destinazione non viene trovato, il codice tenterà di creare il contenitore mancante. Questa operazione verrà eseguita in un ambito basato su tentativi al fine di migliorare l'affidabilità e la resilienza agli errori temporanei. Il secondo percorso di failover consentirà di gestire una situazione in cui è già presente un BLOB con lo stesso nome. Il codice rimuoverà il BLOB esistente, a condizione che sia abilitato il comportamento di sovrascrittura. In seguito alla rimozione, verrà effettuato un ulteriore tentativo di caricamento del nuovo BLOB. Ancora una volta questa operazione viene eseguita in un ambito basato su tentativi per migliorare il livello di affidabilità.

Ora che è possibile archiviare messaggi di grandi dimensioni in un contenitore BLOB, passiamo alla progettazione di un'altra implementazione dell'interfaccia ICloudBlobStorage che utilizzerà Windows Azure Caching Service. Per coerenza, inizieremo dai costruttori:

/// <summary>
/// Implements reliable generics-aware layer for Windows Azure Caching Service.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Windows Azure Caching Service.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

In base alle considerazioni precedenti, una delle decisioni di progettazione tecnica più importanti consiste nella possibilità di usufruire del servizio BLOB e di Caching Service per l'archiviazione di messaggi di grandi dimensioni. L'opzione di memorizzazione nella cache risulta particolarmente appropriata per gli oggetti temporanei che non superano le dimensioni di payload consigliate di 8 MB. L'opzione BLOB viene essenzialmente utilizzata per tutti gli altri casi. In generale, questa decisione introduce la necessità di un'archiviazione di overflow ibrida. La base per compilare un archivio ibrido è già presente nella codebase. Si tratta semplicemente di combinare gli elementi esistenti nel modo seguente:

/// <summary>
/// Implements reliable generics-aware storage layer combining Windows Azure Blob storage and
/// Windows Azure Caching Service in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Windows Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Windows Azure Caching Service.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

A questo punto, per concludere verrà introdotto un ulteriore frammento di codice in cui viene illustrata l'implementazione dell'operazione Put nell'archivio di overflow ibrido.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

Questo articolo risulterebbe incompleto se non venissero forniti alcuni esempi in merito al modo in cui un'applicazione client può utilizzare il livello di astrazione dell'archiviazione sopra citato. Questi esempi verranno combinati con un'applicazione di test che convaliderà l'implementazione tecnica.

Convalida

Per poter garantire che messaggi di grandi dimensioni possano passare senza alcun problema attraverso il livello di astrazione dell'archiviazione appena implementato, è stata assemblata una semplice applicazione console. Nel primo passaggio viene utilizzato e inserito in una coda di Windows Azure un documento XML di esempio di 90 MB. Nel secondo passaggio viene utilizzato un messaggio dalla coda. Il messaggio dovrà essere il documento XML originale scritto nuovamente sul disco con un nome diverso, per poter confrontare le dimensioni del file e il relativo contenuto. Tra questi passaggi, l'applicazione passerà in modalità di sospensione, durante la quale sarà possibile visualizzare il contenuto di una coda e il relativo archivio di overflow del messaggio, ad esempio il contenitore BLOB o della cache. Di seguito viene fornito il codice sorgente dell'applicazione di test.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on a Windows Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Per completare, di seguito verrà indicato il file di configurazione dell'applicazione utilizzato in fase di test. Se si intende provare l'applicazione di test, accertarsi di modificare la copia di app.config e aggiungere le effettive credenziali dell'account di archiviazione e le informazioni sull'endpoint di Caching Service.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Se l'applicazione di test è stata compilata ed eseguita in modo corretto, nelle finestre della console dovrebbe essere visualizzato un output simile a quello indicato di seguito.

Output-finestra-console

Osservando l'account di archiviazione utilizzato dall'applicazione di test, nella coda verrà visualizzato il messaggio seguente:

Messaggio-metadati-serializzati

Poiché il messaggio di test è di dimensioni sufficientemente elevate per eseguire l'overflow diretto nell'archiviazione BLOB, nella schermata seguente viene illustrato il contenuto previsto nel rispettivo contenitore BLOB mentre l'applicazione di test viene sospesa:

Contenitore-BLOB

Come si può notare, il documento XML originale di 90 MB utilizzato nel test è ora un BLOB di 11 MB. Questa condizione riflette un risparmio pari all'87% sullo spazio di archiviazione e sulla larghezza di banda, grazie all'applicazione della serializzazione binaria XML. Per via della classe di destinazione degli scenari, la serializzazione binaria XML abbinata alla compressione rappresenta la scelta più idonea.

Una volta che l'applicazione di test passerà all'eliminazione del messaggio della coda, è prevista la rimozione del messaggio di metadati insieme al BLOB contenente i dati di messaggio, come mostrato nella schermata seguente:

BLOB-rimosso

L'esempio illustrato in precedenza riflette una visione semplicistica del ciclo di vita di un messaggio di grandi dimensioni. È stato realizzato con l'obiettivo di evidenziare le nozioni fondamentali sul livello di astrazione dell'archiviazione, tra cui l'instradamento di messaggi di grandi dimensioni nell'archivio BLOB, la compressione trasparente, la rimozione automatica di entrambe le parti del messaggio. È ora giunto il momento di trarre una conclusione.

Conclusione

Come abbiamo potuto vedere, l'utilizzo delle code di Windows Azure può essere esteso per il supporto di messaggi di dimensioni superiori a 64 KB grazie all'impiego di Windows Azure Caching Service e del servizio BLOB di Windows Azure senza aggiungere ulteriori restrizioni tecniche nel client. È stato dimostrato che, con un piccolo contributo di lavoro in più, è possibile incrementare il livello di esperienza di messaggistica per il client garantendo miglioramenti alla qualità della vita nel modo seguente:

  • Compressione trasparente dei messaggi per ridurre i costi di archiviazione e risparmiare larghezza di banda all'interno e all'esterno del data center.

  • Overflow trasparente e facilmente personalizzabile di messaggi di grandi dimensioni nello spazio di archiviazione della cache o BLOB.

  • Supporto per generics che consente di archiviare agevolmente qualsiasi tipo di oggetto.

  • Gestione automatica delle condizioni temporanee per migliorare l'affidabilità.

Come già indicato in precedenza, mentre questa soluzione può impiegare la cache distribuita e l'archivio BLOB per l'archiviazione dell'overflow, l'utilizzo di Windows Azure Caching Service comporta costi aggiuntivi. Valutare attentamente i requisiti di archiviazione di un progetto ed eseguire un'analisi dei costi in base al numero e alle dimensioni dei messaggi previsti prima di decidere di abilitare l'overflow per l'utilizzo della cache.

Sebbene questa soluzione rappresenti uno strumento sicuro per supportare messaggi di grandi dimensioni nelle code di Windows Azure, c'è sempre spazio per i miglioramenti. Di seguito sono indicati alcuni esempi di funzionalità a valore aggiunto non incorporate nella soluzione, ma che è possibile aggiungere:

  • Capacità di configurare il tipo di archivio di overflow per messaggi di grandi dimensioni nella configurazione dell'applicazione.

  • Serializzatori personalizzati aggiuntivi, nel caso in cui il serializzatore predefinito non soddisfi gli obiettivi di prestazioni o esigenze funzionali (ad esempio non si necessita della compressione predefinita).

  • Un elemento nei metadati del blog che funga da controllo di navigazione consentendo di analizzare l'archivio BLOB e individuare rapidamente la presenza di BLOB di messaggi orfani di grandi dimensioni (zombie).

  • Un componente "Garbage Collector" che controlla la rimozione tempestiva di tutti i BLOB orfani dall'archivio di messaggi di overflow (nel caso in cui accedano alle code anche componenti diversi dal livello di astrazione dell'archiviazione qui implementato).

Il codice di esempio è disponibile per il download da MSDN Code Gallery. Tutti i file di codice sorgente sono regolati da Microsoft Public License, come illustrato nelle note legali corrispondenti.

Risorse e riferimenti aggiuntivi

Per ulteriori informazioni sull'argomento descritto in questo articolo, fare riferimento ai collegamenti seguenti:


Data di compilazione:

2013-10-23
Il documento è risultato utile?
(1500 caratteri rimanenti)
Grazie per i commenti inviati.

Aggiunte alla community

AGGIUNGI
Microsoft sta conducendo un sondaggio in linea per comprendere l'opinione degli utenti in merito al sito Web di MSDN. Se si sceglie di partecipare, quando si lascia il sito Web di MSDN verrà visualizzato il sondaggio in linea.

Si desidera partecipare?
Mostra:
© 2014 Microsoft. Tutti i diritti riservati.