(0) exportieren Drucken
Alle erweitern

Asynchrones Empfangen von Nachrichten

Letzte Aktualisierung: März 2014

Ähnlich wie beim asynchronen Senden von Nachrichten (und auch unter praktischen Gesichtspunkten) können Sie auch die Verwendung des asynchronen Programmiermodells für das Empfangen von Nachrichten von Microsoft Azure Service Bus erweitern.

Während des Wartens auf neue Nachrichten für eine Servicebus-Warteschlange oder ein -Abonnement gibt Ihre Lösung häufig eine Abrufanforderung aus. Glücklicherweise stellt Servicebus einen Empfangsvorgang mit langer Abrufdauer zur Verfügung, der eine Verbindung mit dem Server aufrecht erhält, bis eine Nachricht für eine Warteschlange eingeht bzw. der angegebene Timeoutzeitraum verstrichen ist – je nachdem, welches Ereignis zuerst eintritt. Wenn ein Empfangsvorgang mit langer Abrufdauer synchron ausgeführt wird, blockiert er den CLR-Threadpoolthread während des Wartens auf eine neue Nachricht. Dies wird nicht als optimal angesehen. Die Kapazität des CLR-Threadpools ist im Allgemeinen eingeschränkt. Es gibt also gute Gründe dafür, die Verwendung des Threadpools für Vorgänge mit besonders langer Ausführungszeit zu vermeiden.

Wenn Sie eine wirklich effektive Messaginglösung mithilfe der Brokermessaging-API von Servicebus erstellen möchten, sollten Sie den Empfangsvorgang immer asynchron ausführen. Unabhängig davon, ob Ihre Lösung Nachrichten nacheinander empfängt oder mehrere Nachrichten abruft, beginnen Sie den Empfangsvorgang mithilfe der Methode BeginReceive mit dem angegebenen Timeout. In der aktuellen API beträgt der maximale Empfangstimeoutwert 24 Tage. Während der Servicebus-Messagingclient in Ihrem Auftrag auf eine neue Nachricht wartet, kann Ihre Lösung beliebige andere Arbeiten ausführen. Nach dem Abschluss wird Ihre Rückrufmethode benachrichtigt, und die empfangene Nachricht (wenn vorhanden) steht für die Verarbeitung zur Verfügung.

noteHinweis
Nachdem eine Nachricht von einer Warteschlange oder einem Abonnement empfangen wurde, kann ihr Text nur ein Mal gelesen werden. Aufgrund der Natur von Netzwerkprotokollen sind Nachrichtendatenströme nicht immer "zurückspulbar", weil sie häufig keinen Suchvorgang unterstützen. Sie sollten die Nachrichtendaten sichern, indem Sie sie nach dem Aufruf der Methode GetBody() in einem Objekt speichern und dieses Objekt dann für den benötigten Zeitraum beibehalten. Der mehrmalige Versuch, die Methode GetBody() aufzurufen, wird von der Brokermessaging-API nicht unterstützt.

Das Codebeispiel unten zeigt eine Programmiermethode, die die angegebene Anzahl von Nachrichten asynchron von einer Servicebus-Warteschlange empfängt:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

Abgesehen von der Empfehlung im vorherigen Abschnitt besteht das beste Verfahren im Verwenden der Integration des asynchronen Programmiermodells von der Task Parallel Library, um den Vorgang des asynchronen Empfangs von Nachrichten zu parallelisieren.

Anzeigen:
© 2014 Microsoft