VERTRIEB: 1-800-867-1380

Methoden zum Behandeln großer Nachrichten mit Azure-Warteschlangen

Letzte Aktualisierung: März 2015

Autor: Valery Mizonov

Bearbeiter: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson und Mark Simms

Dieser Artikel dient dazu, einen auf Entwickler ausgerichteten Leitfaden zur Implementierung einer Generika-bewussten Speicherabstraktionsebene für den Azure-Warteschlangendienst anzubieten. Damit sehr große Nachrichten in Azure-Warteschlangen unterstützt werden können, müssen die aktuellen Größenbeschränkungen für Nachrichten überwunden werden. Dieser Artikel und der zugehörige Code helfen Ihnen, Azure-Warteschlangen einzusetzen, ohne sich in die Problematik der Nachrichtengröße vertiefen zu müssen, die Ihnen durch die 64-KB-Grenze der Warteschlange auferlegt wird.

Können Sie sich noch daran erinnern, als wir dachten, dass 640 K für jeden ausreichend sein sollte? Einige Kilobyte waren in der Vergangenheit durchaus eine luxuriöse Speichermenge, in der ein disziplinierter Entwickler von damals alle Anwendungsdaten unterbringen konnte. Heute kann sich die Datenmenge, die moderne Anwendungen auszutauschen müssen, erheblich unterscheiden. Egal, ob es sich um eine sehr kleine HL7-Nachricht oder um ein EDI-Dokument mit vielen Megabytes handelt: moderne Anwendungen müssen allerlei volumetrische Eigenschaften verwenden, die sich mit unvorhersehbarer Geschwindigkeit entwickeln. Ein Geschäftsobjekt, das möglicherweise im letzten Jahrhundert in einer Multibyte-Struktur angegeben wurde, ist heute als speicherhungriges Artefakt dank moderner Serialisierung sowie Darstellungstechniken und -formate um einiges größer als sein Vorgänger.

Nachrichten in einer vorgegebenen Lösungsarchitektur ohne technische Einschränkung der Nachrichtengröße zu verwalten, ist der Schlüssel, um sich ständig ändernde Datenmengen zu unterstützen. Große Nachrichten können nicht immer vermieden werden. Wenn beispielsweise eine B2B-Lösung entwickelt wurde, um EDI-Datenverkehr handhaben zu können, muss die Projektmappe vorbereitet werden, um EDI-Dokumente bis hin zu mehreren Megabytes zu empfangen. Jede Ebene, jeder Dienst und jede Komponente in aufeinander folgenden Flüssen müssen die Dokumentgröße annehmen können, die verarbeitet wird. Ein 20 MB großes EDI-846-Dokument über einen Webdienst erfolgreich anzunehmen, dies jedoch nicht in einer Warteschlange für die Verarbeitung aufgrund von Nachrichtengrößeneinschränkungen speichern zu können, wäre ein unerfreuliches Ergebnis beim Testen.

Warum würde sich ein Benutzer entscheiden, eine Warteschlange für große Nachrichten in der Azure-Plattform zu verwenden? Was ist mit Alternativen wie Blobs, Tabellen, Cloud-Laufwerken oder Microsoft Azure SQL-Datenbanken? Die Warteschlangen erlauben das Implementieren bestimmter Typen von Messagingszenarien, die von asynchroner, lose verbundener Kommunikation zwischen Producern und Consumern, die in einer skalierbaren und zuverlässigen Weise ausgeführt wird, gekennzeichnet sind. Die Verwendung von Azure-Warteschlangen entkoppelt verschiedene Teile einer angegebenen Projektmappe und bietet eine eindeutige Semantik wie First-In-First-Out (FIFO) und eine wenigstens einmalige Übermittlung (At-Least-Once) an. Solche Semantik kann etwas schwierig zu implementieren sein, wenn andere alternativen Datenaustauschmechanismen verwendet werden. Darüber hinaus sind Warteschlangen als vorübergehender Speicher für den Datenaustausch zwischen Diensten, Ebenen und Anwendungen geeignet, aber nicht als dauerhafter Datenspeicher. Die jeweilige Datenaustauschanforderung selbst kann in vielen verschiedenen Formen auftreten, wie etwa Übergeben von Nachrichten zwischen Komponenten in asynchroner Weise, Lastenausgleich oder Skalieren komplexer Serverauslastungen. Viele dieser Datenaustauschmuster können nicht einfach ohne Warteschlangen implementiert werden. Zusammengefasst haben Warteschlangen eine entscheidende Funktion. Dass die Sorge wegfällt, was in eine Warteschlange passt und was nicht, ist ein überzeugendes Argument zum Erstellen von einheitlichen warteschlangenbasierten Messaginglösungen, die Daten jeder Größe bearbeiten können.

Diese Abstraktionsebene erleichtert es, Instanzen der anwendungsspezifischen Entitäten zu veröffentlichen und zu nutzen, im Gegensatz zur Verwendung von Bytearrays oder Zeichenfolgen, die die einzigen Typen sind, die derzeit vom Warteschlangendienst-API unterstützt werden. Dazu werde ich .NET-Generika intensiv nutzen und zudem einige nützliche Funktionen wie transparente Datenstromkomprimierung und -dekomprimierung sowie einige bekannte Methoden wie die Behandlung von periodischen Fehlern verwenden, um die Fehlertoleranz von Speichervorgängen zu verbessern.

Gegenwärtig können Nachrichten, die (nach der Serialisierung und Codierung) größer als 64 KB sind, nicht in einer Azure-Warteschlange gespeichert werden. Die clientseitige API gibt eine Ausnahme zurück, wenn Sie versuchen, eine Nachricht in einer Warteschlange zu platzieren, die größer als 64 KB ist. Während des Schreiben dieses Artikels beträgt die Nachrichtengrößenbeschränkung, die durch diese Eigenschaft zurückgegeben wird, 65536.

ImportantWichtig
Nachrichten sind abhängig von der Base64-Kodierung, wenn sie in eine Warteschlange gesendet werden. Die codierten Nutzlasten sind immer größer als die Rohdaten. Die Base64-Codierung macht im Durchschnitt 25 Prozent Mehraufwand aus. Daher verhindert die Größenbeschränkung von 64 KB effektiv das Speichern von Nachrichten mit einer größeren Nutzlast als 48 KB (75 Prozent von 64 KB).

Auch wenn dies die Grenze für ein einzelnes Warteschlangenelement ist, kann es bestimmte Typen von Nachrichten ausschließen, besonders solche, die nicht in kleinere Blöcke aufgeteilt werden können. Aus Entwicklerperspektive steigert Ihnen die Sorge, ob eine bestimmte Nachricht in einer Warteschlange untergebracht werden kann, nicht Ihre Produktivität. Im Endeffekt ist das Ziel, Anwendungsdaten zwischen Producer und Consumer möglichst effizient zu übermitteln, unabhängig von der Datengröße. Wenn eine Seite Put (oder Enqueue) aufruft und die andere Seite Get (oder Dequeue) für eine Warteschlange aufruft, sollte der Rest theoretisch automatisch geschehen.

Die Nachrichtengrößeneinschränkung in Azure-Warteschlangen zu überwinden und einen intelligenten Weg zum Umgang mit großen Nachrichten zu verwenden, ist die Schlüsselvoraussetzung für die technische Herausforderung, die in diesem Artikel erarbeitet wird. Dies bringt jedoch zusätzliche Kosten mit sich. In der modernen Welt der Handelssoftwareentwicklung muss jeder zusätzliche Entwicklungsaufwand klug begründet werden. Ich werde die zusätzlichen Investitionen mithilfe der folgenden Entwurfsziele begründen:

  • Unterstützung für große Nachrichten durch Vermeiden von Einschränkungen durch den Warteschlangendienst-API, da sich dieser auf die Nachrichtengröße bezieht.

  • Unterstützung benutzerdefinierter generischer Objekte, wenn Nachrichten aus einer Azure-Warteschlange veröffentlicht und genutzt werden.

  • Transparenter Überlauf in einen konfigurierbaren Nachrichtenspeicher, entweder BLOB-Container, verteilter Cache oder anderer Repository-Typ zum Speichern großer Nachrichten.

  • Transparente Komprimierung, die die Wirtschaftlichkeit erhöhen soll, indem sie den Speicherplatz minimiert, den umfangreiche Nachrichten verbrauchen.

  • Verbesserte Zuverlässigkeit in Form von umfangreicher Verwendung der Methoden für die vorübergehende Bedingungsbehandlung, wenn Warteschlangenvorgänge ausgeführt werden.

Die Basis für die Unterstützung großer Nachrichten in größenbeschränkten Warteschlangen ist folgendes Muster: Zunächst wird überprüft, ob eine bestimmte Nachricht ohne Mehrarbeit in einer Azure-Warteschlange Platz findet. Die Bestimmung, ob eine Nachricht sicher in einer Warteschlange gespeichert werden kann, ohne gegen Größeneinschränkungen zu verstoßen, erfolgt durch eine Formel, die ich wie folgt in eine Hilfsfunktion einbinde:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

Wenn die Nachrichtengröße unter der erzwungenen Grenze ist, sollte ich die Warteschlangendienst-API dazu bringen, die Nachricht unverändert in die Warteschlange einzureihen. Wenn die Nachrichtengröße die betreffenden Einschränkung übersteigt, wird der Datenfluss interessant. Im folgenden Flussdiagramm werden die folgenden Schritte visuell dargestellt:

Speichernachrichtenfluss1

Wenn eine Nachricht aufgrund ihrer Größe nicht in einer Warteschlange untergebracht werden kann, wird sie in einem Nachrichtenspeicher gesammelt, der das Speichern großer Nachrichten unterstützt. Eine sehr kleine Metadatennachrichten wird erstellt, die einen Verweis auf das Element im Überlaufspeicher enthält. Anschließend wird die Metadatennachricht in eine Warteschlange eingereiht. Ich komprimiere eine Nachricht immer, bevor ich ihre Eignung für den Aufenthalt in einer Warteschlange durchsetze. Dies erweitert die Anzahl der Nachrichten, die in die Warteschlange eingereiht werden können, ohne in den Überlaufspeicher geleitet werden zu müssen. Ein gutes Beispiel ist ein XML-Dokument, das etwas größer als 64 KB ist, und das nach einer Serialisierung und Komprimierung einfach in eine Warteschlange eingereiht werden kann. Wenn die standardmäßige Komprimierung nicht ausgeführt werden soll, können Sie dieses Verhalten ändern. Dazu muss eine benutzerdefinierte Serialisierungsprogramm-Komponente ausgearbeitet werden, was im nächsten Abschnitt erläutert wird.

Es gibt verschiedene Aspekte, die hier gelten, hauptsächlich bezüglich der Kosten. Wie im obigen Flussdiagramm zu sehen, versuche ich festzustellen, ob eine große Nachricht zunächst per Überlauf in den Azure-Cache geleitet werden kann. Da die Verwendung von verteilten Cloud-basierten Cachediensten kostenpflichtig ist, sollte der Sammelpfad optional gemacht werden. Dies wird im Flussdiagramm wiedergegeben.

Außerdem kann es vorkommen, dass die Nachricht relativ groß ist und daher nicht für die Speicherung in einem in der Größe eingeschränkten verteilten Cache geeignet ist. Beim Schreiben dieses Artikels liegt die maximale Cachegröße bei 4 GB. Daher müssen Sie dies in Erwägung ziehen und einen Failoverpfad bereitstellen, wenn die Kapazität oder Kontingente des Caches überschritten sind. Die Kontingente gehen mit Entfernungsverhalten einher, das ebenfalls berücksichtigt werden muss.

ImportantWichtig
Die Verwendung des Azure-Cache als Überlaufspeicher trägt dazu bei, die Wartezeit zu reduzieren und Speichertransaktionen zu entfernen, wenn übermäßig viele Nachrichten ausgetauscht werden. Er bietet eine hohe Verfügbarkeit und eine verteilte Cache-Infrastruktur, die zwischengespeicherte Daten im Arbeitsspeicher über mehrere Cacheserver dauerhaft replizieren und beibehalten kann. Diese Vorteile können durch die Cachegrößeneinschränkung und die Kosten zunichte gemacht werden, die mit der Dienstverwendung einhergehen. Es ist deshalb wichtig, eine Kosten-Nutzen-Analyse durchzuführen, um die Vor- und Nachteile von Azure-Cache als Überlaufspeicher in bestimmten Szenarien zu bewerten.

noteHinweis
Anleitungen zum Auswählen des richtigen Azure Cache-Angebots für Ihre Anwendung finden Sie unter Welches Azure-Cacheangebot eignet sich am besten für mich?

