영업: 1-800-867-1380

Azure 큐로 큰 메시지를 처리하기 위한 모범 사례

업데이트 날짜: 2015년 3월

저자: Valery Mizonov

검토자: Brad Calder, Larry Franks, Jai Haridas, Sidney Higa, Christian Martinez, Curt Peterson 및 Mark Simms

이 문서에서는 Azure 큐 서비스에 대해 제네릭 인식 저장소 추상화 계층의 구현에 대한 개발자 중심의 지침을 제공합니다. Azure 큐에서 대용량 메시지를 지원하려면 현재 메시지 크기 제한을 해결해야 합니다. 이 문서와 관련 코드의 내용을 참조하면 큐에서 적용되는 64KB 제한으로 인한 메시지 크기 관리를 수행하지 않고도 Azure 큐를 사용할 수 있습니다.

640K라는 용량이 모든 작업에 충분했던 때도 있었습니다. 즉, 이전에는 숙련된 개발자가 모든 응용 프로그램 데이터를 저장할 수 있는 저장소 공간으로 몇 KB만 있으면 충분했습니다. 하지만 현재의 응용 프로그램이 교환할 수 있어야 하는 데이터의 양은 크게 달라질 수 있습니다. 아주 작은 HL7 메시지나 수 메가바이트에 달하는 EDI 문서든지 간에 현재의 응용 프로그램은 예측할 수 없는 속도로 진화하고 있는 모든 종류의 볼륨 특성을 해결할 수 있어야 합니다. 지난 세기 멀티바이트 구조로 표현되었던 비즈니스 개체는 이제 오늘날의 현대적인 직렬화, 표현 기술 및 형식 덕분에 이전 기술들보다 몇 배는 더 저장소 부족 문제가 심화되었습니다.

계속 변화하는 데이터 볼륨을 지원하기 위한 핵심은 메시지 크기에 대한 기술적 제한을 두지 않고 지정된 솔루션 아키텍처에서 메시지를 처리하는 것입니다. 큰 메시지 사용이 불가피한 경우가 있을 수 있습니다. 예를 들어, B2B 솔루션이 EDI 트래픽을 처리하도록 설계된 경우 이 솔루션은 최대 몇 MB에 달하는 EDI 문서를 수신할 수 있도록 준비해야 합니다. 종단 간 흐름에서 모든 계층, 서비스 및 구성 요소는 처리 중인 문서 크기를 수용할 수 있어야 합니다. 테스트를 진행하는 동안 웹 서비스를 통해 20MB EDI 846 Inventory Advice 문서를 수용했는데 큐의 메시지 크기 제한으로 인해 문서 처리를 위해 큐에 저장할 수 없다면 실망스러울 것입니다.

Azure 플랫폼에서 큰 메시지에 큐를 사용하는 이유는 무엇일까요? 달리 말해도 BLOB, 테이블, 클라우드 드라이브 또는 Microsoft Azure SQL 데이터베이스 등의 다른 대안은 무엇이 문제일까요? 큐는 확장성 및 안정성이 뛰어난 방식으로 수행되는 생산자와 소비자 간의 비동기적인 느슨하게 결합된 통신 특성을 갖는 특정 유형의 메시징 시나리오를 구현할 수 있게 해줍니다. Azure 큐를 사용하면 지정된 솔루션의 서로 다른 부분의 결합을 해제하고 FIFO(First In, First Out) 및 최소 한 번 이상 배달과 같은 고유한 의미 체계를 제공합니다. 이러한 의미 체계는 다른 대체 데이터 교환 메커니즘을 통해서는 구현하기가 다소 어려울 수 있습니다. 또한 큐는 영구적인 데이터 저장소가 아니라 서비스, 계층 및 응용 프로그램 간에 데이터를 교환하기 위한 임시 저장소로 사용하는 것이 가장 적합합니다. 각각의 데이터 교환 요구 사항은 비동기적인 방식으로 구성 요소 간에 메시지를 전달하거나 부하를 평준화하거나 복잡한 계산 작업을 확장하는 등 여러 가지 형식으로 나타날 수 있습니다. 이러한 많은 데이터 교환 패턴은 큐 없이 직관적으로 구현할 수 없는 것들입니다. 간단히 말해서 큐는 매우 중요한 기능입니다. 큐에 들어갈 수 있고 들어갈 수 없는 항목에 대해 걱정할 필요가 없다는 것은 어떠한 크기의 데이터라도 처리할 수 있는 통합된 큐 기반 메시징 솔루션을 구축하기 위한 강력한 성능입니다.

이 추상화 계층은 오늘날 큐 서비스 API에서 지원하는 유일한 유형인 바이트 배열 또는 문자열을 처리해야 하는 것과 반대로 응용 프로그램 특정 엔터티의 인스턴스를 보다 쉽게 게시 및 소비할 수 있게 해줍니다. 여기서는 .NET 제네릭을 폭넓게 활용하여 투명한 스트림 압축 및 압축 풀기와 같은 일부 부가 가치 기능은 물론 저장소 작업의 내결함성 향상을 위해 중간 오류 처리와 같은 몇 가지 알려진 모범 사례를 적용합니다.

현재의 상황에서 볼 때 64 KB(직렬화 및 인코딩된 후)보다 큰 메시지는 Azure 큐에 저장할 수 없습니다. 큐에 64 KB보다 큰 메시지를 배치하려고 시도하면 클라이언트 쪽 API가 예외를 반환합니다. 이 게시물을 작성하는 시점에 이 속성으로 반환되는 메시지 크기 제한은 65536입니다.

Important중요
메시지를 큐에 전송할 때는 Base64 인코딩이 적용됩니다. 인코딩된 페이로드는 항상 해당 원본 데이터보다 큽니다. Base64 인코딩을 사용하는 경우 평균적으로 오버헤드가 25% 추가됩니다. 따라서 64KB 크기 제한으로 인해 실제로는 페이로드가 48KB(64KB의 75%)보다 큰 메시지를 저장할 수 없게 됩니다.

이러한 제한은 단일 큐 항목에 대한 제한이지만 더 작은 청크로 분해할 수 없는 메시지와 같은 특정 메시지 유형도 이러한 제한에 영향을 받을 수 있습니다. 개발자의 관점에서 볼 때 특정 메시지가 큐에 수용될 수 있는지 여부는 생산성을 향상시키는 데 도움이 되지는 않습니다. 결국 목표는 응용 프로그램 데이터가 데이터 크기에 관계없이 생산자와 소비자 사이에 가장 효율적인 방식으로 전달될 수 있도록 해야 하는 것입니다. 한 쪽에서 Put(또는 Enqueue)을 호출하고 다른 한쪽에서 큐에 대해 Get(또는 Dequeue)를 호출하면 나머지는 이론상으로 자동으로 수행되어야 합니다.

