エクスポート (0) 印刷
すべて展開

Azure キューでサイズの大きいメッセージを処理するためのベスト プラクティス

更新日: 2014年4月

執筆者: Valery Mizonov

校閲者: Brad Calder、Larry Franks、Jai Haridas、Sidney Higa、Christian Martinez、Curt Peterson、Mark Simms

この記事は、Azure Queue Service の汎用対応ストレージ抽象化レイヤーの実装に関する開発者向けガイダンスを提供することを目的としています。ここで説明する主な問題領域は、Azure キューでのきわめてサイズの大きいメッセージのサポートと、現在存在しているメッセージ サイズの制限の解決です。このブログと関連コードを参照すると、キューの 64 KB 制限により課せられるメッセージ サイズのブックキーピングを行わずに Azure キューを利用できます。

以前 "どの人も 640 K で十分である" という時代がありました。過去にトレーニングを受けた開発者は、数 KB でアプリケーションのすべてのデータを問題なく格納できる贅沢なストレージ領域を獲得できました。現在、最新のアプリケーションを交換するために必要なデータ量は大きく変わっている可能性があります。小さな HL7 メッセージであっても数 MB の EDI ドキュメントであっても、最新のアプリケーションは予測できない速度で進化するあらゆる種類の容量特性に対処する必要があります。前世紀にマルチバイト構造で表されていたビジネス オブジェクトは、最新のシリアル化と表現技法および形式により、現在は従来のものより数倍大きいストレージを大量消費するアーティファクトとして簡単に表すことができます。

絶え間なく進化するデータ ボリュームをサポートするには、メッセージのサイズに技術的制限を課さずに特定のソリューション アーキテクチャでメッセージを処理することが重要です。サイズの大きいメッセージを回避できない場合があります。たとえば、EDI のトラフィックを処理するように B2B ソリューションが設計されている場合、そのソリューションは数 MB までの EDI ドキュメントを受信できるよう準備する必要があります。エンド ツー エンド フローのすべての層、サービス、およびコンポーネントが、処理されているドキュメントのサイズに対応できる必要があります。Web サービス経由で 20 MB の EDI 846 在庫アドバイス ドキュメントを正常に受け入れても、キューのメッセージ サイズ制約によりそのドキュメントをキューに格納して処理できない場合、テスト中に予期しない検出と見なされます。

Azure Platform でサイズの大きいメッセージにキューを使用するのはどうしてでしょうか。BLOB、テーブル、クラウド ドライブ、Microsoft Azure SQL データベースなど他にも多数存在する代替方法ではだめなのでしょうか。キューを使用すると、拡張性および信頼性の高い方法で実行されるプロデューサーとコンシューマー間の非同期の疎結合通信によって特徴付けられた特定の種類のメッセージング シナリオを実装することができます。Azure キューを使用すると、特定のソリューションの各部分が切断され、FIFO (先入れ先出し) や At-Least-Once 配信などの一意なセマンティクスが提供されます。このようなセマンティクスを他の代替データ交換メカニズムを使用して実装するのは若干困難な場合があります。さらに、キューは、永続的なデータ ストレージとしてではなく、サービス、層、およびアプリケーション間でデータを交換するための揮発性ストアとして最適です。それぞれのデータ交換要件は、コンポーネント間の非同期でのメッセージの受け渡し、負荷平準化、複雑な計算ワークロードのスケール アウトなど、さまざまな形式で表されます。これらのデータ交換パターンの多くは、キューを使用せずに実装できるほど単純なものではありません。つまり、キューはきわめて重要な機能なのです。キューに配置できるデータと配置できないデータについて心配する必要をなくすのは、種類やサイズを問わずどのデータでも処理できる統合されたキュー ベースのメッセージング ソリューションを構築するための強力な論拠になります。

