VENDAS: 1-800-867-1389

Receber mensagens de maneira assíncrona

Atualizado: março de 2014

Similar ao envio de mensagens de maneira assíncrona, e também sob um ponto de vista prático, você também pode estender a utilização do modelo de programação assíncrona para receber mensagens de Microsoft Azure Service Bus.

Enquanto aguarda por novas mensagens de uma assinatura ou fila Service Bus, sua solução geralmente emite um pedido de sondagem. Felizmente, Service Bus oferece uma operação de recepção de sondagem longa que mantém a conexão ao servidor até que uma mensagem chegue em uma fila ou decorra o período de tempo limite, o que acontecer primeiro. Se uma recepção de sondagem longa for executada de maneira síncrona, irá bloquear o pool de threads do CLR ao aguardar por uma nova mensagem, o que não é considerado ideal. A capacidade do pool de threads do CLR geralmente é limitada; assim há um boa razão para evitar a utilização do pool de threads para operações particularmente longas.

Para construir uma solução de mensagens verdadeiramente efetiva utilizando a Service Bus API de mensagens orientadas, você deve sempre executar a operação de recepção de maneira assíncrona. Mesmo que sua solução receba uma mensagem por vez ou busque mensagens múltiplas, você inicia a operação de recebimento utilizando o método BeginReceive com o tempo limite especificado. Na API atual, o valor máximo para limite de tempo de recebimento é de 24 dias. Enquanto o cliente de mensagens Service Bus aguarda por uma nova mensagem em seu nome, sua solução pode continuar executando qualquer outro trabalho. Ao finalizar, seu método de retorno de chamada será notificado e a mensagem recebida (se houver) estará disponível para processamento.

noteObservação
Após uma mensagem ser recebida de uma fila ou assinatura, seu corpo pode ser lido apenas uma vez. Devido à natureza dos protocolos de rede, fluxos de dados de mensagens não são sempre "rebobináveis", porque geralmente não oferecem suporte para uma operação de busca. Você deve proteger os dados da mensagem ao inseri-la em um objeto após a chamada do método GetBody() , e então mantendo o objeto enquanto precisar. Tentativas de invocar o método GetBody() mais de uma vez não tem suporte na API de mensagens orientadas.

O exemplo de código abaixo exibe um exemplo de um método de programação que recebe de maneira assíncrona o número especificado de mensagens de uma fila Service Bus :

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

Seguindo a recomendação da seção anterior é melhor utilizar o integração do modelo de programação assíncrona fornecida pela Biblioteca paralela de tarefas para paralelizar a operação assíncrona de recebimento de mensagens.

Isso foi útil para você?
(1500 caracteres restantes)
Agradecemos os seus comentários
Mostrar:
© 2014 Microsoft