Azure 큐에서 큰 메시지에 대한 지능적인 방식으로 메시지 크기 제한 문제를 해결하는 것이 이 문서에서 설명하는 기술적 과제의 핵심 전제입니다. 이를 위해서는 몇 가지 까다로운 작업이 필요합니다. 현재의 상용 소프트웨어 개발 환경에서는 추가 개발 작업에 대한 올바른 근거가 필요합니다. 여기에서는 다음과 같은 설계 목표와 함께 이러한 추가적인 투자에 대한 근거를 제시하고자 합니다.

  • 매우 큰 메시지 지원 - 메시지 크기와 관련하여 큐 서비스 API가 갖고 있는 제한 사항을 제거합니다.

  • 사용자 정의 제네릭 개체 지원 - Azure 큐에서 메시지를 게시 및 소비하는 경우.

  • 구성 가능한 메시지 저장소에 대한 투명한 오버플로 - BLOB 컨테이너, 분산 캐시 또는 기타 큰 메시지를 저장할 수 있는 저장소 유형.

  • 투명한 압축 - 큰 메시지에서 소비되는 저장소 공간의 양을 최소화하여 비용 효율성을 높입니다.

  • 신뢰성 향상 - 큐 작업을 수행할 때 모범 사례를 처리하는 임시 조건을 포괄적으로 사용.

크기가 제한된 큐에서 큰 메시지를 지원하기 위한 기초는 다음과 같은 패턴으로 구성됩니다. 첫째, 추가 작업을 수행하지 않아도 Azure 큐에서 특정 메시지를 수용할 수 있는지 확인합니다. 크기 제한을 위반하지 않고 큐에 메시지를 안전하게 저장할 수 있는지 여부를 확인하기 위해서는 다음과 같이 도우미 함수에 래핑한 공식을 사용합니다.

/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility
{
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in an Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in an Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size)
    {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}

메시지 크기가 적용된 제한보다 작을 경우, 메시지를 "있는 그대로" 큐에 넣기 위해 큐 서비스 API를 호출합니다. 메시지 크기가 해당 한도를 초과할 경우에는 데이터 흐름이 흥미롭게 변합니다. 다음 순서도에서는 이후의 단계를 시각적으로 보여줍니다.

저장-메시지-Flow1

요약하자면, 메시지가 해당 크기로 인해 큐에 수용될 수 없는 경우 큰 메시지를 저장할 수 있는 메시지 저장소로 오버플로됩니다. 그런 후 오버플로 저장소에 있는 항목에 대한 참조로 구성된 작은 메타데이터 메시지가 만들어집니다. 마지막으로 메타데이터 메시지가 큐에 입력됩니다. 필자의 경우 항상 큐에서 지속성에 대한 적합성을 보장하기 위해 메시지를 압축하도록 선택합니다. 이렇게 하면 오버플로 저장소로 이동할 필요 없이 큐에 넣을 수 있는 메시지 채우기를 효과적으로 확장할 수 있습니다. 이에 대해서는 직렬화 및 압축을 수행한 후 큐에 넣을 수 있는 올바른 후보로 변환되는 64 KB를 약간 초과하는 XML 문서를 예로 들 수 있습니다. 기본 압축이 적합하지 않을 때에는 이 동작을 수정할 수 있습니다. 이 작업은 다음 섹션에서 설명하는 사용자 지정 직렬 변환기 구성 요소를 제공하여 수행할 수 있습니다.

여기에는 비용 관점에서 적용되는 몇 가지 고려할 사항이 있습니다. 위의 순서도에 명시될 수 있으므로 큰 메시지가 처음에 Azure 캐시로 오버플로될 수 있는지 여부를 판별하려고 합니다. 분산 클라우드 기반 Caching Service 사용에는 요금이 적용되기 때문에 캐시 오버플로 경로는 선택적으로 지정되어야 합니다. 이는 순서도에 반영되어 있습니다.

또한 메시지가 상당히 커서 크기가 제한된 분산 캐시에는 저장하기에 적합하지 않을 수도 있습니다. 이 문서를 작성할 당시의 최대 캐시 크기는 4GB입니다. 따라서 이를 고려해서 캐시 용량 또는 할당량을 초과할 경우 장애 조치(failover) 경로를 제공해야 합니다. 할당량에도 또한 고려해야 할 제거 동작이 있습니다.

Important중요
Azure 캐시를 오버플로 저장소로 사용하면 대량의 메시지를 교환할 때 대기 시간을 줄이고 과도한 저장소 트랜잭션을 제거하는 데 도움이 됩니다. 지속성을 위해 다중 캐시 서버 간에 캐시된 데이터를 복제하고 메모리에 유지 관리할 수 있는 가용성이 뛰어난 분산 캐싱 인프라를 제공합니다. 이러한 혜택은 캐시 크기 제한 및 서비스 사용과 관련된 비용에 의해 무시될 수 있습니다. 따라서 특정 시나리오에서 오버플로 저장소로 Azure 캐시를 도입하는 데 따른 장점과 단점을 평가하기 위한 비용 효율성 분석을 수행하는 것이 중요합니다.

note참고
응용 프로그램에 맞는 올바른 Azure 캐시 제품을 선택하는 데 대한 지침은 내게 맞는 Azure 캐시 제품을 참조하세요.

다음은 지정된 크기의 항목을 캐시에 저장할 때 지정된 항목 크기 값이 최적으로 고려될 수 있는지 여부를 판별할 도움말 기능입니다.

public static class CloudUtility
{
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size)
    {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}

몇 가지 초기에 필요한 필수 구성 요소를 다루었으므로, 이제는 소비자 쪽으로 전환해서 큐에서 큰 메시지를 검색하기 위한 구현 패턴을 살펴보겠습니다. 가장 먼저 전반적인 이해를 돕기 위해 프로세스 흐름을 시각화해보겠습니다.

큰-메시지-오버플로

위의 흐름을 요약하자면, 알 수 없는 유형의 메시지가 큐에서 인출되고 메타데이터 메시지 유형과 비교됩니다. 메타데이터 메시지가 아니면 흐름이 계속해서 압축 해제 논리로 진행되어, 원래 메시지를 소비자에게 제공하기 전에 올바르게 재구성할 수 있도록 합니다. 반대로, 실제로 메타데이터 메시지이면, 실제 메시지를 저장하는 데 사용된 오버플로 저장소 유형을 확인하기 위해 조사됩니다. 캐시에 저장된 메시지로 식별되고, 해당 Caching Service API가 호출되고 압축되어 소비자에게 반환되기 전에 실제 메시지가 인출됩니다. 실제 메시지가 BLOB 컨테이너에 입력된 경우, BLOB 서비스 API가 대상으로 지정되어 BLOB 엔터티에서 실제 메시지를 검색하고, 압축을 해제하고 다시 호출자에게 전달합니다.

큰 메시지에 대한 큐에 넣기큐에서 제거 작업의 처리뿐만 아니라 소비자의 요청에 따라 모든 오버플로된 페이로드가 해당 오버플로 메시지 저장소에서 제거되었는지 확인해야 합니다. 이를 수행하기 위해 한 가지 가능한 구현 패턴은 특정 메시지에 대해 호출되었을 때 제거 프로세스를 Delete 작업과 결합하는 것입니다. 이러한 작업의 시각적 표현은 다음과 같이 설명될 수 있습니다.

큰-메시지-오버플로

위에서 언급한 패턴을 구현하기 전에 마지막으로 고려해야 할 사항은 메시지에 대한 정의입니다. 무엇이 메시지이고, 메시지 자체를 나타내기 위한 형식은 무엇인가? 바이트 배열, 데이터 스트림, 문자열과 같은 간단한 형식 또는 개발자가 솔루션 개체 모델의 일환으로 구현하는 복잡한 응용 프로그램 관련 개체인가? 필자는 이 분야야 말로 실제로 우리 자신을 제한하지 않아야 하는 영역이라고 생각합니다. 메시지는 단순히 제네릭 형식의 <T> 즉, 개발자가 사용하고자 하는 모든 것이라고 가정하기 바랍니다. 최종 구현에서는 자연스럽게 이러한 개념을 이해할 수 있습니다.

이를 모두 합해서, 다음 다이어그램에는 위 설계에서 고려된 세 가지 가능한 이동 경로가 모두 요약되어 있습니다.

메시지-이동-경로

이 시점에서는 기술 설계를 실제로 구현하기 위한 입력으로서 충분할 것입니다. 이제부터는 위에서 설명한 패턴을 구현하는 데 필요한 원본 코드로 초점을 이동하고자 합니다.

본 문서의 내용을 따라하기 위해서는 MSDN 코드 갤러리에서 전체 샘플 코드를 다운로드하세요. 이 샘플은 이 문서에서 설명한 패턴으로 지원되는 보다 큰 종단 간 참조 구현의 일부로 제공됩니다. 다운로드하고 압축을 푼 다음에는 Contoso.Cloud.Integration 아래의 Azure.Services.Framework 프로젝트로 이동하고 Storage 폴더를 확장합니다. 여기에는 아래에서 설명하는 모든 기본 코드 아티팩트가 포함되어 있습니다.

처음에 설명한 것처럼 원래 개념은 클라우드 응용 프로그램이 Window Azure 큐와 상호 작용하는 방식을 추상화하는 것입니다. 이 요구 사항은 사용자 지정 저장소 추상화 계층으로 지원되는 기본 작업을 제어하는 계약을 제공하여 해결하고자 합니다. 계약이 소비자에게 노출하는 프로그래밍 인터페이스는 다음과 같습니다. 여기에서는 큐 생성 및 삭제와 같은 일부 인프라 수준의 함수를 코드 조각에서 일부러 생략했습니다. 이러한 함수는 이 문서의 이 시점에서 큰 의미를 갖지 않습니다.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Queue storage.
/// </summary>
public interface ICloudQueueStorage : IDisposable
{
    /// <summary>
    /// Puts a single message on a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The target queue name on which message will be placed.</param>
    /// <param name="message">The payload to be put into a queue.</param>
    void Put<T>(string queueName, T message);

    /// <summary>
    /// Retrieves a single message from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <returns>An instance of the object that has been fetched from the queue.</returns>
    T Get<T>(string queueName);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);

    /// <summary>
    /// Gets a collection of messages from the specified queue and applies the default visibility timeout.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="queueName">The name of the source queue.</param>
    /// <param name="count">The number of messages to retrieve.</param>
    /// <returns>The list of messages retrieved from the specified queue.</returns>
    IEnumerable<T> Get<T>(string queueName, int count);

    /// <summary>
    /// Deletes a single message from a queue.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the message.</typeparam>
    /// <param name="message">The message to be deleted from a queue.</param>
    /// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
    bool Delete<T>(T message);
}

또한 큰 메시지 오버플로 저장소에 대한 액세스를 추상화하는 한 가지 추가 계약(인터페이스)이 필요합니다. 두 구성 요소는 각 오버플로 저장소마다 하나씩(BLOB 저장소 및 분산 캐시) 계약을 구현합니다. 이 계약은 다음과 같은 작업들로 구성됩니다.

/// <summary>
/// Defines a generics-aware abstraction layer for Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable
{
    /// <summary>
    /// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob);

    /// <summary>
    /// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied. 
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name into which a blob will be stored.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <param name="blob">The blob's payload.</param>
    /// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
    /// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
    bool Put<T>(string containerName, string blobName, T blob, bool overwrite);

    /// <summary>
    /// Retrieves a blob by its name from the underlying storage.
    /// </summary>
    /// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
    /// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
    T Get<T>(string containerName, string blobName);

    /// <summary>
    /// Deletes the specified blob.
    /// </summary>
    /// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
    /// <param name="blobName">The custom name associated with the blob.</param>
    /// <returns>True if the blob was deleted, otherwise false.</returns>
    bool Delete(string containerName, string blobName);
}

두 계약 모두 제네릭 형식 <T>에 대한 의존도가 매우 높습니다. 메시지 형식을 원하는 .NET 유형에 맞게 맞춤화할 수 있습니다. 하지만 스트림과 같이 특별한 취급이 필요한 형식과 같은 일부 극단적인 사용 사례를 처리해야 합니다. 이에 대해서는 이후에 자세히 설명합니다.

선택한 메시지 유형에 관계없이 한 가지 중요한 요구 사항이 적용됩니다. 바로 큐에서 메시지를 나타내는 개체 형식을 직렬화해야 한다는 것입니다. 저장소 추상화 계층을 통과하는 모든 개체는 큐 또는 오버플로 저장소에 도착하기 전에 직렬화해야 합니다. 이 문서에서는 직렬화 및 역직렬화도 각각 압축 및 압축 해제와 결합되어 있습니다. 이 방식은 비용 및 대역폭 측면에서 효율성을 높여줍니다. 비용 관련 이점은 압축된 큰 메시지가 본질적으로 더 작은 저장소를 소비해서 저장소 비용을 줄여준다는 사실을 기반으로 합니다. 대역폭 효율성은 압축으로 인해 페이로드 크기를 줄일 수 있고, 따라서 Azure 저장소를 통과할 때 유선 상으로 전송되는 페이로드 크기를 줄여주기 때문에 발생합니다.

직렬화 및 역직렬화에 대한 요구 사항은 특수한 인터페이스로 선언됩니다. 이 인터페이스를 구현하는 모든 구성 요소는 특정 압축, 직렬화, 역직렬화 및 압축 해제 기능을 제공해야 합니다. 이러한 인터페이스의 예는 다음과 같습니다.

/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    void Serialize(object instance, Stream target);

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    object Deserialize(Stream source, Type type);
}