ここでは、サイズの大きいメッセージの交換に Azure キューを使用するためのソリューションを実装します。また、StorageClient 名前空間にある API の上に構築される抽象化レイヤーを提供して、ソリューションと Azure キューとの相互運用を簡素化します。この抽象化レイヤーは、現在キュー サービス API でサポートされている型に限定されるバイト配列または文字列に対処する必要があるのとは対照的に、アプリケーション固有のエンティティのインスタンスのパブリッシュと使用を簡素化できます。.NET ジェネリックを幅広く利用し、ストレージ操作のフォールト トレランスを向上させるために、透過的なストリーム圧縮と圧縮解除などの付加価値機能を利用するのに加え、断続的なエラーの処理などの既知のベスト プラクティスも適用します。

現在、(シリアル化およびエンコードされた後の) 64 KB を超えるサイズのメッセージは、Azure キューに格納できません。クライアント側 API は、キューに 64 KB を超えるメッセージを配置しようとすると例外を返します。メッセージの最大許容大サイズは、CloudQueueMessage クラスの MaxMessageSize プロパティを調べることによって確認できます。この投稿の作成時点では、このプロパティによって返されるメッセージ サイズの上限は 65536 です。

Important重要
CloudQueueMessage.MaxMessageSize プロパティで定義される最大メッセージ サイズには、ペイロードの最大許容サイズは反映されません。メッセージはキューに送信されるときに Base64 エンコードの対象になります。エンコードされたペイロードは常にその生データより大きくなります。Base64 エンコードにより平均で 25% のオーバーヘッドが追加されます。その結果、64 KB のサイズ制限によって、48 KB (64 KB の 75%) を超えるペイロードを含むメッセージを格納することは事実上禁止されます。

これは単一のキュー項目の制限ですが、特定の型のメッセージ、特に小さなチャンクに分割できないメッセージに対しては禁止と見なすことができます。開発者の観点からは、特定のメッセージがキューに格納されるかどうかを心配していても生産性は向上しません。最終的な目標は、データのサイズに関係なく、最も効率的な方法を使用してプロデューサーとコンシューマー間でアプリケーション データをやり取りすることです。キューに対して一方の側が Put (または Enqueue) を呼び出し、もう一方が Get (または Dequeue) を呼び出しますが、それ以外は論理上自動的に発生します。

サイズの大きいメッセージを効率的に処理する方法を採用して Azure キューのメッセージ サイズ制限を解消することが、ここで取り上げる技術的な課題の主要な前提となります。これには追加技能の犠牲が伴います。最新の商用ソフトウェア開発の世界では、追加の開発作業を賢明に正当化する必要があります。以下のデザイン目標で追加の投資を正当化します。

  • メッセージのサイズに関連してキュー サービス API で課せられる制限の排除によるきわめてサイズの大きいメッセージのサポート

  • Azure キューからメッセージをパブリッシュおよび使用する場合のユーザー定義の汎用オブジェクトのサポート

  • BLOB コンテナー、分散キャッシュ、サイズの大きいメッセージを格納できるその他の種類のリポジトリなど構成可能なメッセージ ストアへの透過的なオーバーフロー

  • サイズの大きいメッセージで使用されるストレージ領域の量を最小化してコスト効果を向上させることを目的とした透過的な圧縮

  • キューの操作を実行するときに一時的な状態処理のベスト プラクティスを活用することで強化される信頼性

サイズに制約があるキューでサイズの大きいメッセージをサポートするための基礎は次のパターンになります。最初に、追加の作業を実行せずに特定のメッセージを Azure キューに格納できるかどうかをチェックします。サイズ制約に違反せずにメッセージをキューに安全に格納できるかどうかを判断するには、次のようにヘルパー関数にラップする式を使用します。

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

メッセージのサイズが適用制限下にある場合は、単純にキュー サービス API を呼び出し、メッセージを "現状のまま" エンキューする必要があります。メッセージのサイズが該当の制限を超える場合、データ フローはかなり興味深いことになります。次のフローチャートは、以降の手順を図で表したものです。