Hier findet sich eine Hilfsfunktion, die bestimmt, ob der Größenwert des angegebenen Elements als optimal angesehen wird, wenn ein Element der angegebenen Größe im Cache gespeichert wird:

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;
    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Nachdem einige erforderliche Komponenten betrachtet wurden, ist es nun Zeit, zur Consumerseite zu wechseln und einen Blick auf das Implementierungsmuster zum Abrufen großer Nachrichten von einer Warteschlange zu werfen. Zunächst soll der Verarbeitungsfluss zur Erleichterung des Gesamtverständnisses visualisiert werden:

Überlauf für große Nachrichten

Um den obigen Fluss zusammenzufassen, wird eine Nachricht eines unbekannten Typs aus einer Warteschlange abgerufen und mit einem Metadatennachrichtentyp verglichen. Wenn es sich nicht um eine Metadatennachricht handelt, wird der Fluss mit Dekomprimierungslogik fortgesetzt, damit die ursprüngliche Nachricht richtig rekonstruiert werden kann, bevor sie dem Consumer präsentiert wird. Wenn es sich dagegen tatsächlich um eine Metadatennachricht gehandelt hat, wird sie überprüft. Dies erfolgt, um den Typ des Überlaufspeichers zu ermitteln, der zum Speichern der tatsächlichen Nachricht verwendet wurde. Wenn sie als im Cache gespeicherte Nachricht identifiziert wird, wird die jeweilige Cachedienst-API aufgerufen. Die tatsächliche Nachricht wird abgerufen, bevor sie dekomprimiert und an den Consumer zurückgegeben wird. Falls die tatsächliche Nachricht in einem BLOB-Container abgelegt wurde, wird die BLOB-Dienst-API abgefragt, um die tatsächliche Nachricht aus der BLOB-Entität abzurufen, dekomprimiert und zurückgegeben an den Aufrufer.

Neben der Durchführung der Vorgänge Enqueue und Dequeue für große Nachrichten muss sichergestellt werden, dass alle Überlauf-Nutzlasten aus ihren jeweiligen Überlauf-Nachrichtenspeichern bei Anforderung durch den Consumer entfernt werden. Eines der möglichen Implementierungsmuster ist, den Entfernungsprozess mit dem Delete-Vorgang zu verknüpfen, wenn er für eine angegebene Nachricht aufgerufen wird. Dieser Vorgang kann wie folgt visuell dargestellt werden:

Überlauf für große Nachrichten

Vor dem Implementieren der oben genannten Muster steht die Definition einer Nachricht. Was kann als Nachricht betrachtet werden, und auf welche Arten manifestiert sie sich? Als Byte-Array, Datenstrom, einfacher Typ (Zeichenfolge) oder komplexes anwendungsspezifisches Objekt, das der Entwickler als Teil des Lösungsobjektmodells implementiert? Ich glaube, dass dies ein Thema ist, mit dem Sie sich nicht zu lange beschäftigen sollten. Gehen wir einfach davon aus, dass eine Nachricht den generischen Typ <T> aufweist, d. h. den Typ, den der Entwickler verwenden möchte. Sie werden sehen, dass sich die letztendliche Implementierung automatisch danach ausrichtet.

Das folgende Diagramm fasst alle drei möglichen Übertragungspfade des obigen Designs zusammen:

Nachrichtenübertragungspfade

Nun geht es an das technische Design. Ab hier geht es hauptsächlich um den Quellcode, der erforderlich ist, um die weiter oben erläuterten Muster zu implementieren.

Um folgen zu können, laden Sie den kompletten Beispielcode aus den Code-Beispielen herunter. Das Beispiel ist Teil einer größeren umfassenden Referenzimplementierung, die sich auf die in diesem Artikel erörterten Muster stützt. Navigieren Sie nach dem Herunterladen und Entzippen zum Azure.Services.Framework-Projekt unter Contoso.Cloud.Integration, und blenden Sie den Storage-Ordner ein. Dieser Speicherort enthält alle wichtigen Codeartefakte, die nachfolgend erläutert werden.

Wie zu Beginn erwähnt, war die ursprüngliche Idee, die Interaktion einer Cloud-Anwendung mit Windows Azure-Warteschlangen zu abstrahieren. Ich erfülle diese Anforderung, indem ich einen Vertrag bereitstelle, der die von meiner benutzerdefinierten Speicherabstraktionsebene unterstützten Hauptvorgänge festlegt. Die Programmierschnittstelle, über die der Vertrag Consumern bereitgestellt wird, ist unten dargestellt. Ich habe einige Infrastrukturfunktionen (wie das Erstellen und Löschen von Warteschlangen) im Codeausschnitt unten weggelassen, da sie hier nicht relevant sind.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);
    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);
    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);
    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);
    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

Ferner muss auch ein zusätzlicher Vertrag (Schnittstelle) vorhanden sein, der den Zugriff auf den Überlaufspeicher für große Nachrichten abstrahiert. Zwei Komponenten implementieren den Vertrag, eine für jeden Überlaufspeicher (BLOB-Speicher und verteilter Cache). Der Vertrag umfasst folgende Vorgänge:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);
    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);
    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);
    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Beide Verträge stützen sich stark auf den generischen Typ <T>. Er ermöglicht es Ihnen, den Nachrichtentyp in einen .NET-Typ Ihrer Wahl zu ändern. Ich werde jedoch einige spezielle Anwendungsfälle zeigen, d. h. Typen, die besonders behandelt werden müssen (wie Datenströme). Darauf gehe ich später ein.

Unabhängig vom ausgewählten Nachrichtentyp, gilt eine wichtige Anforderung. Der Objekttyp, der eine Nachricht in einer Warteschlange repräsentiert, muss serialisierbar sein. Alle Objekte, die die Speicherabstraktionsebene durchlaufen, werden serialisiert, bevor sie in eine Warteschlange oder einen Überlaufspeicher gelangen. In meiner Implementierung werden Serialisierung und Deserialisierung auch mit Komprimierung und Dekomprimierung verknüpft. Dieser Ansatz erhöht die Effizienz aus Kosten- und Bandbreitenperspektive. Der Kostenvorteil entsteht dadurch, dass komprimierte große Nachrichten grundsätzlich weniger Speicher verbrauchen, was zu einer Senkung der Speicherkosten führt. Die Bandbreiteneffizienz ergibt sich aus einer kleineren Nutzlastgröße aufgrund von Komprimierung. Dies wiederum verringert die Nutzlastgröße zum und vom Azure-Speicher im Netzwerk.

Die Anforderung zur Serialisierung und Deserialisierung wird in einer speziellen Schnittstelle deklariert. Jede Komponente, die diese Schnittstelle implementiert, muss die spezielle Komprimierungs-, Serialisierungs-, Deserialisierungs- und Dekomprimierungsfunktionalität bereitstellen. Ein Beispiel für diese Schnittstelle wird hier gezeigt:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);
    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

Für die Komprimierung und Dekomprimierung verwende ich die DeflateStream-Komponente in .NET Framework. Diese Klasse repräsentiert den Deflate-Algorithmus, einen standardmäßigen RFC-kompatiblen Industrie-Algorithmus für verlustfreie Dateikomprimierung und -dekomprimierung. Im Vergleich zur GZipStream-Klasse erzeugt erstere optimalere komprimierte Bilder und bietet im Allgemeinen eine bessere Leistung. Dagegen verwendet die GZipStream-Klasse das GZIP-Datenformat, das einen Wert für die zyklische Redundanzprüfung (CRC, Cyclic Redundancy Check) zum Erkennen von Datenbeschädigung enthält. Im Hintergrund verwendet das GZIP-Datenformat denselben Komprimierungsalgorithmus wie die DeflateStream-Klasse. Zusammengefasst: GZipStream = DeflateStream + Kosten für die Berechnung und Speicherung von CRC-Prüfsummen.

Meine Implementierung des Vertrags ist unten enthalten. Beachten Sie, dass einfach zwischen Komprimierungsalgorithmen gewechselt werden kann, indem Sie die DeflateStream-Klasse durch GZipStream und umgekehrt ersetzen.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");
        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;
        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());
            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }
    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");
        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);
            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }
    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }
    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);
            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }
    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

Eine der leistungsstarken Funktionen der CloudStorageEntitySerializer-Implementierung ist die Fähigkeit, beide Arten von XML-Dokumenten auf spezielle Weise zu behandeln: XmlDocument und XDocument. Der andere wichtige Bereich ist die optimale Serialisierung und Deserialisierung der XML-Daten. Hier habe ich die Klassen XmlDictionaryReader und XmlDictionaryWriter genutzt. .NET-Entwickler wissen, dass sie eine hervorragende Wahl sind, wenn es um die effiziente Serialisierung und Deserialisierung von XML-Nutzlasten geht, da das binäre .NET-XML-Format verwendet wird.

Die Entscheidung bezüglich des Typs des Überlaufnachrichtenspeichers liegt beim Consumer, der die benutzerdefinierte Speicherabstraktionsebene aufruft. Nach diesen Grundsätzen stelle ich eine Option bereit, um den gewünschten Nachrichtenspeichertyp auszuwählen. Dazu füge ich die folgenden Konstruktoren im Typ hinzu, der die ICloudQueueStorage-Schnittstelle implementiert:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");
        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;
        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();
        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();
        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

Die oben genannten Konstruktoren führen keine komplexen Vorgänge aus. Sie initialisieren lediglich die internen Elemente und konfigurieren die Clientkomponente, die auf eine Azure-Warteschlange zugreift. Ich weise den Warteschlangenclient jedoch explizit an, keine Wiederholungsrichtlinie zu erzwingen. Um eine robuste und zuverlässige Speicherabstraktionsebene bereitzustellen, benötige ich bei Vorgängen in Azure-Warteschlangen mehr Kontrolle über vorübergehende Probleme. Daher ist eine separate Komponente vorhanden, die noch mehr periodische Fehler erkennt und verarbeiten kann.

Sehen wir uns nun die Besonderheiten der ReliableCloudQueueStorage-Klasse an, die ich oben erstellt habe. Insbesondere geht es um ihre Implementierung des Put-Vorgangs, da dies der Speicherort des transparenten Überlaufs in einen großen Nachrichtenspeicher ist.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");
    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));
    CloudQueueMessage queueMessage = null;
    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);
        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);
        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);
            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);
            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

Ein neues Codeartefakt im obigen Ausschnitt ist die LargeQueueMessageInfo-Klasse. Dieser benutzerdefinierte Typ ist schließlich unsere Metadatennachricht, die den Speicherort einer großen Nachricht beschreibt. Diese Klasse ist als intern markiert, da sie nur innerhalb der Implementierung der Speicherabstraktionsebene sichtbar sein soll. Die Klasse wird wie folgt definiert:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";
    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }
    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 
    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }
    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

Ich muss einen größeren Nachrichtenüberlaufspeicher implementieren, der den Azure-BLOB-Speicherdienst nutzt. Wie bereits erwähnt, muss diese Komponente die ICloudBlobStorage-Schnittstelle unterstützen. Diese wird von der ReliableCloudQueueStorage-Komponente genutzt, um Nachrichten an die ICloudBlobStorage-Implementierung weiterzugeben, wenn diese aufgrund der Begrenzung der Nachrichtengröße nicht in einer Warteschlange verarbeitet werden können. Als Vorbereitung auf die nächsten Schritte beziehe ich nur die Konstruktorimplementierung ein:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();
        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();
        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Weiter oben in diesem Artikel habe ich die Implementierung des Put-Vorgangs gezeigt. Dies stellt sicher, dass kleine Nachrichten immer in einer Warteschlange platziert, während große Nachrichten transparent in den Überlaufspeicher weitergeleitet werden. Sehen wir uns nun die Mechanismen hinter der Put-Vorgangsentsprechung an, die vom Überlaufspeicher implementiert wird.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");
    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);
    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;
    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }
        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;
        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };
        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();
            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;
            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }
        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });
            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();
                return uploadAction();
            });
            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }
        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }
    return false;
}

Zusammengefasst prüft der obige Code für einen BLOB des Typs <T> zunächst, ob es sich bereits um ein serialisiertes Bild einer Nachricht in Form eines Stream-Objektes handelt. Alle großen Nachrichten, die von der ReliableCloudQueueStorage-Komponente an den Überlaufspeicher weitergegeben werden, gehen als Datenströme ein und sind bereit für Persistenz. Wenn der BLOB-Zielcontainer nicht gefunden wird, versucht der Code, den fehlenden Container zu erstellen. Er führt diese Aktion in einem wiederholungsfähigen Bereich aus, um die Zuverlässigkeit zu verbessern und die vorübergehende Ausfallsicherung zu erhöhen. Der zweite Failoverpfad ist für Situationen ausgelegt, in denen ein BLOB mit demselben Namen vorhanden ist. Der Code entfernt das vorhandene BLOB, wenn Überschreiben aktiviert ist. Nach dem Entfernen wird der Upload des neuen BLOBs wiederholt. Auch hier wird dieser Vorgang innerhalb eines wiederholungsfähigen Bereichs für mehr Zuverlässigkeit ausgeführt.

Da ich nun große Nachrichten in einem BLOB-Container speichern kann, ist es Zeit, eine andere Implementierung der ICloudBlobStorage-Schnittstelle zu entwerfen, die Azure-Cache nutzt. Aus Gründen der Konsistenz beginnen wir mit den Konstruktoren:

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;
    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));
        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };
        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

Eine der wichtigen technischen Designentscheidungen war, den BLOB- und Azure-Cache zum Speichern großer Nachrichten zu verwenden. Die Cacheoption ist am besten für vorübergehende Objekte geeignet, die nicht die empfohlene Nutzlastgröße von 8 MB überschreiten. Die BLOB-Option ist für alles andere geeignet. Es ist ein hybrider Überlaufspeicher erforderlich. Die grundlegenden Elemente zum Erstellen eines hybriden Speichers sind bereits in der Codebasis vorhanden. Es geht nur darum, die vorhandenen Artefakte wie folgt zu verbinden:

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;
    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);
        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

An diesem Punkt füge ich einen weiteren Codeausschnitt hinzu, der die Implementierung des Put-Vorgangs im hybriden Überlaufspeicher zeigt.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");
    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;
    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }
        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }
    return success;
}

Dieser Artikel wäre ohne einige Beispiele zur Nutzung der oben erörterten Speicherabstraktionsebene durch eine Clientanwendung nicht vollständig. Ich kombiniere diese Beispiele mit einer Testanwendung, die auch die technische Implementierung überprüft.

Um nachzuweisen, dass große Nachrichten erfolgreich durch die neu implementierte Speicherabstraktionsebene übergeben werden können, wurde eine sehr einfache Konsolenanwendung erstellt. Im ersten Schritt wird ein XML-Beispieldokument mit 90 MB in eine Azure-Warteschlange gestellt. Im zweiten Schritt nutzt sie eine Nachricht aus der Warteschlange. Bei der Nachricht sollte es sich um das ursprüngliche XML-Dokument handeln, das unter einem anderen Namen zurück auf den Datenträger geschrieben wird, um die Dateigröße und ihren Inhalt vergleichen zu können. Zwischen diesen Schritten wird die Anwendung angehalten. Währenddessen können Sie den Inhalt der Warteschlange und des jeweiligen Nachrichtenüberlaufspeichers (wie Cache oder BLOB-Container) untersuchen. Der Quellcode für die Testanwendung wird unten bereitgestellt.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;
using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;
namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;
            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;
            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);
            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);
            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));
            long fileSize = -1, newFileSize = -1;
            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);
                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);
                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);
                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);
                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);
                    // Save it under a new name.
                    docFromQueue.Save(newFileName);
                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);
                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);
                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }
        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }
        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }
        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }
        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

Der Vollständigkeit halber finden Sie unten die Anwendungskonfigurationsdatei, die während des Testens verwendet wurde. Wenn Sie die Testanwendung testen, stellen Sie sicher, dass Sie eine Kopie von app.config bearbeiten und die tatsächlichen Anmeldedaten für das Speicherkonto und Informationen zum Cachedienstendpunkt hinzufügen.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>
  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>
  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Wenn die Testanwendung erfolgreich kompiliert und ausgeführt wurde, wird in den Konsolenfenstern eine Ausgabe angezeigt, die folgender ähnelt:

Ausgabe des Konsolenfensters

Wenn Sie sich das von der Testanwendung verwendete Speicherkonto ansehen, wird folgende Nachricht für die Warteschlange angezeigt:

Serialisierte Metadatennachricht

Da die Testnachricht groß genug für einen direkten Überlauf in den BLOB-Speicher war, zeigt der folgende Screenshot den erwarteten Inhalt im entsprechenden BLOB-Container, während die Testanwendung angehalten wurde:

BLOB-Container

