Esporta (0) Stampa
Espandi tutto

Invio di messaggi in modo asincrono

Aggiornamento: marzo 2014

Per sfruttare le funzionalità dalle prestazioni avanzate in Service Bus di Microsoft Azure come ad esempio l'invio in batch lato client, è consigliabile considerare sempre la possibilità di utilizzare il modello di programmazione asincrona durante l'implementazione della soluzione di messaggistica utilizzando l'API di messaggistica negoziata gestita .NET. Il modello di messaggistica asincrona consentirà di compilare soluzioni che permettono in genere di evitare l'overhead di operazioni di I/O come l'invio e la ricezione di messaggi.

Quando si richiama un metodo API in modo asincrono, il controllo viene restituito immediatamente al codice e l'applicazione continua l'esecuzione mentre l'operazione asincrona viene eseguita in modo indipendente. L'applicazione in uso monitora l'operazione asincrona o riceve la notifica mediante un callback quando l'operazione è stata completata. A questo punto, l'applicazione può ottenere ed elaborare i risultati.

È importante notare che quando si richiamano operazioni sincrone, ad esempio i metodi Send o Receive nella classe QueueClient (o altri metodi sincroni forniti dall'API di messaggistica negoziata di Service Bus), internamente il codice API passa attraverso le versioni asincrone dei rispettivi metodi, sebbene con un blocco. Tuttavia, l'utilizzo di versioni sincrone di questi metodi potrebbe non fornire la gamma completa di vantaggi correlati alle prestazioni che ci si aspetta durante la chiamata diretta delle versioni asincrone. Questo è particolarmente evidente quando si inviano o si ricevono più messaggi e si desidera eseguire un'altra elaborazione mentre vengono eseguite le rispettive operazioni di messaggistica.

noteNota
Un oggetto BrokeredMessage rappresenta un messaggio e viene fornito per lo scopo di trasmissione dei dati nella rete. Non appena un oggetto BrokeredMessage viene inviato a una coda o un argomento, viene utilizzato dallo stack di messaggistica sottostante e non può essere riutilizzato per ulteriori operazioni. CIò è dovuto al fatto che una volta letto il corpo del messaggio, il flusso che prevede i dati dei messaggi non può essere riavvolto. È consigliabile conservare i dati di origine utilizzati per creare un'istanza di BrokeredMessage finché non è possibile asserire in modo affidabile l'esito positivo dell'operazione di messaggistica. Se è necessario ripetere un'operazione di messaggistica non riuscita, è consigliare creare una nuova istanza di BrokeredMessage utilizzando tali dati di origine.

Nel frammento di codice seguente viene illustrato come inviare più messaggi in modo asincrono (e anche affidabile) mantenendo al tempo stesso l'ordine di invio dei messaggi:

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                if (state != null & state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);
        }
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

Quando possibile, evitare la parallelizzazione delle operazioni di messaggistica utilizzando gli algoritmi predefiniti di partizionamento dei processi di lavoro e di pianificazione forniti dalla Task Parallel Library (TPL) e da Parallel LINQ (PLINQ). Le funzionalità di base del framework TPL sono più appropriate per l'aggiunta di parallelismo e concorrenza alle applicazioni, principalmente dal punto di vista delle operazioni associate al calcolo. L'utilizzo di TPL "così com'è" per migliorare le prestazioni del codice di I/O come ad esempio le chiamate di rete e le operazioni di messaggistica potrebbe non produrre i miglioramenti previsti. Il modo migliore per sfruttare la TPL per supportare operazioni asincrone è mediante l'utilizzo di modelli TPL avanzati conformi al modello di programmazione asincrona..

Mostra:
© 2014 Microsoft