VENTAS: 1-800-867-1389

Recibir mensajes de forma asincrónica

Actualizado: marzo de 2014

Desde un punto de vista práctico, y de forma similar al envío de mensajes de modo asincrónico, también puede extender el uso del modelo de programación asincrónica para recibir mensajes de Service Bus de Windows Azure.

Recibir mensajes de forma asincrónica

Mientras espera nuevos mensajes en una cola o suscripción de Service Bus, su solución emite a menudo una solicitud de sondeo. Afortunadamente, Service Bus ofrece una operación de recepción de sondeo largo que mantiene una conexión con el servidor hasta que llega un mensaje a una cola o, si tiene lugar antes, hasta que transcurre el período de tiempo de espera especificado. Si se realiza una recepción de sondeo largo de forma sincrónica, se bloqueará el grupo de subprocesos de CLR mientras espera un nuevo mensaje, lo cual no se considera óptimo. Normalmente, la capacidad del grupo de subprocesos de CLR es limitada; por lo tanto, hay una buena razón para evitar el uso del grupo de subprocesos para operaciones de ejecución especialmente larga.

Para compilar una solución de mensajería verdaderamente efectiva usando la API de mensajería desacoplada de Service Bus, debe realizar siempre la operación de recepción de forma asincrónica. Tanto si la solución recibe mensajes de uno en uno o captura varios mensajes a la vez, debe empezar la operación de recepción usando el método BeginReceive con el tiempo de espera especificado. En la API actual, el valor del tiempo de espera de recepción máximo es 24 días. Mientras el cliente de mensajería de Service Bus espera un nuevo mensaje en su nombre, su solución puede proceder a realizar cualquier otro trabajo. Una vez finalizado el proceso, se notificará al método de devolución de llamada y el mensaje que se haya recibido (si hubiera alguno) estará disponible para ser procesado.

noteNota
Una vez que se haya recibido un mensaje de una cola o de una suscripción, su cuerpo solo se puede leer una vez. Dada la naturaleza de los protocolos de red, las secuencias de datos del mensaje no siempre son "rebobinables", porque a menudo no admiten una operación de búsqueda. Debe proteger los datos del mensaje colocándolos en un objeto después de llamar al método GetBody() y, a continuación, mantener el objeto tanto tiempo como sea necesario. La API de mensajería desacoplada no admite el intento de invocar el método GetBody() más de una vez.

El código de ejemplo que aparece a continuación muestra un ejemplo de método de programación que recibe de forma asincrónica el número especificado de mensajes de una cola de Service Bus:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

En consonancia con la recomendación de la sección anterior, es mejor usar la integración del modelo de programación asincrónica proporcionado por la Biblioteca en paralelo de tareas para poner en paralelo las operaciones asincrónicas de recepción de mensajes.

¿Te ha resultado útil?
(Caracteres restantes: 1500)
Gracias por sus comentarios

Adiciones de comunidad

AGREGAR
Mostrar:
© 2014 Microsoft