Esporta (0) Stampa
Espandi tutto

Ricezione di messaggi in modo asincrono

Aggiornamento: marzo 2014

Simile all'invio di messaggi in modo asincrono e anche da un punto di vista pratico, è inoltre possibile estendere l'utilizzo del modello di programmazione asincrona per la ricezione di messaggi da Service Bus di Microsoft Azure.

Durante l'attesa di nuovi messaggi in una sottoscrizione o in una coda di Service Bus, la soluzione in uso spesso genera una richiesta di polling. Fortunatamente, Service Bus offre un'operazione di ricezione con polling prolungato che mantiene una connessione al server finché un messaggio non raggiungerà una coda o non sarà trascorso un periodo di timeout specificato, a seconda di quale delle due condizioni si verifica prima. Se viene eseguita una ricezione con polling prolungato in modo sincrono, bloccherà il thread del pool CLR in attesa di un nuovo messaggio, pertanto questo metodo non è ottimale. La capacità del pool di thread CLR è generalmente limitata; pertanto esiste un buon motivo per evitare di utilizzare il pool di thread per operazioni con esecuzione particolarmente prolungata.

Per compilare una soluzione di messaggistica veramente efficiente utilizzando l'API di messaggistica negoziata di Service Bus, è necessario eseguire sempre l'operazione di ricezione in modo asincrono. Sia che la soluzione riceva un messaggio alla volta o recuperi più messaggi, si inizia l'operazione di ricezione utilizzando il metodo BeginReceive con il timeout specificato. Nell'API corrente, il valore di timeout di ricezione massimo è 24 giorni. Mentre il client di messaggistica di Service Bus è in attesa di un nuovo messaggio per conto dell'utente, la soluzione in uso può procedere con l'esecuzione di qualsiasi altro processo di lavoro. Al completamento, verrà notificato il metodo di callback e l'eventuale messaggio ricevuto sarà disponibile per l'elaborazione.

noteNota
Una volta ricevuto un messaggio da una coda o una sottoscrizione, il relativo corpo può essere letto una sola volta. A causa della natura dei protocolli di rete, i flussi di dati dei messaggi non sono sempre "riavvolgibili" perché spesso non supportano un'operazione di ricerca. È consigliabile proteggere i dati dei messaggi inserendoli in un oggetto dopo la chiamata del metodo GetBody(), quindi conservare l'oggetto per tutto il tempo necessario. Il tentativo di richiamare il metodo GetBody() più di una volta non è supportato dall'API di messaggistica negoziata.

Nell'esempio di codice riportato di seguito viene mostrato un esempio di un metodo di programmazione che riceve in modo asincrono il numero specificato di messaggi da una coda di Service Bus:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait semaphore object to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. 
                    // This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. 
                            //We can only consume the body once. 
                            // Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. 
                                // This will cause Service Bus to unlock the message and make it available 
                                // to be received again, either by the same consumer or by another completing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Count the number of messages received so far and signal a completion.
                        if (queueMessages.Count == count)
                        {
                            // Otherwise, signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

In linea con l'indicazione riportata nella sezione precedente, è meglio utilizzare l'integrazione del modello di programmazione asincrona fornita dalla Task Parallel Library per la parallelizzazione dell'operazione di ricezione asincrona dei messaggi.

Mostra:
© 2014 Microsoft