Best Practices for Handling Large Messages with Azure Queues

Updated: March 2015

Author: Valery Mizonov

Reviewers: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson, and Mark Simms

The purpose of this article is to offer developer-oriented guidance on implementing a generics-aware storage abstraction layer for the Azure Queue service. To support very large messages in Azure queues, the current message size limit must be overcome. This article and the accompanying code let you use Azure queues without having to do the message-size bookkeeping imposed by the queue's 64 KB limit.

We once believed that "640 K ought to be enough for anybody!" A few kilobytes was a luxurious amount of storage back then, in which a disciplined developer could happily keep all of an application's data. Today, modern applications need to exchange enormous volumes of data. Whether it is a tiny HL7 message or a multi-megabyte EDI document, a modern application must cope with payloads whose characteristics evolve at an unpredictable pace. A business object that was expressed as a multi-byte structure in the last century can now, thanks to modern serialization and representation techniques and formats, easily present itself as a storage-hungry artifact several times larger than its predecessor.

Handling messages in a given solution architecture without imposing any technical limit on message size is the key to supporting ever-evolving data volumes. Large messages cannot always be avoided. For example, if a B2B solution is designed to handle EDI traffic, it must be prepared to receive EDI documents of up to several megabytes. Every tier, service, and component in the end-to-end flow must accommodate the document size being processed. Successfully accepting a 20 MB EDI 846 Inventory Advice document over a web service, only to fail to store it in a queue for processing because of the queue's message size limit, would be an unpleasant discovery during testing.

Why would anyone choose a queue for large messages on the Azure platform? Why not one of the alternatives, such as blobs, tables, cloud drives, or Microsoft Azure SQL Database? Note that queues enable certain types of messaging scenarios characterized by asynchronous, loosely coupled communication between producers and consumers, performed in a scalable and reliable fashion. Using Azure queues decouples the different parts of a given solution and provides unique semantics such as FIFO (first-in, first-out) and at-least-once delivery. Such semantics can be difficult to implement with the alternative data exchange mechanisms. In addition, queues are best suited as a volatile store for exchanging data between services, tiers, and applications, not as a durable data store. The underlying data exchange requirement can present itself in many forms, such as passing messages between components asynchronously, load leveling, or scaling out complex compute workloads. Many of these data exchange patterns cannot be implemented in a straightforward way without queues. In summary, queues are a crucial capability. Not having to worry about what can and cannot go into a queue is a strong argument for building a unified queue-based messaging solution capable of handling data of any size.

This abstraction layer makes it easier to publish and consume instances of application-specific entities without having to deal with byte arrays or strings, which are the only types the Queue service API supports today. We will make extensive use of .NET generics, take advantage of value-added capabilities such as transparent stream compression and decompression, and apply known best practices such as handling intermittent faults in order to improve the fault tolerance of storage operations.

Today, the situation is as follows: a message larger than 64 KB (after serialization and encoding) cannot be stored in an Azure queue. The client-side API returns an exception if you attempt to put a message larger than 64 KB on a queue. As of this writing, the maximum message size returned by the CloudQueueMessage.MaxMessageSize property is 65,536 bytes.

Important
When a message is transferred to a queue, it is subject to Base64 encoding. The encoded payload is always larger than its raw counterpart: Base64 expands the data by one third, making the encoded form 4/3 the size of the original. Consequently, the 64 KB maximum effectively prohibits storing any message whose raw payload exceeds 48 KB (75% of 64 KB).
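
The 4/3 expansion is easy to verify with a few lines of code (shown here in Python purely as an illustration; any Base64 implementation behaves identically):

```python
import base64

# 48 KB of raw payload, the largest amount the article says will fit.
raw_payload = b"\x00" * (48 * 1024)
encoded = base64.b64encode(raw_payload)

# Base64 maps every 3 raw bytes onto 4 output characters (a 4/3 expansion),
# so 48 KB of payload encodes to exactly the 64 KB queue message ceiling.
print(len(encoded) == 64 * 1024)   # True
```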

Even though this is a limit on a single queue item, it can be viewed as restrictive for certain message types, particularly those that cannot be broken down into smaller chunks. From a developer's perspective, worrying about whether a given message fits in a queue does nothing for productivity. At the end of the day, the goal is to make application data flow between producers and consumers in the most efficient way, regardless of data size. While one side calls Put (or Enqueue) against a queue and the other side invokes Get (or Dequeue), the rest should, in theory, happen automatically.

Overcoming the message size limit in Azure queues by handling large messages in a smart way is the key premise of the technical challenge addressed in this article. It comes at the cost of some extra craftsmanship. In the modern world of commercial software development, any extra development effort must be wisely justified. We will justify the additional investment with the following design goals:

  • Support for very large messages, eliminating any restrictions imposed by the Queue service API as they pertain to message size.

  • Support for user-defined generic objects when publishing and consuming messages from Azure queues.

  • Transparent overflow into a configurable message store, whether a blob container, a distributed cache, or another type of repository capable of storing large messages.

  • Transparent compression, aimed at increasing cost efficiency by minimizing the amount of storage consumed by large messages.

  • Increased reliability, in the form of extensive use of transient-condition handling best practices when performing queue operations.

The foundation for supporting large messages in size-constrained queues is the following pattern. First, we check whether a given message can be accommodated in an Azure queue without any extra work. The formula for determining whether a message can be safely stored in a queue without violating the size limit is wrapped into a helper function as follows:

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}
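
For reference, evaluating the same expression outside of C# (a Python transcription that preserves the integer arithmetic) shows the largest raw payload the helper admits:

```python
MAX_MESSAGE_SIZE = 65536  # CloudQueueMessage.MaxMessageSize at the time of writing

# Same integer arithmetic as the C# expression above:
# subtract one, divide by four (truncating), then multiply by three.
largest_allowed_payload = (MAX_MESSAGE_SIZE - 1) // 4 * 3
print(largest_allowed_payload)   # 49149, i.e. just under the 48 KB ceiling
```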

If the message size is below the enforced limit, we simply invoke the Queue service API to enqueue the message as-is. If the message size exceeds the limit in question, the flow becomes more interesting. The following flowchart visualizes the subsequent steps:

[Figure: Store-Message-Flow1]

To summarize the above: if a message cannot be accommodated on a queue because of its size, it overflows into a message store capable of storing large messages. A tiny metadata message, consisting of a reference to the item in the overflow store, is then created. Finally, the metadata message is put on the queue. We always choose to compress a message before asserting that it fits in a queue. This effectively expands the population of messages that can be enqueued without incurring a trip to the overflow store. A good example is an XML document slightly larger than 64 KB which, after serialization and compression, becomes a perfect candidate to be put on a queue. You can modify this behavior in case default compression is not desirable; this can be achieved by supplying a custom serializer component, elaborated in the next section.

A few considerations apply here, mainly from a cost perspective. As noted in the flowchart above, I want to determine whether a large message can first be overflowed into Azure Cache. Because use of the distributed cloud-based Caching Service is subject to a fee, the cache overflow path should be made optional. This is reflected in the flowchart.

In addition, there may be cases where a message is so large that it is not suitable for storing in a size-constrained distributed cache. At the time of this writing, the maximum cache size is 4 GB. We must therefore take this into account and provide a failover path should we exceed the cache capacity or quota. Quotas come with eviction behavior, which also needs to be accounted for.

Important
Using Azure Cache as an overflow store helps reduce latency and eliminates excessive storage transactions when exchanging a large number of messages. It offers a highly available, distributed caching infrastructure capable of replicating and maintaining cached data in memory across multiple cache servers for durability. These benefits can outweigh the cache size limitations and the costs associated with using the service. It is therefore important to perform a cost-benefit analysis to assess the pros and cons of introducing Azure Cache as an overflow store in certain scenarios.

Note
For guidance on choosing the right Azure Cache offering for your application, see Which Azure Cache offering is right for me?

Below is a helper function that determines whether the specified item size can be considered optimal when storing an item of that size in the cache:

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

Now that some initial prerequisites are in place, it is time to switch to the consumer side and look at the implementation pattern for retrieving large messages from a queue. First, to aid overall understanding, let's visualize the flow:

[Figure: Large-Message-Overflow]

The above flow can be summarized as follows: a message of an unknown type is fetched from a queue and compared against the metadata message type. If it is not a metadata message, the flow continues with decompression logic so that the original message can be correctly reconstructed before being presented to the consumer. Conversely, if it is in fact a metadata message, it is inspected to determine the type of overflow store used to keep the actual message. If it is identified as a message stored in the cache, the respective Caching Service API is invoked and the real message is fetched, then decompressed and returned to the consumer. If the real message was put into a blob container, the Blob service API is targeted to retrieve the real message from the blob entity, decompress it, and hand it back to the caller.
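
The consumer-side flow can be sketched as the mirror image of the enqueue path. This Python illustration is not the sample's C# code; it assumes a hypothetical convention where ordinary messages are compressed payloads and metadata messages are ("metadata", reference) tuples, with the queue and overflow store modeled as a list and dictionary:

```python
import zlib

def dequeue(queue, overflow_store):
    """Sketch of the retrieve-message flow: resolve metadata first, then decompress."""
    item = queue.pop(0)
    if isinstance(item, tuple) and item[0] == "metadata":
        # Metadata message: the real payload lives in the overflow store.
        data = overflow_store[item[1]]
    else:
        # Ordinary message: the compressed payload itself was on the queue.
        data = item
    return zlib.decompress(data)   # reconstruct the original message for the consumer
```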

In addition to the Enqueue and Dequeue operations handling large messages, we also need to make sure that all overflowed payloads are removed from their respective overflow message stores upon the consumer's request. One possible implementation pattern is to couple the removal process with the Delete operation when it is invoked for a given message. A visual representation of this operation is depicted below:

[Figure: Large-Message-Overflow]

Before we start implementing the patterns mentioned above, one last consideration worth making is the definition of a message. What is going to be treated as a message, and in what form will it manifest itself? A byte array, a stream, a simple type such as a string, or a complex application-specific object that a developer implements as part of the solution's object model? I truly believe this is an area where we should not constrain ourselves. Let's assume a message is of a generic type <T>, meaning it is whatever the developer wants it to be. You will see that the final implementation unfolds naturally around this idea.

Putting it all together, the following diagram summarizes all three possible travel paths accounted for by the design described above:

[Figure: Message-Travel-Paths]

At this point, there seems to be enough input to start bringing the technical design to life. From here on, the focus switches to the source code required to implement the patterns discussed above.

To follow along, download the full sample code from the MSDN Code Gallery. The sample is shipped as part of a larger end-to-end reference implementation powered by the patterns discussed in this article. Once downloaded and unzipped, navigate to the Azure.Services.Framework project under Contoso.Cloud.Integration and expand the Storage folder. This location contains all the main code artifacts discussed below.

As noted at the beginning, the original idea was to abstract the way a cloud application interacts with Windows Azure queues. We approach this requirement by providing a contract that governs the main operations supported by the custom storage abstraction layer. The programming interface through which the contract surfaces to consumers is shown below. We intentionally omitted some infrastructure-level functionality from the code snippet, such as the creation and deletion of queues, since it does not add significant value at this point.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

One additional contract (interface) is also required to abstract access to the large message overflow store. Two components implement the contract, one per overflow store (blob storage and distributed cache). The contract consists of the following operations:

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

Both contracts rely heavily on the generic type <T>. It enables you to tailor the message type to any .NET type of your choice. However, we will have to handle some extreme use cases, namely types that require special treatment, such as streams. We expand on this later.

Regardless of the chosen message type, one important requirement applies: the object type representing a message on a queue must be serializable. All objects that pass through the storage abstraction layer must be serialized before they can reach a queue or the overflow store. In our implementation, serialization and deserialization are also coupled with compression and decompression, respectively. This approach increases efficiency from a cost and bandwidth perspective. The cost-related benefit comes from the fact that compressed large messages inherently consume less storage, resulting in lower storage costs. The bandwidth efficiency comes from the savings in payload size achieved through compression, which in turn makes payloads smaller on the wire as data flows to and from Azure storage.

The serialization and deserialization requirement is declared in a specialized interface. Any component implementing this interface must provide specific compression, serialization, deserialization, and decompression functionality. An example of this interface is shown below:

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

For compression and decompression, we use the DeflateStream component in the .NET Framework. This class represents the Deflate algorithm, an industry-standard, RFC-compliant algorithm for lossless compression and decompression. In comparison with the GZipStream class, the former produces more compact compressed images and generally delivers better performance. By contrast, the GZipStream class uses the GZIP data format, which includes a cyclic redundancy check (CRC) value for detecting data corruption. Behind the scenes, the GZIP data format uses the same compression algorithm as the DeflateStream class. In summary, GZipStream = DeflateStream + the cost of calculating and storing CRC checksums.
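
The relationship is easy to observe with any DEFLATE implementation. The following Python sketch uses the zlib and gzip modules, which wrap the same DEFLATE algorithm underlying DeflateStream and GZipStream, to show the GZIP container adding fixed header and CRC trailer overhead around the compressed stream:

```python
import gzip
import zlib

payload = b"The quick brown fox jumps over the lazy dog. " * 200

# Raw DEFLATE stream (the format DeflateStream emits): no container at all.
compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
deflated = compressor.compress(payload) + compressor.flush()

# GZIP container around the same DEFLATE algorithm: a fixed header plus an
# 8-byte trailer carrying the CRC-32 checksum and the original length.
gzipped = gzip.compress(payload, compresslevel=9)

print(len(gzipped) > len(deflated))   # True: GZIP pays for the CRC bookkeeping
```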

Our implementation of this contract is included below. Note that the compression algorithm can easily be switched by replacing the DeflateStream class with GZipStream, and vice versa.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

One powerful capability of the CloudStorageEntitySerializer implementation is the ability to apply special treatment when handling XML documents of both flavors: XmlDocument and XDocument. Another area worth highlighting is the optimized serialization and deserialization of XML data. Here we decided to take advantage of the XmlDictionaryReader and XmlDictionaryWriter classes, which .NET developers know to be a superb choice when it comes to performing efficient serialization and deserialization of XML payloads using the .NET binary XML format.

The decision as to the type of overflow message store is the responsibility of the consumer calling into the custom storage abstraction layer. Along those lines, we provide an option to select the desired message store type by adding the following constructor to the type implementing the ICloudQueueStorage interface:

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

The constructor above does not perform any complex work; it simply initializes the internal members and configures the client component that will access an Azure queue. One thing is worth noting, however: we explicitly tell the queue client not to enforce any retry policy. In order to provide a robust and reliable storage abstraction layer, we need more granular control over transient issues when performing operations against Azure queues. There will therefore be a separate component that understands intermittent faults and is capable of handling a much wider variety of them.
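
This division of labor — the storage client retries nothing, while a dedicated component classifies and retries transient faults — follows the familiar retry-policy shape. The sketch below is a Python stand-in for illustration only; the RetryPolicy type used by the sample comes from the reference implementation, and the fault classification here (plain IOError) is an assumption:

```python
import time

def execute_action(action, max_attempts=3, base_delay=0.5, transient=(IOError,)):
    """Minimal retry policy in the spirit of RetryPolicy.ExecuteAction: retry only
    faults classified as transient, and rethrow everything else immediately."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except transient:
            if attempt == max_attempts:
                raise                         # transient fault persisted; give up
            time.sleep(base_delay * attempt)  # linear back-off between attempts
```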

Let's now examine the internals of the ReliableCloudQueueStorage class sketched above. Specifically, let's review the implementation of its Put operation, since this is the location of the transparent overflow into a large message store.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

One new code artifact that has presented itself in the snippet above is the LargeQueueMessageInfo class. This custom type is ultimately our metadata message, describing the location of a large message. The class is marked internal because it is not intended to be visible to anyone outside the storage abstraction layer implementation. The class is defined as follows:

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}
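
The mechanics of the Create factory method are easy to see in miniature. The following Python transcription is purely illustrative (a dictionary stands in for the data contract type):

```python
import uuid

CONTAINER_NAME_FORMAT = "LargeMsgCache-{0}"

def create_large_queue_message_info(queue_name):
    """Mirror of LargeQueueMessageInfo.Create: derive the overflow container name
    from the queue name and allocate a globally unique blob reference."""
    if not queue_name:
        raise ValueError("queue_name")       # counterpart of the Guard check
    return {
        "ContainerName": CONTAINER_NAME_FORMAT.format(queue_name),
        "BlobReference": uuid.uuid4().hex,   # equivalent of Guid.ToString("N")
    }
```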

Next, we must implement a large message overflow store that takes advantage of the Azure Blob storage service. As I pointed out earlier, this component must support the ICloudBlobStorage interface, which the ReliableCloudQueueStorage component consumes to relay messages into an ICloudBlobStorage implementation whenever they cannot be accommodated on a queue because of the message size limit. To set the stage for the next steps, we include only the constructor's implementation:

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

Earlier in this article, we showed the implementation of the Put operation, which ensures that small messages always end up on a queue while large messages are transparently routed into the overflow store. For continuity, let's now review the mechanics behind the counterpart Put operation implemented by the overflow store.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

In summary, the code above takes a blob of type <T> and first checks whether it is already a serialized image of a message in the form of a Stream object. All large messages relayed to the overflow store by the ReliableCloudQueueStorage component arrive as streams ready for persistence. Should the target blob container not be found, the code attempts to create the missing container. It performs this action within a retry-aware scope to improve reliability and increase resilience to transient failures. The second failover path is intended to handle a situation where a blob with the same name already exists. The code will remove the existing blob, provided the overwrite behavior is enabled. After removal, the upload of the new blob will be retried. Again, this operation is performed inside a retry-aware scope for increased reliability.
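
The optimistic-first-attempt, create-and-retry shape of the first failover path can be distilled into a few lines. This Python sketch is an illustration only; upload, create_container, and retry_policy are hypothetical callables standing in for the real components, and ContainerNotFound stands in for a StorageClientException carrying the ContainerNotFound error code:

```python
class ContainerNotFound(Exception):
    """Stand-in for a StorageClientException with the ContainerNotFound error code."""

def put_with_failover(upload, create_container, retry_policy):
    """Sketch of failover action #1: try the upload once, and if the target
    container is missing, create it and retry under the supplied retry policy."""
    try:
        return upload()          # first attempt: assume the container exists
    except ContainerNotFound:
        pass
    def create_and_upload():
        create_container()       # provision the missing container
        return upload()          # then repeat the upload
    return retry_policy(create_and_upload)
```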

Now that we are able to store large messages in a blob container, it is time to design another implementation of the ICloudBlobStorage interface, one that uses Azure Cache. For consistency, let's start with its constructor:

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified Azure Cache endpoint information,
    /// custom retry policy and custom implementation of the ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

If you recall the earlier considerations, one of the key technical design decisions was to leverage both the Blob service and Azure Cache for storing large messages. The cache option is best suited for transient objects that do not exceed the recommended payload size of 8 MB. The blob option is essentially for everything else. Overall, this decision calls for a hybrid overflow store. The foundation for building the hybrid store is already in the code base; it is simply a matter of combining the existing artifacts together, as follows:
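The size check that drives this cache-versus-blob routing decision is referenced later in the hybrid Put implementation as CloudUtility.IsOptimalCacheItemSize, but its body is not shown in this article. A minimal sketch of what such a helper might look like, assuming the 8 MB recommendation quoted above (the constant in the actual code base may differ):

```csharp
// Hypothetical sketch of the size-based routing helper used by the hybrid store.
// The 8 MB threshold reflects the recommended maximum payload size for Azure Cache
// items mentioned in this article; the real implementation may use a different value.
public static class CloudUtility
{
    // Payloads at or below this size are considered cache-friendly.
    private const long OptimalCacheItemSizeInBytes = 8 * 1024 * 1024;

    public static bool IsOptimalCacheItemSize(long sizeInBytes)
    {
        return sizeInBytes > 0 && sizeInBytes <= OptimalCacheItemSizeInBytes;
    }
}
```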

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

At this point, we can conclude this adventure by including one more code snippet, showing the implementation of the Put operation in the hybrid overflow store.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something other than a Stream, so we serialize T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}
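For symmetry, the Get operation in the hybrid store would need to probe both underlying stores, since a given blob may live in either one. The article does not show that method, but a hedged sketch of how it could be composed from the storageList built in the constructor might look like this (the actual implementation in the accompanying code base may differ):

```csharp
/// <summary>
/// Retrieves a blob from the underlying storage, probing the cache first and
/// falling back to blob storage. Illustrative sketch only; the real
/// implementation in the accompanying code base may differ.
/// </summary>
public T Get<T>(string containerName, string blobName)
{
    Guard.ArgumentNotNull(containerName, "containerName");
    Guard.ArgumentNotNull(blobName, "blobName");

    T result = default(T);

    // Walk the storage list in priority order: the cache store was added
    // first in the constructor, so it is always probed before blob storage.
    foreach (ICloudBlobStorage storage in this.storageList)
    {
        result = storage.Get<T>(containerName, blobName);

        if (result != null)
        {
            break;
        }
    }

    return result;
}
```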

This article would be considered incomplete if we failed to provide some examples of how the storage abstraction layer discussed above can be consumed from a client application. We will combine these examples with a test application that will also validate the technical implementation.

To prove that large messages can successfully travel back and forth through the newly implemented storage abstraction layer, we put together a very simple console application. In the first step, it takes a sample XML document of 90 MB in size and puts it on an Azure queue. In the second step, it consumes the message from the queue. The message should indeed be the original XML document, which is written back to disk under a different name so that the file sizes and contents can be compared. Between these steps, the application enters a pause mode during which you can explore the contents of the queue and the respective message overflow store, such as the cache or a blob container. The source code for the test application is provided below.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been successfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform a very basic file size-based comparison. In reality, we should have compared the documents structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

For completeness, the application configuration file used during testing is provided below. If you are going to try out the test application, be sure to modify your copy of app.config and add the actual storage account credentials and Caching Service endpoint information.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

Provided that the test application has been successfully compiled and executed, output similar to the following should appear in the console window:

[Screenshot: console window output]

If you peek into the storage account used by the test application, the following message will appear on the queue:

[Screenshot: serialized metadata message on the queue]

Because the test message is large enough to overflow directly into blob storage, the following screenshot depicts the expected content of the respective blob container while the test application is paused:

[Screenshot: blob container contents]

Note how the original 90 MB XML document used in the test became an 11 MB blob. This reflects an 87% savings in storage and bandwidth, the result of applying XML binary serialization. Given the target class of scenarios, XML binary serialization plus compression is the first and best choice.
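The transparent compression responsible for this size reduction lives inside the serializer component, whose implementation is not shown in this article. Conceptually, it is similar to the following sketch, which pairs the .NET binary XML writer with a DeflateStream; the real ICloudStorageEntitySerializer implementation may differ in detail:

```csharp
using System.IO;
using System.IO.Compression;
using System.Xml;
using System.Xml.Linq;

// Illustrative sketch only: binary XML serialization combined with Deflate
// compression, which is conceptually how a 90 MB XML document can shrink to
// roughly 11 MB. The actual serializer in the code base may differ.
public static class CompressedXmlSerializer
{
    public static void SerializeCompressed(XDocument doc, Stream output)
    {
        using (var deflate = new DeflateStream(output, CompressionMode.Compress, leaveOpen: true))
        using (var writer = XmlDictionaryWriter.CreateBinaryWriter(deflate))
        {
            // The binary XML writer removes markup redundancy; Deflate then
            // compresses the remaining payload. Disposing the writer flushes
            // both layers.
            doc.WriteTo(writer);
        }
    }
}
```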

Once the test application proceeds with deleting the queue message, the metadata message is expected to be removed along with the blob holding the message data, as shown in the screenshot below:

[Screenshot: blob removed from the container]

The example shown above reflects a simplistic view of the lifecycle of a large message. It is intended to highlight the fundamentals of the storage abstraction layer, such as routing large messages into the blob store, transparent compression, and automatic removal of both message parts. Now is probably the right time to jump to a conclusion.

As we have seen, the use of Azure queues can be extended to support messages larger than 64 KB by leveraging Azure Cache and the Azure Blob service, without adding any extra technical restrictions on the client. In fact, we have shown that with a little extra work you can enhance the client's messaging experience with quality-of-life improvements such as:

  • Transparent message compression to reduce storage costs and to save bandwidth into and out of the datacenter.

  • Transparent, easily customizable overflow of large messages to either the cache or blob storage.

  • Generics support that makes it easy to store any object type.

  • Automatic handling of transient conditions for improved reliability.

As mentioned earlier, while this solution can use both the distributed cache and the blob store for overflow storage, using Azure Cache incurs additional costs. You should carefully evaluate your project's storage requirements and perform a cost analysis based on the projected number of messages and message size before deciding to enable overflow using the cache.

While this solution provides an easy-to-use means of supporting large messages on Azure queues, there is always room for improvement. Some examples of value-added features that are not incorporated into this solution and that you may wish to add are as follows:

  • The ability to configure the type of large message overflow store in the application configuration.

  • Additional custom serializers, in case the default one does not meet your performance goals or functional needs (for example, if you do not need the default compression).

  • An item in the blob's metadata acting as a breadcrumb, enabling you to scan your blob storage and quickly find out whether you have any orphaned large message blobs (zombies).

  • A "garbage collector" component that will ensure timely removal of any orphaned blobs from the overflow message store (in case queues are also accessed by components other than the storage abstraction layer implemented here).
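For instance, the breadcrumb idea from the list above could start with tagging each overflow blob at upload time with a metadata attribute identifying its parent queue, so that a later scan can spot orphans. A purely illustrative sketch, using the same generation of the storage client library as the rest of the article (the metadata key name here is hypothetical and not part of the article's code base):

```csharp
// Purely illustrative sketch of the "breadcrumb" idea: tag every overflow blob
// with metadata identifying the queue it belongs to so that orphaned blobs can
// later be discovered by scanning the container. The "OriginQueue" key is a
// hypothetical name chosen for this example.
public static void TagOverflowBlob(CloudBlob blob, string queueName)
{
    blob.Metadata["OriginQueue"] = queueName;

    // Persist the updated metadata on the blob in the storage service.
    blob.SetMetadata();
}
```

A garbage collector component could then enumerate the container, read this attribute back, and delete any blob whose corresponding queue message no longer exists.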

The accompanying sample code is available for download from the MSDN Code Gallery. Note that all source code files are governed by the Microsoft Public License, as explained in the corresponding legal notices.

For more information on the topics discussed in this article, see the following documents:
