내보내기(0) 인쇄
모두 확장

비동기적으로 메시지 받기

업데이트 날짜: 2014년 3월

메시지를 비동기적으로 보낼 때와 마찬가지로 또한 실무적인 관점에서도, 비동기 프로그래밍 모델을 사용하여 Microsoft Azure 서비스 버스에서 메시지를 받을 수도 있습니다.

Service Bus 큐 또는 구독에서 새 메시지를 기다리는 동안 솔루션에서 종종 폴링 요청이 발생합니다. 다행히 Service Bus는 큐에 메시지가 도착하거나 지정된 제한 시간이 경과될 때까지(이 중 먼저 발생하는 시점까지) 서버에 대한 연결을 유지하는 긴 폴링 수신 작업을 제공합니다. 긴 폴링 수신이 동기적으로 수행되는 경우 새 메시지를 기다리는 동안 CLR 스레드 풀 스레드가 차단되며, 이는 최적의 상태로 간주되지 않습니다. 일반적으로 CLR 스레드 풀의 용량이 제한적이므로 특히 장기 실행 작업에 대해 스레드 풀을 사용하지 않도록 하는 것은 당연합니다.

Service Bus 조정된 메시징 API를 사용하여 실로 효과적인 메시징 솔루션을 빌드하려면 수신 작업을 항상 비동기적으로 수행해야 합니다. 솔루션에서 한 번에 하나의 메시지를 수신하는지 또는 여러 메시지를 가져오는지 여부에 관계없이 제한 시간이 지정된 상태로 BeginReceive 메서드를 사용하여 수신 작업을 시작합니다. 현재 API에서 최대 수신 제한 시간 값은 24입니다. Service Bus 메시징 클라이언트가 사용자 대신 새 메시지를 기다리는 동안 솔루션은 다른 작업을 계속 수행할 수 있습니다. 수신이 완료되면 콜백 메서드가 표시되며 수신된 메시지(있는 경우)를 처리할 수 있습니다.

note참고
큐 또는 구독에서 메시지가 수신되면 해당 본문을 한 번만 읽을 수 있습니다. 네트워크 프로토콜의 특성으로 인해 메시지 데이터 스트림은 종종 검색 작업을 지원하지 않으므로 "되감지" 못할 수도 있습니다. GetBody() 메서드를 호출한 후 메시지 데이터를 개체에 저장하여 보호한 다음 필요할 때까지 해당 개체를 유지해야 합니다. 조정된 메시징 API에서 GetBody() 메서드를 두 번 이상 호출할 수 없습니다.

아래 코드 샘플에서는 Service Bus 큐에서 지정된 수의 메시지를 비동기적으로 받는 프로그래밍 메서드의 예를 보여 줍니다.

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

이전 섹션의 권장 사항에 따라 Task Parallel Library에서 제공되는 비동기 프로그래밍 모델 통합을 사용하여 비동기 메시지 수신 작업을 병렬화하는 것이 가장 좋습니다.

Microsoft는 MSDN 웹 사이트에 대한 귀하의 의견을 이해하기 위해 온라인 설문 조사를 진행하고 있습니다. 참여하도록 선택하시면 MSDN 웹 사이트에서 나가실 때 온라인 설문 조사가 표시됩니다.

참여하시겠습니까?
표시:
© 2014 Microsoft