
Best Practices for Handling Large Messages with Azure Queues

Updated: April 29, 2014

Author: Valery Mizonov

Reviewers: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson, and Mark Simms

This article is intended to offer developer-oriented guidance on the implementation of a generics-aware storage abstraction layer for the Azure Queue Service. The problem space addressed by the article is centered on supporting very large messages in Azure queues and overcoming the message size limitation that exists today. Simply put, this article and the associated code will enable you to use Azure queues without having to engage in the message size bookkeeping imposed by the queue’s 64KB limit.

Why Large Messages?

There were wonderful times when "640K ought to be enough for anybody." A few kilobytes could buy a luxurious storage space where a disciplined developer from the past was able to happily put all of her application’s data. Today, the amount of data that modern applications need to be able to exchange can vary substantially. Whether it’s a tiny HL7 message or a multi-megabyte EDI document, modern applications have to deal with all sorts of volumetric characteristics evolving with unpredictable velocity. A business object that was expressed in a multi-byte structure in the last century may easily present itself today as a storage-hungry artifact several times larger than its predecessor thanks to modern serialization and representation techniques and formats.

Handling messages in a given solution architecture without imposing technical limitations on message size is the key to supporting ever-evolving data volumes. Large messages cannot always be avoided. For instance, if a B2B solution is designed to handle EDI traffic, the solution needs to be prepared to receive EDI documents up to several megabytes in size. Every tier, service and component in the end-to-end flow needs to accommodate the document size that is being processed. Successfully accepting a 20MB EDI 846 Inventory Advice document via a Web Service but failing to store it in a queue for processing due to the queue’s message size constraints would be an unpleasant discovery during testing.

Why would someone choose to use a queue for large messages on the Azure platform? What’s wrong with other alternatives such as blobs, tables, cloud drives or Microsoft Azure SQL Database, to name a few? Queues allow you to implement certain types of messaging scenarios that are characterized by asynchronous, loosely coupled communication between producers and consumers performed in a scalable and reliable fashion. The use of Azure queues decouples different parts of a given solution and offers unique semantics such as FIFO (First In, First Out) and At-Least-Once delivery. Such semantics can be somewhat difficult to implement using the alternative data exchange mechanisms. Furthermore, queues are best suited as a volatile store for exchanging data between services, tiers and applications, not as persistent data storage. The respective data exchange requirement can manifest itself in many different forms, such as passing messages between components in an asynchronous manner, load leveling, or scaling out complex compute workloads. Many of these data exchange patterns are not straightforward to implement without queues. In summary, queues are a crucial capability. Not having to worry about what can and cannot go into a queue is a strong argument for building unified queue-based messaging solutions that can handle any data of any size.

In this article, I’m going to implement a solution that enables the use of an Azure queue for exchanging large messages. I also intend to simplify the way my solution interacts with Azure queues by providing an abstraction layer built on top of APIs found in the StorageClient namespace. This abstraction layer will make it easier to publish and consume instances of application-specific entities, as opposed to having to deal with byte arrays or strings, which are the only types supported by the Queue Service API today. I am going to make extensive use of .NET generics, take advantage of some value-add capabilities such as transparent stream compression and decompression, and apply some known best practices such as handling intermittent faults in order to improve the fault tolerance of storage operations.

Design Considerations

As things stand today, a message that is larger than 64KB (after it’s serialized and encoded) cannot be stored in an Azure queue. The client-side API will throw an exception if you attempt to place a message larger than 64KB in a queue. The maximum allowed message size can be determined by inspecting the MaxMessageSize property of the CloudQueueMessage class. As of the writing of this post, the message size limit returned by this property is 65,536 bytes.

Important
The maximum message size defined in the CloudQueueMessage.MaxMessageSize property is not reflective of the maximum allowed payload size. Messages are subject to Base64 encoding when they are transmitted to a queue. The encoded payloads are always larger than their raw data: Base64 output is roughly 4/3 the size of the input. As a result, the 64KB size limit effectively prohibits storing any message whose raw payload is larger than 48KB (75% of 64KB).

Even though 64KB is the limit for a single queue item, it can be deemed prohibitive for certain types of messages, especially those that cannot be broken down into smaller chunks. From a developer perspective, worrying about whether a given message can be accommodated on a queue doesn’t help my productivity. At the end of the day, the goal is to get my application data to flow between producers and consumers in the most efficient way, regardless of the data size. While one side calls Put (or Enqueue) and the other side invokes Get (or Dequeue) against a queue, the rest should theoretically occur automagically.

Overcoming the message size limitation in Azure queues by employing a smart way of dealing with large messages is the key premise of the technical challenge elaborated in this article. This comes at the cost of some additional craftsmanship. In the modern world of commercial software development, any extra development effort needs to be wisely justified. I am going to justify the additional investment with the following design goals:

  • Support for very large messages by eliminating any restrictions imposed by the Queue Service API as they pertain to message size.

  • Support for user-defined generic objects when publishing and consuming messages from an Azure queue.

  • Transparent message overflow into a configurable message store, be it a blob container, a distributed cache, or another type of repository capable of storing large messages.

  • Transparent compression that is intended to increase cost-efficiency by minimizing the amount of storage space consumed by large messages.

  • Increased reliability through extensive use of transient fault handling best practices when performing queue operations.

The foundation for supporting large messages in size-constrained queues is the following pattern. First, I check whether a given message can be accommodated on an Azure queue without performing any extra work. The way to determine whether a message can be safely stored on a queue without violating size constraints is a formula, which I wrap into a helper function as follows:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
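        // Account for Base64 encoding: every 3 bytes of raw payload occupy 4 characters once encoded,
        // so the usable payload is roughly 75% of CloudQueueMessage.MaxMessageSize.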
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}
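
To illustrate the intended use, a producer could apply the helper to a serialized payload before deciding how to enqueue it. The snippet below is a sketch only: queue is assumed to be an existing CloudQueue reference, and GetSerializedPayload and OverflowToLargeMessageStore are hypothetical placeholders for the machinery developed in the remainder of this article.

// Sketch only: decide whether a serialized payload fits on the queue.
byte[] serializedPayload = GetSerializedPayload(); // hypothetical: compressed, serialized message bytes

if (CloudUtility.IsAllowedQueueMessageSize(serializedPayload.Length))
{
    // Safe to enqueue directly; Base64 encoding will not push the item past the 64KB limit.
    queue.AddMessage(new CloudQueueMessage(serializedPayload));
}
else
{
    // Too large for a single queue item; route the payload into an overflow store instead.
    OverflowToLargeMessageStore(serializedPayload);
}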

If the message size is under the enforced limit, I simply invoke the Queue Service API to enqueue the message "as is." If the message size is in excess of the limitation in question, the data flow becomes quite interesting. The following flowchart visualizes the subsequent steps:

[Flowchart: Store-Message-Flow1]

In summary, if a message cannot be accommodated on a queue due to its size, it overflows into a message store capable of storing large messages. A tiny metadata message is then created, consisting of a reference to the item in the overflow store. Finally, the metadata message is put on the queue. I always choose to compress a message before asserting its suitability for persistence in a queue. This effectively expands the population of messages that can be queued without incurring the need to go into the overflow store. A good example is an XML document slightly larger than 64KB which, after serialization and compression, becomes a perfect candidate to be simply put on a queue. If the default compression is not desirable, you can modify this behavior by providing a custom serializer component, elaborated in the next section.

There are several considerations that apply here, mainly from a cost perspective. As can be noted in the above flowchart, I first attempt to determine whether a large message can overflow into the Azure Caching Service (referred to herein as the Caching Service for the sake of brevity). Since the use of the distributed cloud-based caching service is subject to a charge, the cache overflow path should be made optional. This is reflected in the flowchart.

In addition, there may be situations where the message is quite large and therefore not suitable for being stored in a size-constrained distributed cache. As of the writing of this article, the maximum cache size is 4GB. Therefore, we must take this into consideration and provide a failover path should we exceed cache capacity or quotas. The quotas come with eviction behavior that also needs to be accounted for.

Important
The use of the Azure Caching Service as an overflow store helps reduce latency and eliminate excessive storage transactions when exchanging a large number of messages. It offers a highly available, distributed caching infrastructure capable of replicating and maintaining cached data in memory across multiple cache servers for durability. These benefits can be outweighed by the cache size limitation and costs associated with the service usage. It is therefore important to perform a cost-benefit analysis to assess the pros and cons of introducing the Caching Service as an overflow store in certain scenarios.

Given that the distributed cache storage is limited, it is essential to set out some further rules that will enable the efficient use of the cache. In connection with this, one important recommendation needs to be explicitly called out:

Important
Due to the specifics of its eviction behavior, the Caching Service does not offer the complete, guaranteed durability of the Azure Blob Service. When used as an overflow store, the Caching Service is best suited for messages that are volatile in nature and under 8MB in size. The term “volatile” means that messages are published into and subsequently consumed from the cache as quickly as possible. The 8MB recommendation is due to the optimal cache item size that is configured in the Caching Service by default.

I’m going to reflect the above recommendation in the code by providing a helper function that determines whether an item of the specified size can be considered optimal for storing in the cache.

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}
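
Taken together, the two helpers drive the routing decision visualized in the flowchart above. Below is a minimal sketch of that decision, assuming dataStream holds the serialized payload, EnqueueDirectly is a hypothetical helper, and cacheStore and blobStore stand in for the overflow store implementations developed later in this article.

// Sketch only: route a serialized payload to the queue, the distributed cache, or blob storage.
long payloadSize = dataStream.Length;

if (CloudUtility.IsAllowedQueueMessageSize(payloadSize))
{
    // Small enough to be carried by the queue item itself.
    EnqueueDirectly(dataStream); // hypothetical helper
}
else if (CloudUtility.IsOptimalCacheItemSize(payloadSize))
{
    // Under the 8MB recommendation: the distributed cache is the preferred overflow store.
    cacheStore.Put<Stream>(containerName, blobName, dataStream);
}
else
{
    // Anything larger falls back to the blob container.
    blobStore.Put<Stream>(containerName, blobName, dataStream);
}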

Now that the initial prerequisites have been considered, it’s time to switch over to the consumer side and take a look at the implementation pattern for retrieving large messages from a queue. First, let’s visualize the process flow to facilitate overall understanding:

[Flowchart: Large-Message-Overflow]

To summarize the above flow, a message of an unknown type is fetched from a queue and compared against a metadata message type. If it is not a metadata message, the flow continues with decompression logic so that the original message can be correctly reconstructed before being presented to the consumer. By contrast, if it is in fact a metadata message, it is inspected to determine the type of overflow store that was used for storing the actual message. If it is identified as a message stored in the cache, the respective Caching Service API is invoked and the real message is fetched, decompressed, and returned to the consumer. If the real message was put into a blob container, the Blob Service API is used to retrieve it from the blob entity; the message is then decompressed and handed back to the caller.
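
Although the consumer-side code is examined in more detail further down, the flow boils down to the sketch below. The member names are illustrative rather than the actual implementation, decompression is assumed to happen inside the serializer as described earlier, and LargeQueueMessageInfo is the metadata type defined later in this article.

// Illustrative sketch of the consumer-side pattern described above.
object payload = DeserializeQueueItem(queueMessage.AsBytes); // hypothetical helper

LargeQueueMessageInfo msgInfo = payload as LargeQueueMessageInfo;

if (msgInfo == null)
{
    // Regular message: it was small enough to travel on the queue itself.
    return (T)payload;
}

// Metadata message: fetch the real payload from the overflow store (cache or blob),
// let the serializer decompress it, and hand it back to the caller.
return overflowStorage.Get<T>(msgInfo.ContainerName, msgInfo.BlobReference);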

In addition to handling Enqueue and Dequeue operations for large messages, there is a need to make sure that all overflowed payloads are removed from their respective overflow message stores upon the consumer’s request. To accomplish this, one potential implementation pattern is to couple the removal process with the Delete operation when it is invoked for a given message. The visual representation of this operation can be depicted as follows:

[Flowchart: Large-Message-Overflow]

Before we start implementing the patterns mentioned above, one last consideration worth making is the definition of a message. What would be considered a message, and in what forms will it manifest itself? Would it be a byte array, a stream of data, a simple type like a string, or a complex application-specific object which the developer implements as part of the solution object model? I truly believe that this is an area where we should not constrain ourselves. Let’s just assume that a message is of generic type <T>, meaning it can be anything the developer wishes to use. You will see that the end implementation will naturally unfold itself around this idea.
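
To make this tangible, a message could be an application-specific data contract such as the hypothetical type below; the abstraction layer does not need to know anything about it in advance.

// Hypothetical application-specific message type; any serializable .NET type will do.
[DataContract(Namespace = "urn:contoso-samples:messages")]
public class InventoryAdviceDocument
{
    [DataMember]
    public string DocumentId { get; set; }

    [DataMember]
    public DateTime ReceivedOn { get; set; }

    [DataMember]
    public byte[] EdiPayload { get; set; }
}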

Putting it all together, the following diagram summarizes the three possible travel paths accounted for in the above design:

[Diagram: Message-Travel-Paths]

At this point, there seems to be enough input to start bringing the technical design to life. From this point onwards, I will switch the focus to the source code required to implement the patterns discussed above.

Technical Implementation

To follow along, download the full sample code from the MSDN Code Gallery. The sample is shipped as part of a larger end-to-end reference implementation which is powered by the patterns discussed in this article. Once downloaded and unzipped, navigate to the Azure.Services.Framework project under Contoso.Cloud.Integration and expand the Storage folder. This location contains all the main code artifacts discussed below.

As noted at the beginning, the original idea was to abstract the way a cloud application interacts with Azure queues. I approach this requirement by providing a contract that governs the main operations supported by my custom storage abstraction layer. The programming interface through which the contract surfaces to consumers is shown below. I intentionally omitted a few infrastructure-level functions from the code snippet, such as the creation and deletion of queues, since these do not add significant value at this time.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}
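
From a caller’s perspective, working against this contract is straightforward. The sketch below assumes an ICloudQueueStorage instance has already been composed (as shown later in this article), uses a hypothetical CreateQueueStorage factory, and reuses the illustrative InventoryAdviceDocument type introduced earlier.

// Sketch only: publish and consume an application-specific object through the contract.
ICloudQueueStorage queueStorage = CreateQueueStorage(); // hypothetical factory

var doc = new InventoryAdviceDocument { DocumentId = "846-001", ReceivedOn = DateTime.UtcNow };

// Publish: the abstraction layer serializes, compresses and (if needed) overflows the message.
queueStorage.Put<InventoryAdviceDocument>("inventoryqueue", doc);

// Consume: the abstraction layer reassembles the original object regardless of where it landed.
InventoryAdviceDocument received = queueStorage.Get<InventoryAdviceDocument>("inventoryqueue");

// Delete: removes the queue item and, for large messages, the overflowed payload as well.
queueStorage.Delete<InventoryAdviceDocument>(received);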

There is also a need for one extra contract (interface) which abstracts access to the large message overflow store. Two components implement the contract, one for each overflow store (blob storage and distributed cache). The contract comprises the following operations:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}
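
For completeness, exercising the overflow store contract in isolation looks like the sketch below. The container and blob names are illustrative, and CreateOverflowStore is a hypothetical factory for either of the implementations developed later.

// Sketch only: the overflow store contract exercised directly, outside of the queue layer.
ICloudBlobStorage overflowStore = CreateOverflowStore();

var doc = new InventoryAdviceDocument { DocumentId = "846-001" };

bool stored = overflowStore.Put<InventoryAdviceDocument>("largemsgcache-orders", "msg-001", doc);
InventoryAdviceDocument copy = overflowStore.Get<InventoryAdviceDocument>("largemsgcache-orders", "msg-001");
bool removed = overflowStore.Delete("largemsgcache-orders", "msg-001");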

Both contracts rely heavily on the generic type <T>, which enables you to tailor the message type to any .NET type of your choice. I will, however, have to handle some extreme use cases, namely types that require special treatment, such as streams. I expand on this later on.

Regardless of the message type chosen, one important requirement applies: the object type that represents a message on a queue must be serializable. All objects passing through the storage abstraction layer are subject to serialization before they land on a queue or in an overflow store. In my implementation, serialization and deserialization are also coupled with compression and decompression, respectively. This approach increases efficiency from a cost and bandwidth perspective. The cost-related benefit comes from the fact that compressed large messages inherently consume less storage, resulting in a decrease in storage costs. The bandwidth efficiency arises from savings on payload size thanks to compression, which in turn makes payloads smaller on the wire as they flow to and from Azure storage.

The requirement for serialization and deserialization is declared in a specialized interface. Any component that implements this interface must provide the specific compression, serialization, deserialization, and decompression functionality. The interface is shown below:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}
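
If, as mentioned earlier, the default compression is not desirable (for example, when payloads are already compressed), a custom implementation of this contract can bypass DeflateStream entirely. Below is a minimal sketch of such an alternative, assuming the DataContractSerializer is suitable for all message types and ignoring the XML-specific special cases handled by the default implementation shown later.

/// <summary>
/// Minimal sketch: a serializer that skips compression and relies solely on DataContractSerializer.
/// </summary>
internal sealed class PlainEntitySerializer : ICloudStorageEntitySerializer
{
    public void Serialize(object instance, Stream target)
    {
        var serializer = new DataContractSerializer(instance.GetType());

        // Write the object using the .NET Binary XML format without wrapping the stream in DeflateStream.
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(target, null, null, false))
        {
            serializer.WriteObject(xmlWriter, instance);
        }
    }

    public object Deserialize(Stream source, Type type)
    {
        var serializer = new DataContractSerializer(type);

        // Read the object back directly from the uncompressed binary XML stream.
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(source, XmlDictionaryReaderQuotas.Max))
        {
            return serializer.ReadObject(xmlReader);
        }
    }
}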

For compression and decompression, I use the DeflateStream component in the .NET Framework. This class implements the Deflate algorithm, an industry-standard, RFC-compliant algorithm for lossless compression and decompression. In comparison to the GZipStream class, the former produces smaller compressed output and generally delivers better performance. By contrast, the GZipStream class uses the GZIP data format, which includes a cyclic redundancy check (CRC) value for detecting data corruption. Behind the scenes, the GZIP data format uses the same compression algorithm as the DeflateStream class. In summary, GZipStream = DeflateStream + the cost of calculating and storing CRC checksums.

My implementation of the contract is included below. Note that the compression algorithm can be easily swapped by replacing the DeflateStream class with GZipStream and vice versa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

One of the powerful capabilities in the CloudStorageEntitySerializer implementation is the ability to apply special treatment when handling XML documents of both flavors: XmlDocument and XDocument. The other area worth highlighting is the optimal serialization and deserialization of the XML data. Here I decided to take advantage of the XmlDictionaryReader and XmlDictionaryWriter classes which are known to .NET developers as a superb choice when it comes to performing efficient serialization and deserialization of XML payloads by using the .NET Binary XML format.
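
A quick way to convince yourself that the compression is transparent is to round-trip a payload through the serializer. The sketch below assumes it runs inside the same assembly (the class is internal) and that SampleEdiDocument.xml is a placeholder input file.

// Sketch only: round-trip an XDocument through the serializer and verify equivalence.
ICloudStorageEntitySerializer serializer = new CloudStorageEntitySerializer();
XDocument original = XDocument.Load("SampleEdiDocument.xml"); // hypothetical input file

using (var buffer = new MemoryStream())
{
    serializer.Serialize(original, buffer);
    Console.WriteLine("Compressed size: {0} bytes", buffer.Length);

    buffer.Seek(0, SeekOrigin.Begin);
    var copy = (XDocument)serializer.Deserialize(buffer, typeof(XDocument));

    Console.WriteLine("Round-trip succeeded: {0}", XNode.DeepEquals(original.Root, copy.Root));
}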

The decision as to the type of overflow message store is the responsibility of the consumer which calls into the custom storage abstraction layer. Along these lines, I’m going to provide an option to select the desired message store type through the following constructor in the type that implements the ICloudQueueStorage interface:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing with ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

The above constructor does not perform any complex work; it simply initializes the internal members and configures the client component that will access the Azure queue. It’s worth noting, however, that I explicitly tell the queue client not to enforce any retry policy. In order to provide a robust and reliable storage abstraction layer, I need more granular control over transient issues when performing operations against Azure queues. Therefore, there will be a separate component that recognizes and is able to handle a much larger variety of intermittent faults.
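
For orientation, composing the component might look like the sketch below. The StorageAccountInfo and RetryPolicy instances come from the sample’s configuration and transient fault handling framework, CreateOverflowStore is a hypothetical factory, and the overflow store is one of the ICloudBlobStorage implementations developed in the remainder of this article.

// Sketch only: composing the queue storage layer from its dependencies.
ICloudStorageEntitySerializer serializer = new CloudStorageEntitySerializer();
ICloudBlobStorage overflowStore = CreateOverflowStore(); // hypothetical factory; see the implementations below

var queueStorage = new ReliableCloudQueueStorage(
    storageAccountInfo,   // account credentials loaded from configuration
    retryPolicy,          // retry policy supplied by the transient fault handling framework
    serializer,           // compression-aware serializer shown earlier
    overflowStore);       // large message overflow store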

Let’s now take a look at the internals of the ReliableCloudQueueStorage class which I drafted above. Specifically, let’s review its implementation of the Put operation since this is the location of the transparent overflow into a large message store.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

A new code artifact that has just manifested itself in the snippet above is the LargeQueueMessageInfo class. This custom type is ultimately our metadata message that describes the location of a large message. This class is marked as internal as it’s not intended to be visible to anyone outside the storage abstraction layer implementation. The class is defined as follows:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}
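
To make the naming convention concrete, calling the factory method for a queue named orders produces metadata along the following lines (the GUID value is, of course, illustrative):

// Illustrative only: the metadata created for a queue named "orders".
var info = LargeQueueMessageInfo.Create("orders");

// info.ContainerName -> "LargeMsgCache-orders" (sanitized into a valid container name by the storage layer)
// info.BlobReference -> a 32-character GUID string, e.g. "0f8fad5bd9cb469fa16570867728950e"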

Moving forward, I need to implement a large message overflow store that leverages the Azure Blob Storage service. As I pointed out earlier, this component must implement the ICloudBlobStorage interface, which the ReliableCloudQueueStorage component consumes to relay messages into the overflow store whenever they cannot be accommodated on a queue due to the message size limitation. To set the stage for the next steps, I will include only the constructor’s implementation:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing with ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on the parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Earlier in this article, I showed the implementation of the Put operation, which ensures that small messages are always placed on a queue whereas large messages are transparently routed into the overflow store. For the sake of continuity, let’s now review the mechanics of the counterpart Put operation implemented by the overflow store.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something other than a Stream; serialize T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);

            // Rewind the stream so that the subsequent upload starts from the beginning.
            blobData.Seek(0, SeekOrigin.Begin);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

In summary, the above code takes a blob of type <T> and first checks whether it is already a serialized image of a message in the form of a Stream object. All large messages that are relayed to the overflow storage by the ReliableCloudQueueStorage component arrive as streams, ready for persistence. Next, the UploadBlob action is invoked, which in turn calls into the Blob Service client API, specifically its UploadFromStream operation. If a large message blob fails to upload successfully, the code inspects the error returned by the Blob Service and provides a failover path for two conditions: ContainerNotFound and BlobAlreadyExists. In the event that the target blob container is not found, the code attempts to create the missing container. It performs this action within a retry-aware scope to improve reliability and increase resilience to transient failures. The second failover path is intended to handle a situation where a blob with the same name already exists. The code removes the existing blob, provided the overwrite behavior is enabled. After removal, the upload of the new blob is retried. Again, this operation is performed inside a retry-aware scope for increased reliability.

Now that I can store large messages in a blob container, it’s time to design another implementation of the ICloudBlobStorage interface that leverages the Azure Caching Service. For consistency, let’s start off with its constructor:

/// <summary>
/// Implements reliable generics-aware layer for Azure Caching Service.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified caching service endpoint information,
    /// custom retry policy and custom implementation of the ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}
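
The cache-backed Put is not reproduced here in full, but it conceptually reduces to placing the serialized payload into the cache under a composite key. The heavily simplified sketch below omits the overwrite semantics, the Stream special-casing and the tracing present in the real implementation, and assumes the standard DataCache.Put API and a retry policy that exposes an ExecuteAction overload accepting an Action.

// Heavily simplified sketch of the cache-backed Put; overwrite handling, stream special-casing
// and tracing are omitted for brevity.
public bool Put<T>(string containerName, string blobName, T blob)
{
    using (var buffer = new MemoryStream())
    {
        this.dataSerializer.Serialize(blob, buffer);

        // Compose a cache key from the logical container and blob names.
        string cacheKey = String.Format("{0}/{1}", containerName, blobName);

        this.retryPolicy.ExecuteAction(() => this.cache.Put(cacheKey, buffer.ToArray()));
        return true;
    }
}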

If you recall from the earlier considerations, one of the key technical design decisions was to take advantage of both the Blob Service and the Caching Service for storing large messages. The cache option is mostly suited for transient objects not exceeding the recommended payload size of 8MB. The blob option is essentially for everything else. Overall, this decision introduces the need for a hybrid overflow store. The foundation for building a hybrid store is already in the codebase. It’s just a matter of marrying the existing artifacts together as follows:

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Caching Service in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

At this point, I conclude the saga by including one more code snippet showing the implementation of the Put operation in the hybrid overflow store.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something other than a Stream; serialize T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

This article would be considered incomplete if I failed to provide some examples of how the storage abstraction layer discussed above can be consumed from a client application. I will combine these examples with a test application that will also validate the technical implementation.

Validation

In order to prove that large messages can successfully pass back and forth through the newly implemented storage abstraction layer, a very simple console application was put together. In the first step, it takes a sample XML document of 90MB in size and puts it on an Azure queue. In the second step, it consumes a message from the queue. The message should indeed be the original XML document, which is written back to disk under a different name so that the file size and content can be compared. In between these steps, the application enters a pause mode during which you can explore the content of the queue and the respective message overflow store, such as the cache or blob container. The source code for the test application is provided below.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as the blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform a very basic file size-based comparison. In reality, the document should also be compared structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}
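
To run the test, pass the path of the sample XML document as the first command-line argument, for example (the executable name is an assumption based on the project namespace):

LargeQueueMessageTest.exe C:\Temp\LargeSample.xml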

For the sake of completeness, below is the application configuration file that was used during testing. If you are going to try out the test application, make sure you modify your copy of app.config to include the actual storage account credentials and caching service endpoint information.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Provided the test application has been successfully compiled and executed, output similar to the following is expected to appear in the console window:

[Screenshot: console window output]

If you peek into the storage account used by the test application, the following message will appear on the queue:

[Screenshot: serialized metadata message in the queue]

Since the test message was large enough to overflow directly into the blob storage, the following screenshot depicts the expected content inside the respective blob container while the test application is paused:

[Screenshot: contents of the blob container holding the overflowed message]

Note how the original 90MB XML document used in my test became an 11MB blob. This reflects roughly 87% savings on storage and bandwidth, the result of applying XML binary serialization combined with compression. Given the target class of scenarios, XML binary serialization plus compression is the first and best choice.
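
For illustration only, the sketch below shows how binary XML encoding combined with GZip compression can shrink a verbose XML document. It is not the serializer used by the storage abstraction layer, and the file paths are placeholders:

using System.IO;
using System.IO.Compression;
using System.Xml;
using System.Xml.Linq;

// Writes an XML document through a binary XML writer wrapped in a GZip stream and returns
// the size of the compressed output so it can be compared against the original file.
// XmlDictionaryWriter lives in the System.Runtime.Serialization assembly.
static long CompressXmlDocument(string inputPath, string outputPath)
{
    XDocument doc = XDocument.Load(inputPath);

    using (FileStream output = File.Create(outputPath))
    using (GZipStream gzip = new GZipStream(output, CompressionMode.Compress))
    using (XmlDictionaryWriter writer = XmlDictionaryWriter.CreateBinaryWriter(gzip))
    {
        doc.WriteTo(writer);
        writer.Flush();
    }

    return new FileInfo(outputPath).Length;
}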

Once the test application proceeds with deletion of the queue message, the metadata message is expected to be removed along with the blob holding the message data, as shown in the screenshot below:

[Screenshot: blob container after the message and its blob have been removed]

The example shown above reflects a simplified view of the lifecycle of a large message. It is intended to highlight the fundamentals of the storage abstraction layer, such as routing of large messages into the blob store, transparent compression, and automatic removal of both message parts. It’s now the right time to jump to a conclusion.

Conclusion

As we have seen, the use of Azure Queues can be extended to support messages larger than 64KB by leveraging the Azure Caching Service and Azure Blob Service without imposing any additional technical restrictions on the client. In fact, I have shown that with a little extra work you can enhance the messaging experience for the client through quality-of-life improvements such as:

  • Transparent message compression to reduce storage costs and save bandwidth into/out of the datacenter.

  • Transparent, easily customizable overflow of large messages to Cache or Blob storage.

  • Generics support that allows you to easily store any object type (see the usage sketch after this list).

  • Automatic handling of transient conditions for improved reliability.
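
To illustrate the generics support called out above, the following minimal sketch stores, retrieves and deletes a custom type through the same abstraction. The PurchaseOrder type and queue name are hypothetical, the Put/Get/Delete calls mirror those used in the test application, and the attributes your type requires depend on the serializer you configure:

[Serializable]
public class PurchaseOrder
{
    public string OrderId { get; set; }
    public decimal Total { get; set; }
}

// Inside a client method, assuming queueStorage is a ReliableCloudQueueStorage instance
// constructed as in the test application:
var order = new PurchaseOrder { OrderId = "PO-1001", Total = 149.95m };

// The abstraction layer serializes, compresses and (when necessary) overflows the payload transparently.
queueStorage.Put<PurchaseOrder>("PurchaseOrderQueue", order);

PurchaseOrder received = queueStorage.Get<PurchaseOrder>("PurchaseOrderQueue");

// Removes both the metadata message and any associated overflow blob.
queueStorage.Delete<PurchaseOrder>(received);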

As I mentioned earlier, while this solution can use both the distributed cache and the blob store for overflow storage, the use of the Azure Caching Service incurs additional costs. You should carefully evaluate the storage requirements of your project and perform a cost analysis based on the projected number of messages and their size before deciding to enable overflow using the cache.

While this solution provides an easy-to-use means of supporting large messages on Azure queues, there is always room for improvement. Some examples of value-add features that are not incorporated in this solution and that you may wish to add are:

  • The ability to configure the type of large message overflow store in the application configuration.

  • Additional custom serializers, in case the default one does not meet your performance goals or functional needs (for instance, if you don’t need the default compression).

  • An item in the blob’s metadata that acts as a breadcrumb, allowing you to scan through your blob storage and quickly determine whether you have any orphaned large-message blobs (zombies).

  • A “garbage collector” component that will ensure timely removal of any orphaned blobs from the overflow message store (in case queues are also accessed by components other than the storage abstraction layer implemented here). A conceptual sketch of such a component follows this list.
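
As a conceptual illustration of the last two items, the sketch below combines the breadcrumb idea with a simple garbage collection pass. OverflowBlobInfo, ListOverflowBlobs, MetadataMessageExists and DeleteOverflowBlob are hypothetical helpers that you would implement against the blob container and queue used by the storage abstraction layer; they are not part of the sample code:

// Removes overflow blobs whose metadata messages no longer exist on the queue.
// Only blobs older than a minimum age are considered so that messages still in flight are never touched.
static void CollectOrphanedBlobs(string overflowContainer, TimeSpan minimumAge)
{
    foreach (OverflowBlobInfo blob in ListOverflowBlobs(overflowContainer))
    {
        // Skip blobs that may still belong to a message being processed.
        if (DateTime.UtcNow - blob.LastModifiedUtc < minimumAge)
        {
            continue;
        }

        // The breadcrumb stored in the blob's metadata identifies the originating queue message.
        if (!MetadataMessageExists(blob.QueueName, blob.MessageId))
        {
            DeleteOverflowBlob(overflowContainer, blob.Name);
        }
    }
}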

The accompanying sample code is available for download from the MSDN Code Gallery. Note that all source code files are governed by the Microsoft Public License as explained in the corresponding legal notices.