要約すると、サイズ制限のためメッセージがキューに格納できない場合、メッセージはサイズの大きいメッセージを格納できるメッセージ ストアにオーバーフローされます。次に、オーバーフロー ストア内の項目への参照で構成される小さなメタデータ メッセージが作成されます。最後に、メタデータ メッセージがキューに格納されます。常にメッセージの圧縮を選択してから、キューの永続性に対するメッセージの適合性をアサートします。これにより、オーバーフロー ストアに移動する必要性が生じることなくキューに格納できるメッセージの作成が効率的に拡張されます。たとえば、64 KB よりも若干大きい XML ドキュメントは、シリアル化と圧縮が実行された後、キューに簡単に格納できる最適な候補となります。既定の圧縮が適切ではない場合は、この動作を変更できます。動作を変更するには、次のセクションで説明するカスタムのシリアライザー コンポーネントを提供します。

ここでは、主にコストの観点からいくつかの考慮事項を適用していきます。上のフローチャートで確認できるように、サイズの大きいメッセージを最初に Azure Caching Service (ここでは簡略化のため "Caching Service" と呼びます) にオーバーフローできるかどうかを確認します。分散クラウドベースのキャッシュ サービスの利用には料金がかかるため、キャッシュのオーバーフロー パスは省略可能にする必要があります。これはフローチャートに反映されています。

また、メッセージのサイズがきわめて大きいため、サイズ制約のある分散キャッシュに格納するのには適していない場合があります。この記事の作成時点における最大キャッシュ サイズは 4 GB です。そのため、この点を考慮し、キャッシュの容量またはクォータを超える場合はフェールオーバー パスを指定する必要があります。クォータには削除動作も伴うため、この点についても考慮が必要です。

Important重要
オーバーフロー ストアとして Azure Caching Service を使用すると、待機時間が短縮され、多数のメッセージを交換するときの過剰なストレージ トランザクションが削減されます。この場合、複数のキャッシュ サーバーのメモリでキャッシュ データをレプリケートおよび管理して持続性を実現できる、可用性に優れた分散キャッシュ インフラストラクチャが提供されます。キャッシュ サイズの制限およびサービスの使用に関連するコストは、これらの利点を上回ります。したがって、費用便益分析を実行して、特定のシナリオで Caching Service を導入する利点と欠点を評価することが重要です。

分散キャッシュ ストレージが制限されている場合は、キャッシュの有効な利用を可能にするさらに多くのルールを定める必要があります。これに関連する 1 つの重要な推奨設定を説明します。

Important重要
削除動作の特質により、Caching Service では、Azure BLOB サービスと比較した場合、完全で最終的に保証される持続性は提供されません。オーバーフロー ストアとして使用する Caching Service は、個々のメッセージが本質的に揮発性で、そのサイズが 8 MB 以下の場合に最も適しています。"揮発性" とは、メッセージがパブリッシュされてからできる限り迅速に使用されることを意味します。8 MB の推奨設定は、Caching Service で既定で構成される最適なキャッシュ項目のサイズによるものです。

特定のサイズの項目をキャッシュに格納するときに指定された項目のサイズ値が最適と見なすことができるかどうかを判別するヘルパー関数を指定して、コードにこの推奨設定を反映します。

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

最初の前提条件は検討したので、次にコンシューマー側に切り替えて、キューからサイズの大きいメッセージを取得するための実装パターンについて説明していきます。まず、全体を把握するために、処理フローを図で示します。

このフローをまとめると、不明な型のメッセージがキューからフェッチされ、メタデータ メッセージの型と比較されます。メタデータ メッセージでない場合、フローは圧縮解除論理で続行されるため、元のメッセージは正しく再構築されてからコンシューマーに表示することができます。一方、メタデータ メッセージであった場合は、メッセージを調べて実際のメッセージの格納に使用されたオーバーフロー ストアの種類が確認されます。キャッシュに格納されたメッセージとして特定された場合は、各 Caching Service API が呼び出されて、実際のメッセージはフェッチされてから圧縮解除されてコンシューマーに返されます。実際のメッセージが BLOB コンテナーに配置されている場合、BLOB サービス API がターゲットとなり BLOB エンティティから実際のメッセージが取得され、圧縮解除されて呼び出し元に戻されます。

