Exportar (0) Imprimir
Expandir todo

Implementar bucles de recepción de mensajes fiables

Actualizado: marzo de 2014

A través de observaciones de varios proyectos de clientes que aprovechan la API de mensajería desacoplada, observamos que, a menudo, recibir un mensaje está sujeto a una implementación repetida convencional de la lógica de recepción sin un enfoque sólido para tratar las posibles anomalías. Generalmente, dicha lógica no permite casos extremos como, por ejemplo, bloqueos de mensajes expirados. Este tipo de enfoque puede ser propenso a errores si no se implementa de manera sólida. El propósito de esta sección es proporcionar recomendaciones específicas sobre la implementación de la lógica de recepción de mensajes fiable.

Bucles de recepción de mensajes fiables

En primer lugar, es importante tener en cuenta los dos modos distintos en que se pueden recibir mensajes de Service Bus. Estos modos son proporcionados por la API de mensajería acoplada para admitir el envío de mensajes usando la semántica “Una vez como máximo” (con ReceiveAndDelete) o “Una vez como mínimo” (con PeekLock).

El primer modo es ReceiveAndDelete, que es el modelo más sencillo y funciona mejor en escenarios en que la aplicación puede tolerar un error en el procesamiento de mensajes. Al usar el modo ReceiveAndDelete, la acción de recepción es una operación única durante la cual un mensaje enviado al cliente se marca como consumido y, posteriormente, se elimina de la cola o suscripción respectiva.

El segundo modo es PeekLock, que prescribe que un mensaje recibido debe permanecer oculto a otros consumidores hasta que expire su tiempo de espera de bloqueo. Con el modo PeekLock, el proceso de recepción se convierte en una operación de dos fases que hace posible admitir aplicaciones que no pueden tolerar mensajes con errores. Además de emitir una solicitud para recibir un mensaje nuevo (primera fase), la aplicación que consume debe indicar cuándo finaliza el procesamiento del mensaje (segunda fase). Una vez que la aplicación acaba de procesar el mensaje, o lo almacena (aplaza) de manera fiable para el procesamiento futuro, completa la segunda fase del proceso de recepción llamando al método Complete en el mensaje recibido.

Cuando especifique el modo PeekLock, debe finalizar siempre el procesamiento correcto de un mensaje llamando al método Complete, que indica a Service Bus que marque el procesamiento del mensaje como finalizado. Si no se llama al método Complete en un mensaje recibido en modo PeekLock, el mensaje volverá a aparecer en una cola o suscripción después de que expire su tiempo de espera de bloqueo. En consecuencia, volverá a recibir el mensaje procesado anteriormente, lo que puede provocar que se procese un mensaje duplicado.

Además, en relación con el modo PeekLock, debe indicar a Service Bus si un mensaje no se puede procesar correctamente y, por consiguiente, debe devolverse para el reenvío posterior. Siempre que sea posible, su solución de mensajería debería gestionar esta situación llamando al método Abandon, en lugar de esperar a que expire un bloqueo adquirido para el mensaje. Idealmente, llamará al método Abandon desde un bloque catch que pertenece a la construcción de control de excepciones try/catch que sirve al contexto de control de mensajería.

Es importante asegurarse de que el procesamiento de mensajes ocurre estrictamente dentro del período de bloqueo designado. En el patrón de mensajería acoplada, la duración de bloqueo de mensajes máxima es de 5 minutos y no se puede ampliar en tiempo de ejecución. Si un mensaje tarda más en procesarse que la duración de bloqueo establecida en una cola o suscripción, el tiempo de espera de su bloqueo de visibilidad se agotará y el mensaje volverá a estar disponible para los consumidores de la cola o suscripción. Si intenta finalizar o abandonar dicho mensaje, es posible que reciba un error MessageLockLostException que indica que no se ha encontrado ningún bloqueo válido para el mensaje determinado.

A fin de implementar un bucle de recepción de mensajes sólido, se recomienda crear resistencia contra todos los errores transitorios conocidos, así como cualquier anomalía que se presente durante o después del procesamiento de un mensaje. Esto es especialmente importante al recibir mensajes usando el modo PeekLock. Puesto que siempre hay una segunda fase implicada en el modo PeekLock, nunca debe asumir que un mensaje procesado correctamente en el cliente se puede marcar de forma fiable como completado en el back-end de Service Bus. Por ejemplo, un error en el nivel de red subyacente puede impedirle completar el procesamiento del mensaje correctamente. Esta implicación requiere que gestione casos extremos de idempotencia, ya que puede recibir el mismo mensaje más de una vez. Este comportamiento está en consonancia con muchas otras soluciones de mensajería que funcionan en el modo de envío de mensajes "Una vez como mínimo".

Puede agregar resistencia adicional al llamar a los métodos Complete y Abandon usando métodos de extensión. Por ejemplo:

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
       // unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
       //  or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

Se puede extender un enfoque similar para proteger otras operaciones de mensajería como Aplazar contra posibles errores. El patrón en el que se pueden usar los métodos de extensión anteriores se refleja en el fragmento de código que se muestra a continuación. Este fragmento de código demuestra cómo implementar un bucle de recepción al tiempo que se aprovecha la resistencia adicional que proporcionan los métodos de extensión:

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, 
                        // the current message needs to be unlocked and return back to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
   // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
       // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously 
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

El ejemplo anterior implementa un enfoque avanzado para recibir mensajes de forma asincrónica en el orden en que aparecen en una cola. Garantiza que cualquier error detectado durante el procesamiento provocará la cancelación del mensaje y lo devolverá a la cola para que pueda volver a ser procesado. El código adicional se justifica admitiendo la cancelación correcta del bucle de recepción del mensaje.

Adiciones de comunidad

AGREGAR
Mostrar:
© 2014 Microsoft