압축 및 압축 해제를 위해, 여기에서는 .NET Framework의 DeflateStream 구성 요소가 사용됩니다. 이 클래스는 비손실 파일 압축 및 압축 해제를 위한 산업 표준 RFC 호환 알고리즘인 Deflate 알고리즘을 나타냅니다. GZipStream 클래스와 달리 Deflate 알고리즘은 보다 최적으로 압축된 이미지를 제공하며 일반적으로 더 나은 성능을 제공합니다. 반대로 GZipStream 클래스는 데이터 손상 감지를 위해 CRC(순환 중복성 검사) 값을 포함하는 GZIP 데이터 형식을 사용합니다. 실질적으로 GZIP 데이터 형식은 DeflateStream 클래스와 동일한 압축 알고리즘을 사용합니다. 요약하자면, GZipStreamDeflateStream과 CRC 체크섬 계산 및 저장 비용을 합한 것과 같습니다.

이 문서에서의 계약 구현은 아래에 포함되어 있습니다. 압축 알고리즘은 DeflateStream 클래스를 GZipStream으로 바꾸거나 혹은 그 반대로 하여 쉽게 전환할 수 있습니다.

/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and 
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer
{
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target)
    {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null)
        {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null)
        {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null)
        {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null)
        {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        }
        else
        {
            var serializer = GetXmlSerializer(instance.GetType());

            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
            {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }

    /// <summary>
    /// Deserializes the object from specified data stream.
    /// </summary>
    /// <param name="source">The source stream from which serialized object will be consumed.</param>
    /// <param name="type">The type of the object that will be deserialized.</param>
    /// <returns>The deserialized object instance.</returns>
    public object Deserialize(Stream source, Type type)
    {
        Guard.ArgumentNotNull(source, "source");
        Guard.ArgumentNotNull(type, "type");

        if (type == typeof(XElement))
        {
            // Handle XML element deserialization using separate technique.
            return DeserializeXml<XElement>(source, (reader) => { return XElement.Load(reader); });
        }
        else if (type == typeof(XDocument))
        {
            // Handle XML document deserialization using separate technique.
            return DeserializeXml<XDocument>(source, (reader) => { return XDocument.Load(reader); });
        }
        else if (type == typeof(XmlDocument))
        {
            // Handle XML DOM document deserialization using separate technique.
            return DeserializeXml<XmlDocument>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml; });
        }
        else if (type == typeof(XmlElement))
        {
            // Handle XML DOM element deserialization using separate technique.
            return DeserializeXml<XmlElement>(source, (reader) => { var xml = new XmlDocument(); xml.Load(reader); return xml.DocumentElement; });
        }
        else
        {
            var serializer = GetXmlSerializer(type);

            using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
            using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
            {
                return serializer.ReadObject(xmlReader);
            }
        }
    }

    private XmlObjectSerializer GetXmlSerializer(Type type)
    {
        if (FrameworkUtility.GetDeclarativeAttribute<DataContractAttribute>(type) != null)
        {
            return new DataContractSerializer(type);
        }
        else
        {
            return new NetDataContractSerializer();
        }
    }

    private void SerializeXml<T>(T instance, Stream target, Action<T, XmlWriter> serializeAction)
    {
        using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
        using (var xmlWriter = XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false))
        {
            serializeAction(instance, xmlWriter);

            xmlWriter.Flush();
            compressedStream.Flush();
        }
    }

    private T DeserializeXml<T>(Stream source, Func<XmlReader, T> deserializeAction)
    {
        using (var compressedStream = new DeflateStream(source, CompressionMode.Decompress, true))
        using (var xmlReader = XmlDictionaryReader.CreateBinaryReader(compressedStream, XmlDictionaryReaderQuotas.Max))
        {
            return deserializeAction(xmlReader);
        }
    }
}

CloudStorageEntitySerializer 구현에서 강력한 기능 중 하나는 XmlDocumentXDocument 방식 모두에서 XML 문서를 처리할 때 특별한 취급 방식을 적용할 수 있는 기능입니다. 주의 해야 하는 또 다른 영역은 XML 데이터에 대한 최적의 직렬화 및 역직렬화입니다. 여기에서는 .NET 이진 XML 형식을 사용하여 XML 페이로드의 효율적인 직렬화 및 역직렬화를 수행할 때 .NET 개발자들에게 있어서 뛰어난 선택으로 잘 알려진 XmlDictionaryReaderXmlDictionaryWriter 클래스가 사용되었습니다.

오버플로 메시지 저장소의 형식과 관련한 결정은 사용자 지정 저장소 추상화 계층으로 호출하는 소비자의 책임입니다. 여기에서는 ICloudQueueStorage 인터페이스를 구현하는 형식에 다음과 같은 생성자를 추가하여 원하는 메시지 저장소 형식을 선택할 수 있는 옵션을 제공하고자 합니다.

/// <summary>
/// Provides reliable generics-aware access to the Azure Queue storage.
/// </summary>
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudQueueClient queueStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly ICloudBlobStorage overflowStorage;
    private readonly ConcurrentDictionary<object, InflightMessageInfo> inflightMessages;
   
    /// <summary>
    /// Initializes a new instance of the <see cref="ReliableCloudQueueStorage"/> class using the specified storage account information,
    /// custom retry policy, custom implementation of <see cref="ICloudStorageEntitySerializer"/> interface and custom implementation of
    /// the large message overflow store.
    /// </summary>
    /// <param name="storageAccountInfo">The storage account that is projected through this component.</param>
    /// <param name="retryPolicy">The specific retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    /// <param name="overflowStorage">The component implementing overflow store that will be used for persisting large messages that
    /// cannot be accommodated in a queue due to message size constraints.</param>
    public ReliableCloudQueueStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer, ICloudBlobStorage overflowStorage)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");
        Guard.ArgumentNotNull(overflowStorage, "overflowStorage");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;
        this.overflowStorage = overflowStorage;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.queueStorage = storageAccount.CreateCloudQueueClient();

        // Configure the Queue storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.queueStorage.RetryPolicy = RetryPolicies.NoRetry();

        this.inflightMessages = new ConcurrentDictionary<object, InflightMessageInfo>(Environment.ProcessorCount * 4, InflightMessageQueueInitialCapacity);
    }
}

위의 생성자는 어떠한 복잡한 작업도 수행하지 않으며, 단지 내부 멤버만 초기화하고 Azure 큐에 액세스하게 될 클라이언트 구성 요소를 구성합니다. 하지만 큐 클라이언트가 어떠한 재시도 정책도 강제로 적용하지 않도록 명시적으로 지정할 필요는 전혀 없습니다. 강력하고 신뢰할 수 있는 저장소 추상화 계층을 제공하기 위해서는 Azure 큐에서 작업을 수행할 때 발생하는 일시적인 문제들을 보다 세부적으로 제어할 필요가 있습니다. 따라서 중간에 발생하는 매우 다양한 오류를 인식하고 처리할 수 있는 별도의 구성 요소를 준비해야 합니다.

이제 위에서 초안으로 작성한 ReliableCloudQueueStorage 클래스의 내부를 살펴보겠습니다. 특히 큰 메시지 저장소에 대한 투명한 오버플로 위치인 Put 작업의 구현을 검토해보겠습니다.

