VENDITE: 1-800-867-1389

Implementazione dei cicli di ricezione dei messaggi affidabili

Aggiornamento: marzo 2014

L'osservazione di diversi clienti, che hanno adottato l'API di messaggistica negoziata, ha consentito di rilevare come la ricezione di un messaggio sia spesso soggetta a un'implementazione canonica ripetuta della logica di ricezione, senza un approccio valido per la gestione delle potenziali anomalie. In genere, tale logica non consente la gestione di casi limite, come i blocchi dei messaggi scaduti. Questo tipo di approccio può essere soggetto a errori se non viene implementato con funzioni affidabili. Lo scopo di questa sezione è fornire alcuni consigli specifici sull'implementazione di una logica affidabile per la ricezione dei messaggi.

È innanzitutto importante osservare le due diverse modalità in cui è possibile ricevere messaggi da Service Bus. Queste modalità sono fornite dall'API di messaggistica negoziata per il supporto del recapito dei messaggi mediante le semantica "At Most Once" (con ReceiveAndDelete) o "At Least Once" ( PeekLock).

La prima modalità è ReceiveAndDelete, la quale è il modello più semplice e funziona in modo migliore per gli scenari in cui l'applicazione è in grado di tollerare un errore durante l'elaborazione dei messaggi. Quando si utilizza la modalità ReceiveAndDelete, l'azione di ricezione è un'operazione tra due entità (single-hop), durante la quale un messaggio recapitato al client viene contrassegnato come utilizzato e viene successivamente rimosso dalla rispettiva coda o sottoscrizione.

La seconda modalità è PeekLock, la quale prevede che un messaggio ricevuto debba rimanere nascosto agli altri utenti fino a quando il timeout del blocco non scade. Con la modalità PeekLock il processo di ricezione inizia con un'operazione in due fasi che rende possibile il supporto delle applicazioni che non sono in grado di tollerare messaggi non riusciti. Oltre all'emissione di una richiesta di ricezione di un nuovo messaggio (prima fase), l'applicazione che lo utilizza deve richiedere di comunicare quando l'elaborazione del messaggio (seconda fase) è stata completata. Quando l'applicazione ha completato l'elaborazione del messaggio oppure quando lo ha archiviato (rinviato) in modo affidabile per l'elaborazione futura, completa la seconda fase del processo di ricezione mediante la chiamata del metodo Complete sul messaggio ricevuto.

Quando si specifica la modalità PeekLock, è sempre necessario completare la corretta elaborazione di un messaggio mediante la chiamata del metodo Complete, il quale indica a Service Bus di contrassegnare l'elaborazione del messaggio come completata. Se la chiamata del metodo Complete su un messaggio ricevuto in modalità PeekLock non riesce, dopo la scadenza del timeout del relativo blocco, il messaggio verrà nuovamente visualizzato in una coda o in una sottoscrizione. Di conseguenza, si riceverà nuovamente il messaggio elaborato in precedenza e questo può determinare l'elaborazione di un messaggio duplicato.

Inoltre, in relazione alla modalità PeekLock, è necessario comunicare a Service Bus che un messaggio non può essere elaborato correttamente e che deve quindi essere restituito per essere recapitato in un secondo momento. Quando possibile, anziché attendere fino alla scadenza del blocco acquisito per il messaggio, la soluzione di messaggistica in uso deve consentire di gestire questa situazione mediante la chiamata del metodo Abandon. La chiamata del metodo Abandon deve idealmente essere effettuata da un blocco catch, appartenente al costrutto di gestione delle eccezioni try/catch, che serve il contesto di gestione della messaggistica.

È importante assicurarsi che l'elaborazione dei messaggi venga eseguita strettamente all'interno del periodo di blocco designato. Nel modello di messaggistica negoziata la durata massima del blocco del messaggio è di 5 minuti e tale valore non può essere attualmente esteso in fase di esecuzione. Se l'elaborazione di un messaggio richiede più tempo rispetto alla durata del blocco impostata in una coda o in una sottoscrizione, il relativo blocco di visibilità scadrà e il messaggio diventerà nuovamente disponibile per gli utenti della coda o della sottoscrizione. Se si tenta di completare o di uscire da tale messaggio, potrebbe essere restituito un errore MessageLockLostException indicante che per il messaggio in questione non è stato trovato alcun blocco valido.

Per poter implementare un ciclo di ricezione dei messaggi affidabile, si consiglia di creare la resilienza per gli errori temporanei noti e le anomalie che possono verificarsi durante o dopo l'elaborazione di un messaggio. Questo risulta importante in particolare quando si ricevono messaggi utilizzando la modalità PeekLock. Poiché la modalità PeekLock prevede sempre una seconda fase, si consiglia di non dar mai per scontato che un messaggio elaborato correttamente sul client possa essere con certezza contrassegnato come completato nel sistema back-end di Service Bus. Un errore nel livello di rete sottostante può, ad esempio, impedire il completamento corretto della relativa elaborazione. Tale implicazione richiede la gestione dei casi limite di idempotenza, in quanto è possibile ricevere più volte lo stesso messaggio. Questo comportamento è in linea con molte altre soluzioni di messaggistica che operano in modalità di recapito dei messaggi "At Least Once".

È inoltre possibile aggiungere ulteriore resilienza quando si chiamano i metodi Complete e Abandon mediante i metodi di estensione Ad esempio:

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, 
        // or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }

    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to
       // unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock.
        // We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost,
       //  or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

È possibile estendere un approccio simile alla protezione da potenziali errori di altre operazioni di messaggistica quali Defer. Il modello con cui è possibile utilizzare i metodi di estensione menzionati in precedenza è illustrato nel frammento di codice riportato di seguito. Questo frammento di codice mostra come implementare un ciclo di ricezione sfruttando la resilienza aggiuntiva fornita dai metodi di estensione:

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient
// exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. 
                // This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, 
                        // the current message needs to be unlocked and return back to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }
                    }
                }
            }

            // Invoke a custom callback method to indicate that we 
            // have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a
// non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to
   // terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until
       // we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously 
// while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

Nell'esempio precedente viene implementato un approccio avanzato alla ricezione asincrona dei messaggi, nell'ordine in cui sono visualizzati in una coda. Questo approccio assicura che eventuali errori riscontrati durante l'elaborazione determinino l'annullamento del messaggio e la relativa restituzione alla coda affinché possa essere rielaborato. Il codice aggiuntivo è giustificato dal supporto del normale annullamento del ciclo di ricezione dei messaggi.

Il documento è risultato utile?
(1500 caratteri rimanenti)
Grazie per i commenti inviati.
Mostra:
© 2014 Microsoft