VENTES: 1-800-867-1389

Implémentation des boucles de réception de messages fiables

Mis à jour: mars 2014

À travers les observations de plusieurs projets clients tirant parti de l'API de messagerie répartie, nous avons remarqué que la réception d'un message fait souvent l'objet d'une implémentation répétée canonique de la logique de réception sans une approche saine de la manipulation d'éventuelles anomalies. Généralement, une telle logique n'autorise pas les cas extrêmes, par exemple, les verrouillages des messages expirés. Ce type d'approche peut être source d'erreurs s'il n'est pas appliqué de façon fiable. Le but de cette section est de fournir des recommandations spécifiques concernant la mise en œuvre d'une logique de réception de messages fiable.

Tout d'abord, il est important de noter les deux modes distincts dans lesquels des messages peuvent être reçus de Service Bus. Ces modes sont fournis par l'API de messagerie répartie pour prendre en charge la livraison de messages à l'aide de la sémantique « Au plus un fois » (avec ReceiveAndDelete) ou « Au moins une fois » (avec PeekLock).

Le premier mode est ReceiveAndDelete, qui est le modèle le plus simple et qui fonctionne le mieux pour les scénarios dans lesquels l'application peut tolérer une défaillance dans le traitement des messages.. Lorsque vous utilisez le mode ReceiveAndDelete, l'action de réception est une opération à un tronçon au cours de laquelle un message livré au client est marqué comme étant consommé et par la suite retiré de la file d'attente ou de l'abonnement.

Le second mode est PeekLock, qui prescrit qu'un message reçu doit rester caché aux autres consommateurs jusqu'à l'expiration de son délai de verrouillage. Avec le mode PeekLock, le processus de réception devient une opération en deux temps permettant de prendre en charge des applications qui ne tolèrent pas les messages générant une erreur. En plus d'émettre une demande pour recevoir un nouveau message (première étape), l'application consommatrice doit indiquer quand elle a fini de traiter le message (deuxième étape). Une fois que l'application a terminé le traitement du message, ou l'a stocké (différé) de façon fiable pour un traitement ultérieur, elle exécute la deuxième étape du processus de réception en appelant la méthode Complete sur le message reçu.

Quand vous spécifiez le mode PeekLock, vous devez toujours finaliser le traitement réussi d'un message en appelant la méthode Complete, qui indique à Service Bus de marquer le traitement du message comme étant terminé. Faute d'appeler la méthode Complete sur un message reçu en mode PeekLock, le message réapparaît dans une file d'attente ou un abonnement à l'expiration de son délai de verrouillage. Par conséquent, vous recevez de nouveau le message traité précédemment, et cela peut entraîner le traitement d'un message en double.

De plus, par rapport au mode PeekLock, vous devez indiquer à Service Bus si un message ne peut pas être traité correctement et doit alors être retourné pour une livraison ultérieure. Chaque fois que possible, votre solution de messagerie doit gérer cette situation en appelant la méthode Abandon, au lieu d'attendre qu'un verrou acquis pour le message expire. Dans l'idéal, vous appellerez la méthode Abandon à partir d'un bloc catch appartenant à la construction de gestion des exceptions try/catch traitant la contexte de gestion de messagerie.

Il est important de faire en sorte que le traitement du message se produise uniquement à l'intérieur de la période de verrouillage désignée. Dans le modèle de messagerie répartie, la durée maximale de verrouillage du message est de 5 minutes, et cette durée ne peut actuellement pas être étendue au moment de l'exécution. S'il faut pour traiter un message plus de temps que la durée de verrouillage définie sur une file d'attente ou un abonnement, son verrou de visibilité expirera et le message sera à nouveau disponible pour les consommateurs de la file d'attente ou de l'abonnement. Si vous essayez de terminer ou d'abandonner un tel message, vous pouvez obtenir une erreur MessageLockLostException qui indique qu'aucun verrou valide n'a été trouvé pour le message donné.

Afin d'implémenter une boucle de réception de message robuste, il est recommandé de créer une résilience contre toutes les erreurs transitoires connues ainsi que les anomalies pouvant se manifester pendant ou après le traitement des messages. Ceci est particulièrement important lors de la réception des messages avec le mode PeekLock. Comme il y a toujours une deuxième étape impliquée dans le mode PeekLock, vous ne devriez jamais supposer qu'un message traité avec succès sur le client puisse être marqué de façon fiable comme étant terminé dans le serveur principal Service Bus. Par exemple, un incident dans la couche réseau sous-jacente peut vous empêcher de terminer correctement le traitement des messages. Une telle implication nécessite la gestion de cas extrêmes d'idempotence, dans la mesure où vous pouvez recevoir le même message plus d'une fois. Ce comportement est conforme à de nombreuses autres solutions de messagerie qui opèrent selon le mode de livraison de messages « Au moins une fois ».

Vous pouvez ajouter une résilience supplémentaire quand vous appelez les méthodes Complete et Abandon en utilisant les méthodes d'extension. Par exemple :

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
       // unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
       //  or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

Une approche similaire peut être étendue pour protéger d'autres opérations de messagerie telles que Defer d'un échec potentiel. Le schéma dans lequel les méthodes d'extension ci-dessus peuvent être utilisées est reflété dans l'extrait de code ci-dessous. Ce fragment de code montre comment implémenter une boucle de réception tout en profitant de la résilience supplémentaire fournie par les méthodes d'extension :

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, 
                        // the current message needs to be unlocked and return back to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
   // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
       // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously 
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

L'exemple précédent implémente une approche avancée pour la réception des messages de manière asynchrone dans l'ordre dans lequel ils apparaissent dans une file d'attente. Elle garantit que les erreurs rencontrées pendant le traitement se traduiront par l'annulation du message et son renvoi dans la file d'attente de manière à ce qu'il puisse être retraité. Le code supplémentaire est justifié par la prise en charge de l'annulation correcte de la boucle de réception des messages.

Cela vous a-t-il été utile ?
(1500 caractères restants)
Merci pour vos suggestions.
Afficher:
© 2014 Microsoft