/// <summary>
/// Puts a single message on a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
public void Put<T>(string queueName, T message)
{
    Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
    Guard.ArgumentNotNull(message, "message");

    // Obtain a reference to the queue by its name. The name will be validated against compliance with storage resource names.
    var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

    CloudQueueMessage queueMessage = null;

    // Allocate a memory buffer into which messages will be serialized prior to being put on a queue.
    using (MemoryStream dataStream = new MemoryStream(Convert.ToInt32(CloudQueueMessage.MaxMessageSize)))
    {
        // Perform serialization of the message data into the target memory buffer.
        this.dataSerializer.Serialize(message, dataStream);

        // Reset the position in the buffer as we will be reading its content from the beginning.
        dataStream.Seek(0, SeekOrigin.Begin);

        // First, determine whether the specified message can be accommodated on a queue.
        if (CloudUtility.IsAllowedQueueMessageSize(dataStream.Length))
        {
            queueMessage = new CloudQueueMessage(dataStream.ToArray());
        }
        else
        {
            // Create an instance of a large queue item metadata message.
            LargeQueueMessageInfo queueMsgInfo = LargeQueueMessageInfo.Create(queueName);

            // Persist the stream of data that represents a large message into the overflow message store.
            this.overflowStorage.Put<Stream>(queueMsgInfo.ContainerName, queueMsgInfo.BlobReference, dataStream);

            // Invoke the Put operation recursively to enqueue the metadata message.
            Put<LargeQueueMessageInfo>(queueName, queueMsgInfo);        
        }
    }
    // Check if a message is available to be put on a queue.
    if (queueMessage != null)
    {
        Put(queue, queueMessage);
    }
}

위의 코드 조각에서 그 자체로 나타낸 새로운 코드 아티팩트는 LargeQueueMessageInfo 클래스입니다. 이 사용자 지정 형식은 궁극적으로 큰 메시지의 위치를 기술하는 메타데이터 메시지입니다. 이 클래스는 내부적으로 표시되며, 저장소 추상화 계층 구현의 외부에 표시되지 않습니다. 클래스는 다음과 같이 정의됩니다.

/// <summary>
/// Implements an object holding metadata related to a large message which is stored in 
/// the overflow message store such as Azure blob container.
/// </summary>
[DataContract(Namespace = WellKnownNamespace.DataContracts.General)]
internal sealed class LargeQueueMessageInfo
{
    private const string ContainerNameFormat = "LargeMsgCache-{0}";

    /// <summary>
    /// Returns the name of the blob container holding the large message payload.
    /// </summary>
    [DataMember]
    public string ContainerName { get; private set; }

    /// <summary>
    /// Returns the unique reference to a blob holding the large message payload.
    /// </summary>
    [DataMember]
    public string BlobReference { get; private set; } 

    /// <summary>
    /// The default constructor is inaccessible, the object needs to be instantiated using its Create method.
    /// </summary>
    private LargeQueueMessageInfo() { }

    /// <summary>
    /// Creates a new instance of the large message metadata object and allocates a globally unique blob reference.
    /// </summary>
    /// <param name="queueName">The name of the Azure queue on which a reference to the large message will be stored.</param>
    /// <returns>The instance of the large message metadata object.</returns>
    public static LargeQueueMessageInfo Create(string queueName)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");

        return new LargeQueueMessageInfo() { ContainerName = String.Format(ContainerNameFormat, queueName), BlobReference = Guid.NewGuid().ToString("N") };
    }
}

이 문서를 계속 진행하려면 Azure Blob 저장소 서비스를 활용하는 큰 메시지 오버플로 저장소를 구현해야 합니다. 앞에서 언급한 것처럼 이 구성 요소는 메시지 크기 제한으로 인해 큐에 메시지를 수용할 수 없을 때마다 메시지를 ICloudBlobStorage 구현으로 릴레이하기 위해 ReliableCloudQueueStorage 구성 요소가 소비하는 ICloudBlobStorage 인터페이스를 지원해야 합니다. 다음 단계를 준비하기 위해 여기에서는 생성자 구현만 포함되었습니다.

/// <summary>
/// Implements reliable generics-aware layer for Azure Blob storage.
/// </summary>
public class ReliableCloudBlobStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly CloudBlobClient blobStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudBlobStorage class using the specified storage account info, custom retry
    /// policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to the underlying storage.</param>
    /// <param name="dataSerializer">The component which performs serialization/deserialization of storage objects.</param>
    public ReliableCloudBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        CloudStorageAccount storageAccount = new CloudStorageAccount(new StorageCredentialsAccountAndKey(storageAccountInfo.AccountName, storageAccountInfo.AccountKey), true);
        this.blobStorage = storageAccount.CreateCloudBlobClient();

        // Configure the Blob storage not to enforce any retry policies since this is something that we will be dealing ourselves.
        this.blobStorage.RetryPolicy = RetryPolicies.NoRetry();

        // Disable parallelism in blob upload operations to reduce the impact of multiple concurrent threads on parallel upload feature.
        this.blobStorage.ParallelOperationThreadCount = 1;
    }
}

이 문서 앞부분에서는 큰 메시지가 오버플로 저장소로 투명하게 라우팅되고 작은 메시지는 항상 큐에 배치되도록 보장하는 Put 작업의 구현을 살펴봤습니다. 이에 대한 연속으로 이제는 오버플로 저장소에서 구현되는 반대의 Put 작업 이면의 방식을 검토할 차례입니다.

/// <summary>
/// Puts a blob into the underlying storage, overwrites the existing blob if the blob with the same name already exists.
/// </summary>
private bool Put<T>(string containerName, string blobName, T blob, bool overwrite, string expectedEtag, out string actualEtag)
{
    Guard.ArgumentNotNullOrEmptyString(containerName, "containerName");
    Guard.ArgumentNotNullOrEmptyString(blobName, "blobName");
    Guard.ArgumentNotNull(blob, "blob");

    var callToken = TraceManager.CloudStorageComponent.TraceIn(containerName, blobName, overwrite, expectedEtag);

    // Verify whether or not the specified blob is already of type Stream.
    Stream blobStream = IsStreamType(blob.GetType()) ? blob as Stream : null;
    Stream blobData = null;
    actualEtag = null;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (blobStream != null)
        {
            blobData = blobStream;
        }
        else
        {
            // The specified blob is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();
            this.dataSerializer.Serialize(blob, blobData);
        }

        var container = this.blobStorage.GetContainerReference(CloudUtility.GetSafeContainerName(containerName));
        StorageErrorCode lastErrorCode = StorageErrorCode.None;

        Func<string> uploadAction = () =>
        {
            var cloudBlob = container.GetBlobReference(blobName);
            return UploadBlob(cloudBlob, blobData, overwrite, expectedEtag);
        };

        try
        {
            // First attempt - perform upload and let the UploadBlob method handle any retry conditions.
            string eTag = uploadAction();

            if (!String.IsNullOrEmpty(eTag))
            {
                actualEtag = eTag;
                return true;
            }
        }
        catch (StorageClientException ex)
        {
            lastErrorCode = ex.ErrorCode;

            if (!(lastErrorCode == StorageErrorCode.ContainerNotFound || lastErrorCode == StorageErrorCode.ResourceNotFound || lastErrorCode == StorageErrorCode.BlobAlreadyExists))
            {
                // Anything other than "not found" or "already exists" conditions will be considered as a runtime error.
                throw;
            }
        }

        if (lastErrorCode == StorageErrorCode.ContainerNotFound)
        {
            // Failover action #1: create the target container and try again. This time, use a retry policy to wrap calls to the
            // UploadBlob method.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                CreateContainer(containerName);
                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }

        if (lastErrorCode == StorageErrorCode.BlobAlreadyExists && overwrite)
        {
            // Failover action #2: Overwrite was requested but BlobAlreadyExists has still been returned.
            // Delete the original blob and try to upload again.
            string eTag = this.retryPolicy.ExecuteAction<string>(() =>
            {
                var cloudBlob = container.GetBlobReference(blobName);
                cloudBlob.DeleteIfExists();

                return uploadAction();
            });

            return !String.IsNullOrEmpty(actualEtag = eTag);
        }
    }
    finally
    {
        // Only dispose the blob data stream if it was newly created.
        if (blobData != null && null == blobStream)
        {
            blobData.Dispose();
        }

        TraceManager.CloudStorageComponent.TraceOut(callToken, actualEtag);
    }

    return false;
}

요약하자면, 위 코드는 BLOB 형식의 <T>를 가져와서 먼저 이미 Stream 개체 형식으로 직렬화된 메시지의 이미지인지 확인합니다. ReliableCloudQueueStorage 구성 요소에 의해 오버플로 저장소로 릴레이되는 모든 큰 메시지는 지속성을 위해 준비된 스트림으로 도착합니다. 대상 BLOB 컨테이너를 찾을 수 없는 경우에는 코드가 누락된 컨테이너를 만들려고 시도합니다. 이 작업은 신뢰성을 높이고 일시적 오류에 대한 탄력성을 높이기 위해 재시도 인식 범위 내에서 수행됩니다. 두 번째 장애 조치 경로는 동일한 이름의 BLOB가 이미 존재하는 상황을 처리하기 위한 것입니다. 이 코드는 덮어쓰기 동작이 설정된 경우 기존 BLOB를 제거합니다. 제거 후에는 새로운 BLOB의 업로드가 재시도됩니다. 다시 말하지만, 이 작업은 신뢰성 향상을 위해 재시도 인식 범위 내에서 수행됩니다.

큰 메시지를 BLOB 컨테이너에 저장할 수 있으므로, 이제는 Azure 캐시를 사용하는 또 다른 ICloudBlobStorage 인터페이스 구현을 설계할 차례입니다. 일관성을 위해 해당 생성자부터 시작하겠습니다.

/// <summary>
/// Implements reliable generics-aware layer for Azure Cache.
/// </summary>
public class ReliableCloudCacheStorage : ICloudBlobStorage
{
    private readonly RetryPolicy retryPolicy;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly DataCacheFactory cacheFactory;
    private readonly DataCache cache;

    /// <summary>
    /// Initializes a new instance of the ReliableCloudCacheStorage class using the specified storage account information
    /// custom retry policy and custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="endpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="retryPolicy">The custom retry policy that will ensure reliable access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs custom serialization and deserialization of cache items.</param>
    public ReliableCloudCacheStorage(CachingServiceEndpointInfo endpointInfo, RetryPolicy retryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(endpointInfo, "endpointInfo");
        Guard.ArgumentNotNull(retryPolicy, "retryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.retryPolicy = retryPolicy;
        this.dataSerializer = dataSerializer;

        var cacheServers = new List<DataCacheServerEndpoint>(1);
        cacheServers.Add(new DataCacheServerEndpoint(endpointInfo.ServiceHostName, endpointInfo.CachePort));

        var cacheConfig = new DataCacheFactoryConfiguration()
        {
            Servers = cacheServers,
            MaxConnectionsToServer = 1,
            IsCompressionEnabled = false,
            SecurityProperties = new DataCacheSecurity(endpointInfo.SecureAuthenticationToken, endpointInfo.SslEnabled),
            // The ReceiveTimeout value has been modified as per recommendations provided in
            // http://blogs.msdn.com/b/akshar/archive/2011/05/01/azure-appfabric-caching-errorcode-lt-errca0017-gt-substatus-lt-es0006-gt-what-to-do.aspx
            TransportProperties = new DataCacheTransportProperties() { ReceiveTimeout = TimeSpan.FromSeconds(45) }
        };

        this.cacheFactory = new DataCacheFactory(cacheConfig);
        this.cache = this.retryPolicy.ExecuteAction<DataCache>(() =>
        {
            return this.cacheFactory.GetDefaultCache();
        });
    }
}

앞에서 설명한 고려 사항을 떠올려 볼 때 중요한 기술적 설계 결정 사항 중 하나는 큰 메시지 저장을 위해 Blob 서비스 및 Azure 캐시를 모두 활용할 수 있는 기능입니다. 캐시 옵션은 권장 페이로드 크기인 8MB를 초과하지 않는 일시적인 개체에 대해 대부분 적합합니다. BLOB 옵션은 기본적으로 다른 모든 상황에 적합합니다. 전반적으로 볼 때 이러한 결정을 위해서는 하이브리드 오버플로 저장소가 필요합니다. 하이브리드 저장소를 구축하기 위한 기초는 이미 코드 베이스에 마련되어 있습니다. 단지 다음과 같이 기존의 아티팩트를 서로 합치기만 하면 됩니다.

/// <summary>
/// Implements reliable generics-aware storage layer combining Azure Blob storage and
/// Azure Cache in a hybrid mode.
/// </summary>
public class ReliableHybridBlobStorage : ICloudBlobStorage
{
    private readonly ICloudBlobStorage blobStorage;
    private readonly ICloudBlobStorage cacheStorage;
    private readonly ICloudStorageEntitySerializer dataSerializer;
    private readonly IList<ICloudBlobStorage> storageList;

    /// <summary>
    /// Initializes a new instance of the ReliableHybridBlobStorage class using the specified storage account information, caching
    /// service endpoint, custom retry policies and a custom implementation of ICloudStorageEntitySerializer interface.
    /// </summary>
    /// <param name="storageAccountInfo">The access credentials for Azure storage account.</param>
    /// <param name="storageRetryPolicy">The custom retry policy that will ensure reliable access to the underlying blob storage.</param>
    /// <param name="cacheEndpointInfo">The endpoint details for Azure Cache.</param>
    /// <param name="cacheRetryPolicy">The custom retry policy that provides access to Azure Cache.</param>
    /// <param name="dataSerializer">The component which performs serialization and deserialization of storage objects.</param>
    public ReliableHybridBlobStorage(StorageAccountInfo storageAccountInfo, RetryPolicy storageRetryPolicy, CachingServiceEndpointInfo cacheEndpointInfo, RetryPolicy cacheRetryPolicy, ICloudStorageEntitySerializer dataSerializer)
    {
        Guard.ArgumentNotNull(storageAccountInfo, "storageAccountInfo");
        Guard.ArgumentNotNull(storageRetryPolicy, "storageRetryPolicy");
        Guard.ArgumentNotNull(cacheEndpointInfo, "cacheEndpointInfo");
        Guard.ArgumentNotNull(cacheRetryPolicy, "cacheRetryPolicy");
        Guard.ArgumentNotNull(dataSerializer, "dataSerializer");

        this.dataSerializer = dataSerializer;
        this.storageList = new List<ICloudBlobStorage>(2);

        this.storageList.Add(this.cacheStorage = new ReliableCloudCacheStorage(cacheEndpointInfo, cacheRetryPolicy, dataSerializer));
        this.storageList.Add(this.blobStorage = new ReliableCloudBlobStorage(storageAccountInfo, storageRetryPolicy, dataSerializer));
    }
}

이제 하이브리드 오버플로 저장소에 Put 작업의 구현을 보여주는 하나 이상의 코드 조각을 포함하여 이 설명을 마치고자 합니다.


/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be customized. 
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
public bool Put<T>(string containerName, string blobName, T blob, bool overwrite)
{
    Guard.ArgumentNotNull(blob, "blob");

    bool success = false;
    Stream blobData = null;
    bool treatBlobAsStream = false;

    try
    {
        // Are we dealing with a stream already? If yes, just use it as is.
        if (IsStreamType(blob.GetType()))
        {
            blobData = blob as Stream;
            treatBlobAsStream = true;
        }
        else
        {
            // The specified item type is something else rather than a Stream, we perform serialization of T into a new stream instance.
            blobData = new MemoryStream();

            this.dataSerializer.Serialize(blob, blobData);
            blobData.Seek(0, SeekOrigin.Begin);
        }

        try
        {
            // First, make an attempt to store the blob in the distributed cache.
            // Only use cache if blob size is optimal for this type of storage.
            if (CloudUtility.IsOptimalCacheItemSize(blobData.Length))
            {
                success = this.cacheStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
        finally
        {
            if (!success)
            {
                // The cache option was unsuccessful, fail over to the blob storage as per design decision.
                success = this.blobStorage.Put<Stream>(containerName, blobName, blobData, overwrite);
            }
        }
    }
    finally
    {
        if (!treatBlobAsStream && blobData != null)
        {
            // Only dispose the blob data stream if it was newly created.
            blobData.Dispose();
        }
    }

    return success;
}

위에서 설명한 저장소 추상화 계층을 클라이언트 응용 프로그램에서 소비할 수 있는 방법에 대한 몇 가지 예를 제공하지 않았다면 이 문서가 불완전한 문서로 고려될 것입니다. 여기에서는 이러한 예를 기술적 구현을 검증하는 테스트 응용 프로그램과 결합해서 보여주고자 합니다.

새롭게 구현된 저장소 추상화 계층에서 큰 메시지를 성공적으로 전달할 수 있다는 것을 입증하기 위해 이 문서에는 매우 간단한 콘솔 응용 프로그램이 포함되었습니다. 첫 번째 단계에서는 90MB 크기의 샘플 XML 문서를 가져와서 이를 Azure 큐에 넣습니다. 두 번째 단계에서는 큐에서 메시지를 소비합니다. 메시지는 실제로 파일 크기 및 해당 콘텐츠를 비교할 수 있도록 다른 이름으로 디스크에 기록된 원본 XML 문서입니다. 이러한 단계를 수행하는 중에 응용 프로그램은 큐의 내용 및 캐시 또는 BLOB 컨테이너와 같은 각 메시지 오버플로 저장소를 탐색할 수 있도록 일시 정지 모드로 전환됩니다. 테스트 응용 프로그램의 원본 코드는 아래에 제공되어 있습니다.

using System;
using System.IO;
using System.Configuration;
using System.Xml.Linq;

using Contoso.Cloud.Integration.Framework;
using Contoso.Cloud.Integration.Framework.Configuration;
using Contoso.Cloud.Integration.Azure.Services.Framework.Storage;

namespace LargeQueueMessageTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Check if command line arguments were in fact supplied.
            if (null == args || args.Length == 0) return;

            // Read storage account and caching configuration sections.
            var cacheServiceSettings = ConfigurationManager.GetSection("CachingServiceConfiguration") as CachingServiceConfigurationSettings;
            var storageAccountSettings = ConfigurationManager.GetSection("StorageAccountConfiguration") as StorageAccountConfigurationSettings;

            // Retrieve cache endpoint and specific storage account definitions.
            var cacheServiceEndpoint = cacheServiceSettings.Endpoints.Get(cacheServiceSettings.DefaultEndpoint);
            var queueStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultQueueStorage);
            var blobStorageAccount = storageAccountSettings.Accounts.Get(storageAccountSettings.DefaultBlobStorage);

            PrintInfo("Using storage account definition: {0}", queueStorageAccount.AccountName);
            PrintInfo("Using caching service endpoint name: {0}", cacheServiceEndpoint.Name);

            string fileName = args[0], queueName = "LargeMessageQueue";
            string newFileName = String.Format("{0}_Copy{1}", Path.GetFileNameWithoutExtension(fileName), Path.GetExtension(fileName));

            long fileSize = -1, newFileSize = -1;

            try
            {
                // Load the specified file into XML DOM.
                XDocument largeXmlDoc = XDocument.Load(fileName);

                // Instantiate the large message overflow store and use it to instantiate a queue storage abstraction component.
                using (var overflowStorage = new ReliableHybridBlobStorage(blobStorageAccount, cacheServiceEndpoint))
                using (var queueStorage = new ReliableCloudQueueStorage(queueStorageAccount, overflowStorage))
                {
                    PrintInfo("\nAttempting to store a message of {0} bytes in size on an Azure queue", fileSize = (new FileInfo(fileName)).Length);

                    // Enqueue the XML document. The document's size doesn't really matter any more.
                    queueStorage.Put<XDocument>(queueName, largeXmlDoc);

                    PrintSuccess("The message has been succcessfully placed into a queue.");
                    PrintWaitMsg("\nYou can now inspect the content of the {0} queue and respective blob container...", queueName);

                    // Dequeue a message from the queue which is expected to be our original XML document.
                    XDocument docFromQueue = queueStorage.Get<XDocument>(queueName);

                    // Save it under a new name.
                    docFromQueue.Save(newFileName);

                    // Delete the message. Should remove the metadata message from the queue as well as blob holding the message data.
                    queueStorage.Delete<XDocument>(docFromQueue);

                    PrintInfo("\nThe message retrieved from the queue is {0} bytes in size.", newFileSize = (new FileInfo(newFileName)).Length);

                    // Perform very basic file size-based comparison. In the reality, we should have checked the document structurally.
                    if (fileSize > 0 && newFileSize > 0 && fileSize == newFileSize)
                    {
                        PrintSuccess("Test passed. This is expected behavior in any code written by CAT.");
                    }
                    else
                    {
                        PrintError("Test failed. This should have never happened in the code written by CAT.");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintError("ERROR: {0}", ExceptionTextFormatter.Format(ex));
            }
            finally
            {
                Console.ReadLine();
            }
        }

        private static void PrintInfo(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintSuccess(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintError(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
        }

        private static void PrintWaitMsg(string format, params object[] parameters)
        {
            Console.ForegroundColor = ConsoleColor.Gray;
            Console.WriteLine(format, parameters);
            Console.ResetColor();
            Console.ReadLine();
        }
    }
}

완성도를 위해 아래에는 테스트 중 사용된 응용 프로그램 구성 파일이 제공되어 있습니다. 테스트 응용 프로그램을 사용해보려면 app.config의 사본을 수정하고 실제 저장소 계정 자격 증명과 Caching Service 끝점 정보를 추가하세요.

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="CachingServiceConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.CachingServiceConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
    <section name="StorageAccountConfiguration" type="Contoso.Cloud.Integration.Framework.Configuration.StorageAccountConfigurationSettings, Contoso.Cloud.Integration.Framework, Version=1.0.0.0, Culture=neutral, PublicKeyToken=23eafc3765008062"/>
  </configSections>

  <CachingServiceConfiguration defaultEndpoint="YOUR-CACHE-NAMESPACE-GOES-HERE">
    <add name="YOUR-CACHE-NAMESPACE-GOES-HERE" authToken="YOUR-CACHE-SECURITYTOKEN-GOES-HERE"/>
  </CachingServiceConfiguration>

  <StorageAccountConfiguration defaultBlobStorage="My Azure Storage" defaultQueueStorage="My Azure Storage">
    <add name="My Azure Storage" accountName="YOUR-STORAGE-ACCOUNT-NAME-GOES-HERE" accountKey="YOUR-STORAGE-ACCOUNT-KEY-GOES-HERE"/>
  </StorageAccountConfiguration>
</configuration>

테스트 응용 프로그램이 성공적으로 컴파일되고 실행된다고 가정할 때 콘솔 창에 표시되는 결과는 다음과 비슷하게 나타납니다.

콘솔-창-출력

테스트 응용 프로그램에서 사용되는 저장소 계정을 살펴보면 다음과 같은 메시지가 큐에 표시됩니다.

직렬화된-메타데이터-메시지

테스트 메시지는 BLOB 저장소로 직접 오버플로될 정도로 충분히 크기 때문에 다음 스크린샷에서는 테스트 응용 프로그램이 일시 정지되었을 때 해당 BLOB 컨테이너 내부에 표시되는 예상 콘텐츠를 보여줍니다.

Blob-컨테이너

이 테스트에 사용된 원래 90MB의 XML 문서가 어떻게 11MB의 BLOB로 전환되었는지 확인하시기 바랍니다. 여기에는 XML 이진 직렬화를 적용한 결과인 저장소 및 대역폭에서의 87% 절감이 반영되어 있습니다. 시나리오의 대상 클래스를 감안할 때 XML 이진 직렬화와 압축은 최초이자 최상의 선택입니다.

큐 메시지를 삭제하여 테스트 응용 프로그램을 계속 실행하면 메타데이터 메시지가 아래 스크린샷에 표시된 것처럼 메시지 데이터를 포함하는 BLOB와 함께 제거될 것으로 예상됩니다.

Blob-제거됨

위 예제는 큰 메시지의 수명 주기를 가장 단순한 형태로 보여줍니다. 이 예제는 BLOB 저장소로의 큰 메시지 라우팅, 투명한 압축, 두 메시지 파트 모두에 대한 자동 제거 등 저장소 추상화 계층의 기초를 집중적으로 보여주기 위한 것입니다. 이제 결론을 내릴 때가 되었습니다.

지금까지 살펴본 것처럼 Azure 큐는 클라이언트에 추가적인 기술적 제한을 부과하지 않고도 Azure 캐시 및 Azure Blob 서비스를 사용하여 64KB 이상의 메시지를 지원하도록 사용 영역을 확장할 수 있습니다. 실제로, 여기에서는 약간의 추가 노력만으로 다음과 같이 수명 품질 향상을 제공하여 클라이언트에 대한 메시징 환경을 강화할 수 있다는 것을 보여주었습니다.

  • 투명한 메시지 압축 - 데이터 센터에 대한 저장소 비용 및 대역폭을 줄입니다.

  • 투명하고 쉽게 사용자 지정할 수 있는 오버플로 - 캐시 및 BLOB 저장소에 큰 메시지를 오버플로합니다.

  • 제네릭 지원 - 모든 개체 형식을 쉽게 저장할 수 있습니다.

  • 일시적 조건에 대한 자동 처리 - 신뢰성을 향상시켜 줍니다.

앞에서 언급한 것처럼 이 솔루션은 분산 캐시 및 BLOB 저장소를 모두 오버플로 저장소에 사용할 수 있지만 Azure 캐시를 사용하기 위해서는 추가적인 비용이 발생합니다. 프로젝트의 저장소 요구 사항을 신중하게 평가하고 캐시를 사용한 오버플로 지원을 결정하기 전에 예상된 메시지 수 및 메시지 크기에 따라 비용 분석을 수행해야 합니다.

이 솔루션은 Azure 큐에서 큰 메시지를 지원하기 위한 간단한 방법들을 제공하지만 항상 개선의 여지가 있습니다. 이 솔루션에 포함되지 않았지만 추가해볼 수 있는 일부 부가 가치 기능은 다음과 같습니다.

  • 응용 프로그램 구성에서 큰 메시지의 오버플로 저장소 형식을 구성하는 기능.

  • 기본 직렬 변환기가 성능 목적 또는 기능 요구 사항을 충족하지 않을 경우에 대비한 추가적인 사용자 지정 직렬 변환기(예: 기본 압축이 필요하지 않은 경우).

  • BLOB 메타데이터에서 사용자가 BLOB 저장소를 빠르게 스캔하고 고립된 큰 메시지 BLOB(좀비)가 있는지 신속하게 찾을 수 있게 해주는 이동 경로로 작동하는 항목.

  • 오버플로 메시지 저장소에서 고립된 BLOB을 시간 내에 제거할 수 있도록 보장하는 가비지 수집기 구성 요소(저장소 추상화 계층 구현 이외의 구성 요소도 큐에 액세스하는 경우).

포함된 예제 코드는 MSDN 코드 갤러리에서 다운로드할 수 있습니다. 모든 원본 코드 파일은 해당 법적 고지 사항에 설명된 것처럼 Microsoft Public License에 의해 제어됩니다.

이 문서에서 설명된 항목에 대해 자세한 내용을 보려면 다음 항목을 참조하세요.

이 정보가 도움이 되었습니까?
(1500자 남음)
의견을 주셔서 감사합니다.
표시:
© 2015 Microsoft