(0) exportieren Drucken
Alle erweitern

Implementieren zuverlässiger Nachrichtenempfangsschleifen

Letzte Aktualisierung: März 2014

Anhand von Beobachtungen aus mehreren Kundenprojekten, die die Brokermessaging-API verwenden, ist uns aufgefallen, dass das Empfangen einer Nachricht häufig mit einer kanonisch wiederholten Implementierung der Empfangslogik verbunden ist, ohne dass ein fundierter Ansatz für die Behandlung potenzieller Anomalien vorliegt. Im Allgemeine sieht eine solche Logik keine Grenzfälle vor, z. B. abgelaufene Nachrichtensperren. Dieser Ansatztyp kann fehleranfällig sein, wenn er nicht auf robuste Weise implementiert wird. In diesem Abschnitt sollen einige spezifische Empfehlungen zur Implementierung zuverlässiger Nachrichtenempfangslogik ausgesprochen werden.

Es ist wichtig, sich der beiden verschiedenen Modi bewusst zu sein, in denen Nachrichten von Servicebus empfangen werden können. Diese Modi werden von der Brokermessaging-API bereitgestellt, um die Nachrichtenübermittlung mithilfe der Semantik "Höchstens ein Mal" (mit ReceiveAndDelete) oder "Mindestens ein Mal" (mit PeekLock) zu unterstützen.

Der erste Modus ist ReceiveAndDelete. Dies ist das einfachste Modell, das am besten in Szenarien funktioniert, in denen die Anwendung einen Fehler bei der Nachrichtenverarbeitung tolerieren kann. Wenn der Modus ReceiveAndDelete verwendet wird, besteht die Empfangsaktion aus einem Einzelhopvorgang, während dem eine an den Client übermittelte Nachricht als verbraucht markiert und anschließend aus der betreffenden Warteschlange oder dem Abonnement entfernt wird.

Der zweite Modus ist PeekLock. Dieser gibt vor, dass eine empfangene Nachricht vor anderen Consumern verborgen wird, bis ihr Sperrentimeout abläuft. Im Modus PeekLock wird der Empfangsvorgang zu einem zweistufigen Vorgang. Auf diese Weise können Anwendungen unterstützt werden, die fehlerhafte Nachrichten nicht tolerieren können. Die verbrauchende Anwendung gibt nicht nur eine Anforderung für den Empfang einer neuen Nachricht aus (erste Stufe), sondern sie muss auch anzeigen, wenn sie die Verarbeitung einer Nachricht abgeschlossen hat (zweite Stufe). Nachdem die Anwendung die Verarbeitung der Nachricht abgeschlossen oder die Nachricht zuverlässig für die zukünftige Verarbeitung gespeichert (zurückgestellt) hat, wird die zweite Stufe des Empfangsvorgangs durch Aufrufen der Methode Complete für die empfangene Nachricht abgeschlossen.

Wenn Sie den Modus PeekLock angeben, sollten Sie die erfolgreiche Verarbeitung einer Nachricht immer finalisieren, indem Sie die Methode Complete aufrufen, die Servicebus anweist, die Nachrichtenverarbeitung als abgeschlossen zu markieren. Wenn die Methode Complete für eine im Modus PeekLock empfangene Nachricht nicht aufgerufen wird, führt dies dazu, dass die Nachricht in einer Warteschlange oder einem Abonnement erneut auftaucht, nachdem ihr Sperrentimeout abgelaufen ist. Daher erhalten Sie die zuvor verarbeitete Nachricht erneut, und dies kann dazu führen, dass ein Nachrichtenduplikat verarbeitet wird.

Außerdem sollten Sie im Modus PeekLockServicebus informieren, wenn eine Nachricht nicht erfolgreich verarbeitet werden kann und daher für eine nachfolgende erneute Übermittlung zurückgesendet werden muss. Nach Möglichkeit sollte Ihre Messaginglösung in dieser Situation die Methode Abandon aufrufen, anstatt zu warten, bis eine für die Nachricht abgerufene Sperre abläuft. Idealerweise rufen Sie die Methode Abandon aus einem Catch-Block auf, der zu dem Try/Catch-Ausnahmeverarbeitungskonstrukt gehört, das für den Messagingverarbeitungskontext verwendet wird.

Sie müssen unbedingt sicherstellen, dass die Nachrichtenverarbeitung ausschließlich innerhalb des definierten Sperrenzeitraums stattfindet. Im Brokermessagingmuster beträgt die maximale Nachrichtensperrendauer 5 Minuten, und diese Dauer kann zurzeit zur Laufzeit nicht erweitert werden. Wenn die Verarbeitung einer Nachricht länger als die für eine Warteschlange oder ein Abonnement festgelegte Sperrendauer dauert, erfolgt ein Timeout der Sichrbarkeitssperre, und die Nachricht wird erneut für die Consumer der Warteschlange oder des Abonnements verfügbar. Wenn Sie versuchen, eine solche Nachricht abzuschließen oder zu verwerfen, erhalten Sie ggf. einen Fehler MessageLockLostException, der anzeigt, dass keine gültige Sperre für die angegebene Nachricht gefunden wurde.

Für die Implementierung einer robusten Nachrichtenempfangsschleife wird empfohlen, Vorkehrungen für alle bekannten vorübergehenden Fehler und alle Anomalien zu treffen, die während oder nach der Nachrichtenverarbeitung auftreten können. Dies ist insbesondere dann wichtig, wenn Sie Nachrichten im Modus PeekLock empfangen. Da im Modus PeekLock immer eine zweite Stufe beteiligt ist, sollten Sie niemals davon ausgehen, dass eine auf dem Client erfolgreich verarbeitete Nachricht im Servicebus-Back-End zuverlässig als abgeschlossen markiert werden kann. Ein Fehler in der zugrunde liegenden Netzwerkschicht kann z. B. den erfolgreichen Abschluss der Nachrichtenverarbeitung verhindern. Dies bedeutet, dass Sie für Idempotenzgrenzfälle Vorkehrungen treffen müssen, da Sie die gleiche Nachricht mehrmals empfangen können. Dieses Verhalten entspricht vielen anderen Messaginglösungen, die im Nachrichtenübermittlungsmodus "Mindestens ein Mal" betrieben werden.

Sie können eine weitere Sicherung hinzufügen, wenn Sie die Methoden Complete und Abandon mithilfe von Erweiterungsmethoden aufrufen. Beispiel:

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
       // unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
       //  or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

Ein ähnlicher Ansatz kann so erweitert werden, dass andere Messagingvorgänge (z. B. das Zurückstellen) vor potenziellen Fehlern geschützt werden. Das Muster, in dem die oben genannten Erweiterungsmethoden verwendet werden können, wird im Codeausschnitt unten gezeigt. Dieses Codefragment zeigt die Implementierung einer Empfangsschleife und nutzt die zusätzlichen Maßnahmen, die von den Erweiterungsmethoden bereitgestellt werden:

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, 
                        // the current message needs to be unlocked and return back to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
   // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
       // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously 
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

Das Beispiel oben implementiert einen erweiterten Ansatz für den asynchronen Empfang von Nachrichten in der Reihenfolge, in der sie in einer Warteschlange erscheinen. Es wird sichergestellt, dass alle Fehler, die während der Verarbeitung auftreten, dazu führen, dass die Nachricht abgebrochen und zurück in die Warteschlange gesendet wird, damit sie erneut verarbeitet werden kann. Der zusätzliche Code ist sinnvoll, weil er den ordnungsgemäßen Abbruch der Nachrichtenempfangsschleife unterstützt.

Anzeigen:
© 2014 Microsoft