サイズの大きいメッセージの Enqueue および Dequeue 操作の処理に加えて、コンシューマーの要求に応じてオーバーフローされたすべてのペイロードが各オーバーフロー メッセージ ストアから削除されているかどうかを確認する必要があります。これを実行できる実装パターンの 1 つが、特定のメッセージに対して削除処理を呼び出すときに Delete 操作と組み合わせることです。この操作を表した図を次に示します。

前述したパターンの実装を開始する前の最後の考慮事項は、メッセージの定義です。どのようなものがメッセージと見なされ、メッセージはどの形式で表されるのでしょうか。メッセージは、バイト配列、データのストリーム、文字列などの単純型、または開発者がソリューションのオブジェクト モデルの一部として実装する複雑なアプリケーション固有のオブジェクトのどれになるのでしょうか。これは自身を制約してはならない領域であると思います。メッセージは汎用型 <T> であるとします。この汎用型は開発者が必要とするすべての内容を表します。最終的な実装は、必然的にこの意図を表すものになります。

すべてをまとめた次の図は、前述したデザインで考慮される 3 つのすべての可能な移動パスの概要を示しています。

この時点で、技術的なデザインの実現を開始するのに十分な入力があります。ここから先は、前述したパターンを実装するために必要なソース コードに焦点を当てて説明していきます。

作業を進めるには、MSDN コード ギャラリーから完全なサンプル コードをダウンロードします。このサンプルは、この記事で説明するパターンを利用するさらに大きいエンド ツー エンドの参照実装の一部として提供されます。ダウンロードして解凍したら、Contoso.Cloud.Integration の下の Azure.Services.Framework プロジェクトに移動し、Storage フォルダーを展開します。この場所には、以下で説明するすべての主要なコード アーティファクトが格納されています。

最初に説明したとおり、元の意図は、クラウド アプリケーションと Windows Azure キューとの相互運用を抽象化することでした。カスタムのストレージ抽象化レイヤーでサポートされる主要な操作を管理するコントラクトを指定して、この要件に対処します。コントラクトがコンシューマーに表示されるときに使用されるプログラミング インターフェイスを以下に示します。以下のコード スニペットでは、キューの作成や削除など、この時点であまり関係のないインフラストラクチャ レベルの機能をいくつか意図的に省略しています。

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

サイズの大きいメッセージのオーバーフロー ストアへのアクセスを抽象化する追加の 1 つのコントラクト (インターフェイス) も必要です。2 つのコンポーネントで、オーバーフロー ストア (BLOB ストレージと分散キャッシュ) ごとに 1 つコントラクトを実装します。コントラクトは、次の操作で構成されています。

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

どちらのコントラクトも汎用型 <T> に大きく依存します。メッセージの型は選択した任意の .NET 型に合わせて調整することができます。ただし、一部の極端な使用例、つまりストリームなどの特別な処理を要求する型を処理する必要があります。これについては後で詳しく説明します。

選択したメッセージ型に関係なく、1 つの重要な要件が適用されます。それは、キュー内のメッセージを表すオブジェクトの種類はシリアル化できなければならないということです。ストレージ抽象化レイヤーを通過するすべてのオブジェクトは、キューまたはオーバーフロー ストアに配置される前に、シリアル化されます。この実装では、シリアル化とシリアル化解除がそれぞれ圧縮と圧縮解除にも組み合わされています。この方法により、コストと帯域幅の観点から効率性が向上します。圧縮されたサイズの大きいメッセージでは基本的に使用されるストレージ量が少なくなり、ストレージ コストが低く抑えられるため、コスト関連のメリットがあります。圧縮によりペイロードのサイズが節約され、その結果 Azure ストレージとの間を流れるときにネットワーク上のペイロードが小さくなるため、帯域幅の効率性が向上します。

