
Best Practices for Handling Large Messages with Azure Queues

Updated: April 2014

Author: Valery Mizonov

Reviewers: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson, and Mark Simms

This article is intended to offer developer-oriented guidance for implementing a generics-aware storage abstraction layer for the Azure Queue service. It primarily addresses supporting very large messages in Azure queues and overcoming the message size limitation that exists today. Put simply, after reading this blog post and studying the associated code, you will be able to take advantage of Azure queues without having to worry about whether you have exceeded the 64 KB message size limit imposed by the queue service.

There was a time when people believed that "64K ought to be enough for anybody." In the past, a frugal developer could get by with a few kilobytes of storage for all of an application's data. Today, the amount of data that modern applications need to exchange varies enormously. Whether it is a tiny HL7 message or a multi-megabyte EDI document, a modern application must cope with data volumes that grow at unpredictable rates. A business object that was expressed as a multi-byte structure in the last century can now, thanks to new serialization and representation techniques and formats, easily balloon into an item that requires far more storage, several times larger than its predecessors.

Handling messages in a given solution architecture without imposing any technical limitation on message size is central to accommodating ever-growing data volumes. Large messages sometimes simply cannot be avoided. For example, if a B2B solution is designed to handle EDI traffic, that solution needs to be prepared to receive EDI documents of up to several megabytes, and every layer, service, and component in the end-to-end flow must also be able to accommodate the document size being processed. Successfully accepting a 20 MB EDI 846 Inventory Advice over a web service, only to be unable to store it in a queue for processing because of the queue's message size limit, would certainly be an unacceptable outcome during testing.

Why would anyone choose a queue for large messages on the Azure platform? Why not use the alternatives, such as blobs, tables, cloud drives, or Microsoft Azure SQL Database? Note that queues enable certain types of messaging scenarios characterized by asynchronous, loosely coupled communication between producers and consumers, performed in a scalable and reliable manner. Using Azure queues decouples the different parts of a given solution and provides unique semantics such as FIFO (first in, first out) and at-least-once delivery. Such semantics can be somewhat difficult to implement using the alternative data exchange mechanisms. In addition, queues are best suited as a volatile store for exchanging data between services, layers, and applications, not as a durable data store. Different data exchange requirements call for different forms of data exchange, such as passing messages asynchronously between components, load leveling, or scaling out complex compute workloads. Many of these data exchange patterns cannot be implemented directly without queues. In short, queues are a crucial capability. A major advantage of building a unified, queue-based messaging solution is that you do not have to worry about what can and cannot go into a queue: it handles any data of any size.

In this article, I am going to implement a solution that enables Azure queues to be used for exchanging large messages. I will also provide an abstraction layer built on top of the APIs in the StorageClient namespace to simplify the way my solution interacts with Azure queues. Compared with having to deal with byte arrays or strings (the only types the queue service API currently supports), this abstraction layer makes it easier to publish and consume instances of application-specific entities. I will make extensive use of .NET generics, take advantage of some value-added capabilities such as transparent stream compression and decompression, and apply some well-known best practices, such as handling intermittent faults, to improve the fault tolerance of storage operations.

As things stand today, a message larger than 64 KB (after serialization and encoding) cannot be stored in an Azure queue. The client API will return an exception if you attempt to place a message larger than 64 KB on a queue. The maximum allowed message size can be determined by inspecting the MaxMessageSize property of the CloudQueueMessage class. As of the writing of this blog post, the message size limit returned by this property is 65536.

Important
The maximum message size defined in the CloudQueueMessage.MaxMessageSize property does not reflect the maximum allowed payload size. Messages are Base64-encoded when they are transmitted to a queue, and the encoded payload is always larger than its raw data: every 3 raw bytes become 4 encoded characters. As a result, the 64 KB size limit effectively prohibits storing any message whose raw payload is larger than 48 KB (75% of 64 KB).
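The 3-to-4 Base64 expansion noted above can be verified with a small, self-contained sketch (the numbers below are straight arithmetic, not an Azure API call): a 48 KB raw payload encodes to exactly the 64 KB queue message limit.

```csharp
using System;

public static class Base64OverheadDemo
{
    public static void Main()
    {
        // A raw payload of 48 KB (49152 bytes).
        byte[] rawPayload = new byte[48 * 1024];
        new Random(0).NextBytes(rawPayload);

        // Base64 turns every 3 raw bytes into 4 encoded characters.
        string encoded = Convert.ToBase64String(rawPayload);

        Console.WriteLine("Raw size:     {0}", rawPayload.Length);   // 49152
        Console.WriteLine("Encoded size: {0}", encoded.Length);      // 65536
    }
}
```

This is why only three quarters of the 64 KB limit is available to carry actual payload bytes.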

Although this is a limit on a single queue item, it can also be viewed as a limit on certain classes of messages, especially those that cannot be broken down into smaller chunks. From a developer's perspective, having to worry about whether a given message will fit on a queue does nothing for productivity. At the end of the day, the goal is for application data to flow between producers and consumers in the most efficient way, regardless of the data size. While one side calls Put (or Enqueue) against a queue and the other side calls Get (or Dequeue), the rest should, in theory, just happen automatically.

Overcoming the message size limitation in Azure queues by handling large messages intelligently is the key premise behind the technical challenge addressed in this article. It comes at the cost of some additional craftsmanship, and in the modern world of commercial software development any extra development effort needs to be carefully justified. However, I am going to show that the additional investment in the following design goals is absolutely worthwhile:

  • Support for very large messages, by eliminating any restrictions on message size imposed by the queue service API.

  • Support for user-defined generic objects when publishing and consuming messages from an Azure queue.

  • Transparent overflow into a configurable message store (a blob container, a distributed cache, or another type of repository capable of storing large messages).

  • Transparent compression, increasing cost efficiency by minimizing the amount of storage space consumed by large messages.

  • Increased reliability, through the extensive use of transient-condition-handling best practices when performing queue operations.

The pattern we will follow to support large messages in a size-constrained queue is as follows. First, check whether a given message can be accommodated on an Azure queue without any extra work. The formula for determining whether a message can be safely stored on a queue without violating the size limit, which I wrapped in a helper function, looks like this:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}
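For illustration, here is the same formula as a standalone sketch, with MaxMessageSize pinned to the 65536 value reported at the time of writing (an assumption if the SDK changes). Note that the integer arithmetic yields a cutoff of 49,149 bytes, just under the theoretical 48 KB:

```csharp
using System;

public static class QueueSizeCheckDemo
{
    // Mirrors CloudQueueMessage.MaxMessageSize as of this writing.
    private const long MaxMessageSize = 65536;

    public static bool IsAllowedQueueMessageSize(long size)
    {
        // Reserve a quarter of the limit for Base64 expansion:
        // only 3 of every 4 encoded bytes carry payload.
        return size >= 0 && size <= (MaxMessageSize - 1) / 4 * 3;
    }

    public static void Main()
    {
        // (65536 - 1) / 4 * 3 == 49149 with integer division.
        Console.WriteLine(IsAllowedQueueMessageSize(49149));   // True
        Console.WriteLine(IsAllowedQueueMessageSize(49150));   // False
    }
}
```

The "- 1" and the integer division make the check slightly conservative, which errs on the safe side of the queue's hard limit.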

If the message size is below the limit, simply invoke the queue service API to enqueue the message as-is. If the message size exceeds the limit in question, the data flow becomes quite interesting. The following flowchart visualizes the subsequent steps:

In summary, if a message is too large to fit on a queue, it overflows into a message store capable of holding large messages. A tiny metadata message is then created, consisting of a reference to the item in the overflow store. Finally, the metadata message is put on the queue. I always choose to compress a message before asserting whether it fits on a queue. This effectively expands the population of messages that can be queued without having to go to the overflow store. A good example is an XML document slightly larger than 64 KB which, after serialization and compression, becomes a perfect candidate to simply be put on a queue. If the default compression does not meet your needs, you can modify this behavior by providing a custom serializer component, as elaborated in the next section.

There are a few considerations that apply here, mainly from a cost perspective. As shown in the flowchart above, I first attempt to determine whether a large message can overflow into the Azure Caching Service (referred to here simply as the "Caching Service"). Because use of the cloud-based distributed Caching Service incurs a charge, the cache overflow path should be made optional. This is reflected in the flowchart.

In addition, there are cases when a message is so large that it is no longer suitable for storing in a size-constrained distributed cache. As of the writing of this article, the maximum cache size is 4 GB. Therefore, we must take this into account and provide a failover path should the cache capacity quota be exceeded. Quotas go hand in hand with eviction behavior, which also needs to be taken into consideration.

Important
When exchanging a large number of messages, using the Azure Caching Service as an overflow store helps reduce latency and eliminate excessive storage transactions. It offers a highly available, distributed caching infrastructure capable of replicating and maintaining cached data in memory across multiple cache servers for durability. These benefits can be outweighed by the cache size limitation and the costs associated with using the service. It is therefore important to perform a cost-benefit analysis to assess the pros and cons of introducing the Caching Service as an overflow store in a given scenario.

Given the limited storage capacity of the distributed cache, it is necessary to lay down some fine-grained rules for using the cache efficiently. To that end, one important recommendation needs to be called out explicitly:

Important
Due to the specifics of its eviction behavior, the Caching Service does not provide the full data durability guarantees that the Azure Blob service offers. When used as an overflow store, the Caching Service is best suited for individual messages that are transient in nature and smaller than 8 MB. The term "transient" means that messages are published to a queue and then consumed as quickly as possible. The 8 MB recommendation stems from the optimal cache item size configured in the Caching Service by default.

I will reflect the above recommendation in the code by providing a helper function that determines whether a given item size value can be considered optimal when storing an item of that size in the cache.

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Now that some of the initial prerequisites have been taken care of, let's switch over to the consumer side and look at the implementation pattern for retrieving large messages from a queue. First, let's visualize the process flow to aid overall understanding:

To summarize the flow above, a message of an unknown type is fetched from a queue and compared against the metadata message type. If it is not a metadata message, the flow continues with decompression logic so that the original message can be correctly reconstructed before being presented to the consumer. By contrast, if it is in fact a metadata message, it is inspected to determine the type of overflow store used for the actual message. If it is identified as a message stored in the cache, the respective Caching Service API is invoked and the actual message is fetched, then decompressed and returned to the consumer. If the actual message was placed in a blob container, the Blob service API is used to retrieve the actual message from the blob entity, decompress it, and hand it back to the caller.

In addition to handling the Enqueue and Dequeue operations for large messages, there is also a need to ensure that all overflowed payloads are removed from their respective overflow message stores upon the consumer's request. To accomplish this, one possible implementation pattern is to couple the removal process with the Delete operation when it is invoked for a given message. The visual representation of this operation can be depicted as follows:

Before we start implementing the patterns above, one last consideration is the definition of a message. What counts as a message, and what form does it take? Is it a byte array, a stream of data, a simple type such as a string, or a complex application-specific object that the developer implements as part of the solution's object model? I genuinely believe this is an area where we should not constrain ourselves. Let's assume a message is of a generic type <T>, meaning it can be whatever the developer wants it to be. You will see that the final implementation naturally unfolds around this idea.
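As an illustration of such a generic payload, a hypothetical application-specific message type might look like the following. The type and member names are invented for this example; any serializable .NET type would do, and decorating it with DataContractAttribute matters later, when the serializer chooses between DataContractSerializer and NetDataContractSerializer.

```csharp
using System.Runtime.Serialization;

// A hypothetical application-specific message type.
[DataContract]
public class InventoryNotification
{
    // Identifier of the originating EDI document.
    [DataMember]
    public string DocumentId { get; set; }

    // The raw document content, potentially several megabytes in size.
    [DataMember]
    public byte[] EdiPayload { get; set; }
}
```

An instance of this type could then be published and consumed through the abstraction layer without any awareness of its serialized size.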

Putting it all together, the following diagram summarizes all three possible travel paths accounted for in the design above:

At this point, there is enough input to start bringing the technical design to life. From here on, I will shift the focus to the source code required to implement the patterns discussed above.

To follow along, download the full sample code from the MSDN Code Gallery. The sample ships as part of a larger end-to-end reference implementation powered by the patterns discussed in this article. Once downloaded and unzipped, navigate to the Azure.Services.Framework project under Contoso.Cloud.Integration and expand the Storage folder. This location contains all the main code artifacts discussed below.

As noted at the beginning, the original idea was to abstract the way a cloud application interacts with Azure queues. I approached this requirement by providing a contract that governs the main operations supported by my custom storage abstraction layer. The programming interface through which the contract is surfaced to consumers is shown below. I intentionally omitted a few infrastructure-level functions from the code snippet, such as creating and deleting queues, since they add little value at this point.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

One additional contract (interface) is needed to abstract access to the large message overflow store. Two components implement this contract, one for each overflow store (blob storage and the distributed cache). The contract consists of the following operations:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Both contracts rely heavily on the generic type <T>. It enables you to tailor the message type to any .NET type of your choice. I did, however, have to handle some extreme use cases, namely types that require special treatment, such as streams. More on this later.

Regardless of the chosen message type, one important requirement applies: the object type representing a message on a queue must be serializable. All objects passing through the storage abstraction layer are subject to serialization before they land on a queue or in the overflow store. In my implementation, serialization and deserialization are coupled with compression and decompression, respectively. This approach increases efficiency from both a cost and a bandwidth perspective. The cost savings come from the fact that compressed large messages consume less storage, which in turn lowers storage costs. The bandwidth savings come from the smaller payloads produced by compression, which result in lighter traffic on the wire when data flows to and from Azure storage.

The serialization and deserialization requirement is declared in a specialized interface. Any component implementing this interface must provide the specific compression, serialization, deserialization, and decompression functionality. An example of this interface is shown below:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

For compression and decompression, I used the DeflateStream component from the .NET Framework. This class represents the Deflate algorithm, an industry-standard, RFC-compliant algorithm for lossless compression and decompression. Compared with the GZipStream class, the former produces more compact compressed images and generally delivers better performance. By contrast, the GZipStream class uses the GZIP data format, which includes a cyclic redundancy check (CRC) value for detecting data corruption. Under the hood, the GZIP data format uses the same compression algorithm as the DeflateStream class. In short, GZipStream = DeflateStream + the cost of computing and storing a CRC checksum.
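The relationship between the two formats can be observed empirically. In this sketch (the class and method names are mine, not part of the sample), the GZIP output for the same input is larger than the Deflate output by only the GZIP framing, which includes the CRC trailer:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class CompressionComparisonDemo
{
    // Compresses the data through the supplied stream wrapper and returns the compressed length.
    public static long Compress(Func<Stream, Stream> wrap, byte[] data)
    {
        using (var output = new MemoryStream())
        {
            using (var compressor = wrap(output))
            {
                compressor.Write(data, 0, data.Length);
            }
            return output.Length;
        }
    }

    public static void Main()
    {
        byte[] data = Encoding.UTF8.GetBytes(new string('x', 100000));

        long deflateSize = Compress(s => new DeflateStream(s, CompressionMode.Compress, true), data);
        long gzipSize    = Compress(s => new GZipStream(s, CompressionMode.Compress, true), data);

        // GZIP wraps the same Deflate body in a header and a trailer (CRC-32 + length).
        Console.WriteLine("Deflate: {0} bytes, GZip: {1} bytes", deflateSize, gzipSize);
    }
}
```

For queue payloads, where transport-level integrity is already handled by the storage service, the CRC adds cost without a corresponding benefit, which is why DeflateStream is the default here.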

My implementation of the contract is shown below. Note that the compression algorithm can easily be switched by replacing the DeflateStream class with GZipStream, and vice versa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

One powerful capability of the CloudStorageEntitySerializer implementation is the special treatment it applies to both flavors of XML documents: XmlDocument and XDocument. Another area worth highlighting is the optimal serialization and deserialization of XML data. Here I decided to take advantage of the XmlDictionaryReader and XmlDictionaryWriter classes, which are an excellent choice for .NET developers when it comes to performing efficient serialization and deserialization of XML payloads using the .NET binary XML format.

The decision about which type of overflow message store to use falls on the consumer invoking the custom storage abstraction layer. Along these lines, I am going to provide an option to select the desired message store type by adding the following constructor to the type implementing the ICloudQueueStorage interface:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

The constructor above does not perform any complex work; it simply initializes the internal members and configures the client component that will be accessing an Azure queue. It is worth noting, however, that I explicitly tell the queue client not to apply any retry policy. In order to provide a robust and reliable storage abstraction layer, I need more granular control over transient issues when performing operations against Azure queues. Therefore, a separate component is used to recognize and handle the wide variety of intermittent faults.

Let us now look at the internals of the ReliableCloudQueueStorage class sketched above. Specifically, we will review the implementation of the Put operation, since this is where the transparent overflow into a large message store takes place.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

There is one new code artifact in the snippet above that deserves explanation: the LargeQueueMessageInfo class. This custom type is ultimately the metadata message describing the location of a large message. It is marked as internal, since it is not visible to anyone outside the storage abstraction layer implementation. The class is defined as follows:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}
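The behavior of the Create factory method above can be illustrated with a small standalone sketch (the queue name "ediqueue" is invented for this example): the container name is derived deterministically from the queue name, while the blob reference is a freshly allocated, globally unique GUID.

```csharp
using System;

public static class MetadataMessageDemo
{
    public static void Main()
    {
        // Mirrors what LargeQueueMessageInfo.Create("ediqueue") produces in the sample.
        string containerName = String.Format("LargeMsgCache-{0}", "ediqueue");
        string blobReference = Guid.NewGuid().ToString("N");

        Console.WriteLine(containerName);   // LargeMsgCache-ediqueue
        Console.WriteLine(blobReference);   // a 32-character hex string, e.g. 0f8fad5bd9cb...
    }
}
```

Because the container name is a pure function of the queue name, all overflowed payloads for one queue land in one container, which keeps cleanup simple; the GUID guarantees that concurrent producers never collide on a blob name.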

Next, I need to implement a large message overflow store that leverages the Azure Blob storage service. As I pointed out earlier, this component must support the ICloudBlobStorage interface, which the ReliableCloudQueueStorage component uses to relay messages to the ICloudBlobStorage implementation whenever they cannot be accommodated on a queue due to the message size limitation. To set the stage for the subsequent steps, I will include the constructor's implementation only:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Earlier, I showed the implementation of the Put operation that ensures small messages are always placed on a queue while large messages are transparently routed into the overflow store. For the sake of continuity, let us now review the mechanics behind the counterpart Put operation implemented by the overflow store.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

In summary, the code above takes a blob of type <T> and first checks whether it is already a serialized image of a message in the form of a Stream object. All large messages relayed to the overflow store by the ReliableCloudQueueStorage component arrive as streams, ready for persistence. Next, the UploadBlob operation is invoked, which in turn calls the Blob service client API, specifically its UploadFromStream operation. If the large message blob fails to upload successfully, the code inspects the error returned by the Blob service and provides a failover path for two conditions: ContainerNotFound and BlobAlreadyExists. If the target blob container is not found, the code attempts to create the missing container. It performs this action within a retry-aware scope to improve reliability and increase resilience to transient failures. The second failover path is intended to handle the situation where a blob with the same name already exists. If the overwrite behavior is permitted, the code removes the existing blob, after which the upload of the new blob is retried. Again, this action is performed within a retry-aware scope for added reliability.

Now that large messages can be stored in a blob container, it is time to design another implementation of the ICloudBlobStorage interface, one that leverages the Azure Caching Service. For consistency, let's start with its constructor:

/// <summary>
/// Implements reliable generics-aware layer for Azure Caching Service.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified Caching Service endpoint,
    /// custom retry policy and custom implementation of the ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

If you recall the considerations discussed earlier, one important technical design decision was to leverage both the Blob service and the Caching Service for storing large messages. The cache option is best suited for transient objects that do not exceed the recommended payload size of 8 MB; the blob option covers everything else. Overall, this decision calls for a hybrid overflow store. The foundation for building such a store is already in the codebase; it is just a matter of combining the existing pieces together, as follows:

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Caching Service in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

At this point, I will conclude the coding exercise with one more snippet, showing the implementation of the "Put" operation in the hybrid overflow store:


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something other than a Stream, so serialize T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}
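The CloudUtility.IsOptimalCacheItemSize helper invoked in the "Put" operation above is referenced but never shown in the listings. Its actual implementation ships with the sample code; the following is only a plausible minimal sketch, assuming the 8 MB recommended cache payload size discussed earlier (the constant name is illustrative):

```csharp
// Illustrative sketch only; the real implementation is part of the sample code.
// Assumes the 8 MB recommended maximum payload size for cache items.
public static class CloudUtility
{
    private const long OptimalCacheItemSizeLimit = 8 * 1024 * 1024;

    /// <summary>
    /// Returns true if an item of the given size is suitable for the
    /// distributed cache; larger (or empty) items go to blob storage.
    /// </summary>
    public static bool IsOptimalCacheItemSize(long itemSize)
    {
        return itemSize > 0 && itemSize <= OptimalCacheItemSizeLimit;
    }
}
```

With a gate like this in place, the hybrid store only ever attempts the cache path for payloads the Caching Service is known to handle well, and silently routes everything else to the blob store.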

This article would not be complete without some examples of how to consume the storage abstraction layer described above from a client application. I will combine these examples with a test application that also validates the technical implementation.

To prove that large messages can successfully make a round trip through the newly implemented storage abstraction layer, I put together a very simple console application. In its first step, it takes a 90 MB sample XML document and puts it on an Azure queue. In its second step, it consumes a message from that queue. The message should indeed be the original XML document, which is written back to disk under a different name so that the file sizes and contents can be compared. Between these steps, the application enters a pause mode during which you can explore the content of the queue and the respective message overflow store (cache or blob container). The source code for the test application is provided below:

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been successfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform a very basic file size-based comparison. In reality, we would want to compare the documents structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

For the sake of completeness, below is the application configuration file that was used during testing. If you are going to try the test application, please make sure you modify your copy of app.config and add the actual storage account credentials and Caching Service endpoint information:

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

If the test application is compiled and executed successfully, output similar to the following appears in the console window:

If you look into the storage account used by the test application, the following message appears on the queue:

Because the test message is large enough to overflow directly into the blob store, the following screenshot depicts the expected content of the respective blob container while the test application is paused:

Note how the original 90 MB XML document used in my test became an 11 MB blob. This reflects a saving of roughly 88% in storage and bandwidth, the result of applying XML binary serialization. Given the target class of scenarios, XML binary serialization plus compression is the first and best choice.
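As a quick sanity check, the quoted saving can be recomputed from the two sizes reported by the test run:

```csharp
// Recomputes the storage saving quoted above from the observed sizes:
// a 90 MB source document stored as an 11 MB serialized, compressed blob.
long originalSize = 90L * 1024 * 1024;
long storedSize = 11L * 1024 * 1024;

double savingPercent = 100.0 * (originalSize - storedSize) / originalSize;
Console.WriteLine("Saving: {0:F1}%", savingPercent); // prints "Saving: 87.8%"
```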

After the test application deletes the queue message, the metadata message is expected to be removed along with the blob holding the message data, as shown in the screenshot below:

The example shown above reflects a simplified view of the lifecycle of a large message. It is intended to highlight the fundamentals of the storage abstraction layer: routing of large messages into the blob store, transparent compression, and automatic removal of both message parts. Now it is the right time to jump to a conclusion.

As we have seen, the use of Azure queues can be extended to support messages larger than 64 KB by leveraging the Azure Caching Service and the Azure Blob service, without imposing any additional technical restrictions on the client. In fact, I have shown that with a little extra work you can enhance the messaging experience for the client with quality-of-life improvements such as:

  • Transparent message compression to reduce storage costs and save bandwidth in and out of the datacenter.

  • Transparent, easily customizable overflow of large messages to the cache or blob store.

  • Generics support that makes it easy to store objects of any type.

  • Automatic handling of transient conditions for improved reliability.

As I mentioned earlier, although this solution can use both the distributed cache and the blob store for overflow, the use of the Azure Caching Service incurs additional costs. You should carefully evaluate your project's storage requirements and perform a cost analysis based on the projected number of messages and message sizes before deciding to enable the cache-based overflow.

While this solution provides an easy-to-use means of supporting large messages on Azure queues, there is always room for improvement. Some examples of value-add features that are not incorporated in this solution, and that you might want to add, are:

  • The ability to configure the type of large message overflow store in the application configuration.

  • Additional custom serializers, in case the default one does not meet your performance goals or functional requirements (for example, you do not need the default compression).

  • An item in the blob's metadata acting as a breadcrumb, allowing you to scan the blob store and quickly find whether any orphaned large-message blobs (zombies) exist.

  • A "garbage collector" component that ensures the timely removal of any orphaned blobs from the overflow message store (in case the queues are also accessed by components other than the storage abstraction layer implemented here).
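To illustrate the last bullet, here is one possible shape of such a "garbage collector", sketched against the StorageClient API used throughout this article. It is a hypothetical example, not part of the accompanying sample: it relies on the simplifying assumption that every metadata message is enqueued with a known maximum time-to-live, so any overflow blob older than that TTL can no longer be referenced by a live queue message and is safe to delete.

```csharp
// Hypothetical orphaned-blob collector; not part of the accompanying sample.
// Assumes messages are enqueued with a known maximum time-to-live, so an
// overflow blob older than that TTL can no longer have a live queue message
// pointing at it.
public static void CollectOrphanedBlobs(CloudBlobContainer container, TimeSpan messageTimeToLive)
{
    foreach (IListBlobItem item in container.ListBlobs())
    {
        CloudBlob blob = item as CloudBlob;
        if (blob == null)
        {
            continue;
        }

        // Refresh the blob's properties so that LastModifiedUtc is populated.
        blob.FetchAttributes();

        // Any blob older than the maximum message lifetime is an orphan.
        if (DateTime.UtcNow - blob.Properties.LastModifiedUtc > messageTimeToLive)
        {
            blob.DeleteIfExists();
        }
    }
}
```

A production-grade collector would likely also consult the breadcrumb metadata suggested in the bullet above rather than rely on age alone, and would run on a schedule from a worker role.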

The accompanying sample code is available for download from the MSDN Code Gallery. Note that all source code files are governed by the Microsoft Public License, as explained in the corresponding legal notices.

For more information on the topics discussed in this article, see the following:
