내보내기(0) 인쇄
모두 확장

비동기적으로 메시지 보내기

업데이트 날짜: 2014년 3월

Microsoft Azure 서비스 버스에서 클라이언트 쪽 일괄 처리 같은 고급 성능 기능을 사용하려면 .NET의 관리되는 조정된 메시징 API를 사용하여 메시징 솔루션을 구현할 때 항상 비동기 프로그래밍 모델을 사용하는 것이 좋습니다. 비동기 메시징 패턴을 사용하면 일반적으로 메시지를 주고받는 것과 같은 I/O 바운드 작업의 오버헤드를 피할 수 있는 솔루션을 빌드할 수 있습니다.

API 메서드를 비동기적으로 호출하는 경우 비동기 작업이 독립적으로 실행되는 동안 즉시 코드를 제어할 수 있게 되며 응용 프로그램이 계속 실행됩니다. 응용 프로그램은 비동기 작업을 모니터링하거나 작업이 완료되면 콜백을 통해 알림을 받습니다. 이때 응용 프로그램에서 결과를 가져와 처리할 수 있습니다.

QueueClient 클래스의 Send 메서드나 Receive 메서드, 또는 Service Bus 조정된 메시징 API에서 제공되는 다른 동기 메서드 같은 동기 작업을 호출할 경우 차단 방식이기는 하지만 내부적으로 API 코드가 해당 메서드의 비동기 버전을 통해 이동합니다. 하지만 이러한 메서드의 동기 버전을 사용하면 비동기 버전을 직접 호출할 때 기대할 수 있는 전반적인 성능 관련 혜택을 볼 수 없습니다. 이는 여러 메시지를 주고받고 있으며 해당 메시지 작업이 실행되는 동안 다른 처리를 수행하려는 경우에 특히 그렇습니다.

note참고
BrokeredMessage 개체는 메시지를 나타내며 네트워크를 통해 데이터를 전송하기 위한 목적으로 제공됩니다. 큐 또는 항목에 BrokeredMessage 개체를 보내는 즉시 이 개체는 기본 메시징 스택에서 사용되며 이후 작업에 다시 사용할 수 없습니다. 이는 메시지 본문을 일단 읽으면 메시지 데이터를 프로젝션하는 스트림을 되감을 수 없기 때문입니다. 메시징 작업의 성공을 안정적으로 보장할 수 있을 때까지 BrokeredMessage 인스턴스를 생성하는 데 사용되는 원본 데이터를 유지해야 합니다. 실패한 메시징 작업을 다시 시도해야 하는 경우 이러한 원본 데이터를 사용하여 새 BrokeredMessage 인스턴스를 생성해야 합니다.

다음 코드 조각에서는 메시지를 보내는 순서를 유지하면서 여러 메시지를 비동기적이고 안정적으로 보내는 방법을 설명합니다.

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

가능하면 TPL(Task Parallel Library)PLINQ(병렬 LINQ)에서 제공되는 기본 예약 및 작업 분할 알고리즘을 사용하여 메시징 작업을 병렬화하지 마십시오. TPL 프레임워크의 기본적인 사항은 컴퓨팅 바운드 작업 관점에서 대부분 응용 프로그램에 병렬 처리 및 동시성을 추가하는 데 가장 적합합니다. 네트워킹 호출 및 메시징 작업 같은 I/O 관련 코드의 성능 향상을 위해 TPL을 "있는 그대로" 사용하면 사용자가 기대하는 향상을 얻을 수 없습니다. TPL을 사용하여 비동기 작업을 지원하는 가장 좋은 방법은 비동기 프로그래밍 모델에 맞는 고급 TPL 패턴을 사용하는 것입니다.

표시:
© 2014 Microsoft