シリアル化とシリアル化解除の要件は、特殊なインターフェイスで宣言されます。このインターフェイスを実装するどのコンポーネントも、固有の圧縮、シリアル化、シリアル化解除、および圧縮解除機能を提供する必要があります。このインターフェイスの例を以下に示します。

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

圧縮と圧縮解除については、.NET Framework の DeflateStream コンポーネントを使用します。このクラスは Deflate アルゴリズムを表します。Deflate アルゴリズムは、可逆ファイル圧縮および圧縮解除の業界標準 RFC 準拠アルゴリズムです。GZipStream クラスと比較すると、前者の方がより最適な圧縮されたイメージを生成し、通常はより高いパフォーマンスを実現できます。これに対して、GZipStream クラスは、データの破損を検出するための巡回冗長検査 (CRC) の値を含む GZIP データ形式を使用します。その背景で、GZIP データ形式は DeflateStream クラスと同じ圧縮アルゴリズムを使用します。つまり、GZipStream = DeflateStream + CRC のチェックサムを計算および格納するためのコストということになります。

コントラクトの実装を、以下に示します。圧縮アルゴリズムは DeflateStream クラスを GZipStream クラスに置き換えるかその逆を実行して簡単に切り替えることができます。

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

CloudStorageEntitySerializer 実装の強力な機能の 1 つに、XmlDocumentXDocument の両方の XML ドキュメントを処理するときに特殊な処理を適用できるという点があります。他の注目すべき領域は、XML データの最適なシリアル化とシリアル化解除です。ここでは、.NET バイナリ XML 形式を使用して XML ペイロードの効率的なシリアル化とシリアル化解除を実行するときの最適な方法として .NET 開発者に知られている XmlDictionaryWriter クラスと XmlDictionaryReader クラスを利用することにしました。

オーバーフロー メッセージ ストアの種類については、カスタムのストレージ抽象化レイヤーを呼び出すコンシューマーが決定します。これらの行と共に、ICloudQueueStorage インターフェイスを実装する種類に次のコンストラクターを追加して、目的のメッセージ ストアの種類を選択するオプションを提供します。

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

これらのコンストラクターでは複雑な作業は実行されません。これらのコンストラクターは内部のメンバーを初期化し、Azure キューにアクセスするクライアント コンポーネントを構成します。ただし、再試行ポリシーを適用しないようにキュー クライアントに明示的に伝える点には注意してください。堅牢性と信頼性を備えたストレージ抽象レイヤーを提供するためには、Azure キューで操作を実行するときに一時的な問題に対してより細やかな制御が必要になります。したがって、さらに多くの種類の断続的なエラーを認識し、処理できる別のコンポーネントが用意されます。

ここでは、前述した ReliableCloudQueueStorage クラスの内部を見ていきましょう。具体的には、Put 操作の実装を確認していきます。これは、大きなメッセージ ストアへの透過的なオーバーフローの場所を表しています。

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

このスニペットに示されている新しいコード アーティファクトは LargeQueueMessageInfo クラスです。このカスタム型が最終的に大きなメッセージの場所を表すメタデータ メッセージになります。このクラスは、ストレージ抽象化レイヤー実装の外部のユーザーには表示されないため、内部としてマークされます。このクラスは、次のように定義されます。

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

この後、Azure BLOB ストレージ サービスを利用する大きなメッセージ オーバーフローを実装する必要があります。前に説明したように、このコンポーネントは、メッセージのサイズ制限によりメッセージをキューに格納できないときにメッセージを ICloudBlobStorage 実装に伝えるために ReliableCloudQueueStorage コンポーネントで使用される ICloudBlobStorage インターフェイスをサポートする必要があります。次の手順の段階を設定するために、コンストラクターの実装のみを含めます。

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

この記事の初めに、サイズの小さいメッセージは常にキューに格納されるのに対し、サイズの大きいメッセージは透過的にオーバーフロー ストアにルーティングされるための Put 操作の実装を示しました。継続性のために、オーバーフロー ストアで実装された同等の Put 操作の背後にあるメカニズムを確認していきましょう。

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

つまり、このコードでは <T> 型の BLOB を設定し、これが Stream オブジェクトの形式で既にシリアル化されているメッセージのイメージであるかどうかを最初に確認します。ReliableCloudQueueStorage コンポーネントによってオーバーフロー ストレージに中継されるサイズの大きいすべてのメッセージはストリームとして到着し、永続化できます。次に、UploadBlob アクションが呼び出され、このアクションによって BLOB サービス クライアント API、具体的にはその UploadFromStream 操作が呼び出されます。サイズの大きいメッセージ BLOB が正常にアップロードできない場合、コードで BLOB サービスによって返されるエラーが調べられ、ContainerNotFoundBlobAlreadyExists の 2 つの状態に対するフェールオーバー パスが提供されます。対象の BLOB コンテナーが見つからない場合、そのコンテナーが作成されます。このアクションは再試行対応のスコープ内で実行されて、信頼性を向上させ、一時的なエラーに対する回復力を強化します。2 番目のフェールオーバー パスは、同じ名前の BLOB が既に存在する状況に対処するためのものです。コードでは、上書き動作が有効な場合、既存の BLOB が削除されます。削除された後、新しい BLOB のアップロードが再試行されます。この操作は、信頼性を強化するため、再試行対応スコープ内で実行されます。

サイズの大きいメッセージを BLOB コンテナーに格納できるようになったので、次は、Azure Caching Service を利用する ICloudBlobStorage インターフェイスのもう 1 つの実装をデザインします。一貫性を保つため、そのコンストラクターから開始しましょう。

/// <summary>
/// Implements reliable generics-aware layer for Azure Caching Service.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

前に説明した考慮事項における主要な技術デザインの決定事項の 1 つは、サイズの大きいメッセージの格納に BLOB サービスと Caching Service の両方を利用することでした。キャッシュ オプションは、8 MB の推奨ペイロードのサイズを超えない一時的なオブジェクトに最も適しています。BLOB オプションは、基本的に他のすべてのものを対象としています。全体的に見て、この決定によりハイブリッドのオーバーフロー ストアが必要になります。ハイブリッド ストアを作成するための基礎は既にコードベース内にあります。これは、既存のアーティファクトを次のように結合するだけです。

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Caching Service in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Caching Service.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that will ensure reliable access to the Caching Service.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

この時点で、ハイブリッド オーバーフロー ストアでの Put 操作の実装を示すもう 1 つのコード スニペットを含めて終わりにします。


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

前述したストレージ抽象化レイヤーをクライアント アプリケーションから使用する方法について例を示さないとこの記事は完了しないため、これらの例を、技術的な実装の検証も行うテスト アプリケーションと組み合わせて示します。

新たに実装されたストレージ抽象化レイヤーを通じてサイズの大きいメッセージが正しくやり取りされることを実証するために、ごく簡単なコンソール アプリケーションが作成されています。最初の手順では、サイズが 90 MB のサンプルの XML ドキュメントを取得し、Azure キューに格納します。2 番目の手順では、キューからメッセージを使用します。メッセージは、ファイル サイズや内容を比較できるように別の名前でディスクに書き戻される元の XML ドキュメントであることが必要です。これらの手順の間、アプリケーションは一時停止モードとなり、その間はキューの内容やそれぞれのメッセージ オーバーフロー ストア (キャッシュや BLOB コンテナーなど) を調べることができます。テスト アプリケーションのソース コードを以下に示します。

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

完全を期すために、テスト時に使用したアプリケーション構成ファイルを以下に示します。テスト アプリケーションを試す場合は、app.config のコピーを変更し、実際のストレージ アカウントの認証情報やキャッシュ サービスのエンドポイント情報を追加しているかどうかを確認してください。

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

テスト アプリケーションが正常にコンパイルおよび実行された場合は、次のような出力がコンソール ウィンドウに表示されます。

テスト アプリケーションで使用されたストレージ アカウントを参照すると、次のメッセージがキューに表示されます。

テスト メッセージは BLOB ストレージに直接オーバーフローできる大きさであったため、次のスクリーンショットはテスト アプリケーションが一時停止している間のそれぞれの BLOB コンテナー内の予想されるコンテンツを示しています。

テストに使用された元の 90 MB の XML ドキュメントがどのように 11 MB の BLOB になったかに注目してください。これは、XML バイナリのシリアル化を適用した結果ストレージと帯域幅が 87% 節約されたことを示しています。シナリオの対象クラスの場合、XML バイナリ シリアル化と圧縮は最初の最適な項目です。

テスト アプリケーションでキュー メッセージの削除を続行すると、メタデータ メッセージは次のスクリーンショットに表示されているようにメッセージ データを保持する BLOB と共に削除されることが予想されます。

この例には、サイズの大きいメッセージのライフサイクルが単純化された図で示されています。これは、BLOB ストアへのサイズの大きいメッセージのルーティング、透過的な圧縮、両方のメッセージ部分の自動削除など、ストレージ抽象化レイヤーの基本を強調するために用いられています。そろそろ最後のまとめに入ります。

これまで見てきたように、クライアントに技術的制約を追加することなく Azure Caching Service や Azure BLOB サービスを利用することで、Azure キューの使用を拡張し、64 KB を超えるメッセージをサポートできます。実際、若干の追加の作業を行うだけで、次のように満足度を向上させてクライアントのメッセージング環境を強化できます。

  • ストレージ コストを削減し、データセンターに対する入出力帯域幅を節約するための透過的なメッセージ圧縮

  • キャッシュ ストレージまたは BLOB ストレージへのサイズの大きいメッセージの透過的で、簡単にカスタマイズできるオーバーフロー

  • どのオブジェクトの種類でも簡単に格納可能にするためのジェネリックのサポート

  • 信頼性の向上のための一時的な状態の自動処理

以前に説明したように、このソリューションではオーバーフロー ストレージに分散キャッシュ ストアと BLOB ストアの両方を使用できるため、Azure Caching Service を使用する場合は追加のコストがかかります。キャッシュを使用したオーバーフローを有効にすることを決定する前に、プロジェクトのストレージ要件を慎重に評価し、予測されるメッセージの数とメッセージ サイズに基づくコスト分析を実行する必要があります。

このソリューションでは Azure キューで大きなメッセージを簡単にサポートできるようにしていますが、常に改善の余地も残されています。このソリューションに組み込まれていない、追加が望まれる付加価値機能の例を次に示します。

  • アプリケーション構成でサイズの大きいメッセージのオーバーフロー ストアの種類を構成する機能。

  • 既定のシリアライザーがパフォーマンスの目標や機能のニーズに対応していない場合 (既定の圧縮が必要ない場合など) の追加のカスタム シリアライザー。

  • BLOB ストレージをスキャンして、孤立した大きいメッセージ BLOB (ゾンビ) があるかどうかをすばやく検出できる階層リンクとして機能する BLOB のメタデータの項目。

  • オーバーフロー メッセージ ストアから孤立した BLOB をタイムリーに削除する "ガベージ コレクター" コンポーネント (ここで実装されているストレージ抽象化レイヤー以外のコンポーネントからもキューにアクセスする場合)。

付随するサンプル コードは、MSDN Code Gallery からダウンロードできます。対応する免責事項で説明されているとおり、すべてのソース コード ファイルは Microsoft Public License によって管理されることに注意してください。

この記事で説明しているトピックの詳細については、以下を参照してください。

表示:
© 2014 Microsoft