Экспорт (0) Печать
Развернуть все

Асинхронная отправка сообщений

Обновлено: Март 2014 г.

Чтобы использовать преимущества расширенных возможностей для производительности в Шина службы Microsoft Azure, например пакетную обработку на стороне клиента, рекомендуется всегда применять асинхронную модель программирования для реализации решения для обмена сообщениями с помощью управляемого API .NET обмена сообщениями через посредник. Схема асинхронного обмена сообщениями позволит создавать решения, которые обычно не будут подвержены дополнительным временным затратам при операциях ввода-вывода, например при отправке и получении сообщений.

При асинхронном вызове метода API управление мгновенно возвращается к коду, и приложение продолжает выполнение, пока асинхронная операция выполняется независимо то него. Приложение либо наблюдает за асинхронной операцией, либо посредством обратного вызова получает уведомление о ее завершении. На этом этапе приложение может получить и обработать результаты.

Важно отметить, что при вызове синхронной операции, например метода Send или Receive в классе QueueClient (либо других синхронных методов, предоставленных API обмена сообщениями через посредник Служебная шина), внутреннее выполнение кода API осуществляется с использованием асинхронных версий соответствующих методов, хотя и с применением блокирования. Тем не менее, применение синхронных версий этих методов может не обеспечить полный спектр преимуществ для производительности, доступных при прямом вызове асинхронных методов Это особенно ощутимо при отправке или получении нескольких сообщений, если во время выполнения этих операций необходимо выполнить другие операции обработки.

noteПримечание
Объект BrokeredMessage представляет сообщение. Он предоставляется для проводной передачи данных. Так как объект BrokeredMessage отправляется в очередь или раздел, он используется базовым стеком сообщений, и не может быть использован повторно для дальнейших операций. Причина этого состоит в том, что после прочтения текста сообщения нельзя перемотать назад поток, передающий это сообщение. Необходимо хранить исходные данные, использованные для создания экземпляра BrokeredMessage, пока нельзя будет надежно проверить успешность выполнения операции обмена сообщениями. Если после сбоя операции обмена сообщениями требуется повторная попытка, следует создать новый экземпляр BrokeredMessage с использованием этих исходных данных.

В приведенном ниже фрагменте кода показано, как асинхронно (и надежным способом) отправить несколько сообщений с сохранением порядка их отправки.

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

Желательно не распараллеливать операции обмена сообщениями с помощью алгоритмов планирования и секционирования работы, используемых по умолчанию в библиотеках Task Parallel Library (TPL) и Parallel LINQ (PLINQ). Основы платформы TPL лучше всего подходят для добавления распараллеливания и одновременной обработки в приложения, главным образом, с точки зрения вычислений. Использование TPL "как есть" для повышения производительности кода операций ввода-вывода, например сетевых вызовов и операций обмена сообщениями, может не оправдать ожидания. Наилучший способ использовать TPL для обеспечения асинхронных операций — применить расширенные шаблоны TPL, которые соответствуют асинхронной модели программирования.

Показ:
© 2014 Microsoft