Продажи: 1-800-867-1389

Асинхронное получение сообщений

Обновлено: Март 2014 г.

По аналогии с асинхронной отправкой сообщений, а также с практической точки зрения, можно расширить использование асинхронной модели программирования для получения сообщений из Шина службы Microsoft Azure.

При ожидании новых сообщений в очереди или подписке Служебная шина решение часто отправляет запросы опроса. К счастью, Служебная шина предоставляет операцию получения с продолжительным опросом, которая сохраняет подключение к серверу, пока не произойдет одно из следующих событий: сообщение будет доставлено в очередь или истечет заданное время ожидания. Если получение с продолжительным опросом выполняется синхронно, то на период ожидания нового сообщения поток пула потоков CLR блокируется, что не считается оптимальным. Емкость пула потоков CLR обычно ограничена, что дает веские причины не использовать пул потоков для особенно продолжительных операций.

Чтобы создать действительно эффективное решение для обмена сообщениями с помощью API обмена сообщениями через посредник Служебная шина, операцию получения всегда следует выполнять асинхронно. Если решение получает одно сообщение за раз или извлекает сразу несколько сообщений, операцию получения следует начинать методом BeginReceive и не указывать время ожидания. В текущем API максимальное время ожидания получения составляет 24 дня. Пока клиент обмена сообщениями Служебная шина от вашего имени ожидает новое сообщение, решение может продолжать выполнение каких-либо других задач. По завершении метод обратного вызова получит уведомление, и полученное сообщение (при его наличии) станет доступно для обработки.

noteПримечание
После получения сообщения из очереди или подписки его текст может быть прочитан только один раз. Из-за особенностей сетевых протоколов потоки данных сообщений не всегда можно "перематывать назад", так как в них не часто поддерживается операция поиска. Данные сообщения необходимо защитить, поместив их в объект после вызова метода GetBody(), и затем хранить этот объект, сколько потребуется. В API обмена сообщениями через посредник не поддерживается многократный вызов метода GetBody().

В примере кода внизу приведен вариант программного метода, который асинхронно получает заданное количество сообщений из очереди Служебная шина.

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

Согласно рекомендации в предыдущем разделе, для распараллеливания асинхронных операций получения сообщений лучше всего использовать интегрированную асинхронную модель программирования из библиотеки Task Parallel Library.

Была ли вам полезна эта информация?
(1500 символов осталось)
Спасибо за ваш отзыв
Корпорация Майкрософт проводит интернет-опрос, чтобы выяснить ваше мнение о веб-сайте MSDN. Если вы желаете принять участие в этом интернет-опросе, он будет отображен при закрытии веб-сайта MSDN.

Вы хотите принять участие?
Показ:
© 2014 Microsoft