Beachten Sie, dass das ursprüngliche XML-Dokument mit 90 MB in meinem Test zu einem BLOB mit 11 MB wurde. Dies spiegelt die Einsparungen von 87 % in den Bereichen Speicher und Bandbreite wider, die das Ergebnis der binären XML-Serialisierung sind. Unter Berücksichtigung der Zielklasse von Szenarien ist die Kombination aus binärer XML-Serialisierung und Komprimierung die erste und beste Wahl.

Sobald die Testanwendung mit dem Löschen der Warteschlangennachricht fortfährt, wird die Metadatennachricht zusammen mit dem BLOB entfernt, das die Nachrichtendaten enthält (siehe Screenshot unten):

BLOB entfernt

Das oben gezeigte Beispiel spiegelt eine vereinfachte Ansicht des Lebenszyklus einer großen Nachricht wider. Es dient dazu, die Grundlagen der Speicherabstraktionsebene hervorzuheben, wie das Routing großer Nachrichten in den BLOB-Speicher, transparente Komprimierung und automatisches Entfernen beider Nachrichtenteile. Dies ist nun der richtige Zeitpunkt für eine Zusammenfassung.

Wir haben gesehen, dass Azure-Warteschlangen Nachrichten unterstützen, die größer als 64 KB sind. Dazu werden Azure-Cache und der Azure-BLOB-Dienst genutzt. Es gibt keine zusätzlichen technischen Einschränkungen für den Client. Ich habe verdeutlicht, dass Sie mit einem geringen Zusatzaufwand den Umgang mit Nachrichten für den Client optimieren können, indem Sie Lebenszyklusverbesserungen bereitstellen wie:

  • Transparente Nachrichtenkomprimierung, um die Speicherkosten und Bandbreite zum/vom Rechenzentrum zu verringern.

  • Transparenter, einfach vom Benutzer anpassbarer Überlauf großer Nachrichten in Cache oder BLOB-Speicher.

  • Generika-Unterstützung zum einfachen Speichern jedes Objekttyps.

  • Automatische Verarbeitung von Übergangszuständen für mehr Zuverlässigkeit.

Wie weiter oben erwähnt, kann für diese Lösung ein verteilter Cache und ein BLOB-Speicher als Überlaufspeicher verwendet werden. Azure-Cache ist allerdings kostenpflichtig. Sie sollten die Speicheranforderungen Ihres Projekts sorgfältig evaluieren und eine Kostenanalyse basierend auf der prognostizierten Anzahl von Nachrichten und der Nachrichtengröße durchführen, bevor Sie einen Überlauf in einen Cache aktivieren.

Diese Lösung ermöglicht die einfache Unterstützung großer Nachrichten in Azure-Warteschlangen. Verbesserungen sind jedoch immer möglich. Einige Beispiele für nützliche Funktionen, die nicht in diese Lösung integriert sind und die Sie gegebenenfalls hinzufügen möchten, sind:

  • Die Fähigkeit, den Typ des Überlaufspeichers für große Nachrichten in der Anwendungskonfiguration zu konfigurieren.

  • Die zusätzlichen benutzerdefinierten Serialisierungsprogramme, falls das standardmäßige Programm nicht Ihre Leistungsziele oder funktionalen Anforderungen erfüllt (beispielsweise, wenn Sie nicht die standardmäßige Komprimierung benötigen).

  • Ein Element in den Metadaten des BLOBs, das als Breadcrumb agiert, sodass Sie den BLOB-Speicher durchsuchen und schnell herausfinden können, ob verwaiste große Nachrichten-BLOBs (Zombies) vorhanden sind.

  • Eine "Garbage Collector"-Komponente, die das fristgerechte Entfernen aller verwaisten BLOBs aus dem Überlaufnachrichtenspeicher sicherstellt (falls auf Warteschlangen auch von anderen Komponenten als der Speicherabstraktionsebene zugegriffen wird, die hier implementiert ist).

Der zugehörige Beispielcode kann von der MSDN Code Gallery heruntergeladen werden. Beachten Sie, dass alle Quellcodedateien der Microsoft Public License unterliegen, wie in den entsprechenden rechtlichen Hinweisen erläutert wird.

Weitere Informationen zu dem in diesem Artikel erläuterten Thema finden Sie in den folgenden Ressourcen:

Fanden Sie dies hilfreich?
(1500 verbleibende Zeichen)
Vielen Dank für Ihr Feedback.
Anzeigen:
© 2015 Microsoft