Esporta (0) Stampa
Espandi tutto

Procedure consigliate per ottimizzare la scalabilità e la convenienza delle soluzioni di messaggistica basata su coda in Windows Azure

Aggiornamento: luglio 2014

Autore: Valery Mizonov

Revisori: Brad Calder, Sidney Higa, Christian Martinez, Steve Marx, Curt Peterson, Paolo Salvatori e Trace Young

In questo articolo vengono indicate informazioni normative e procedure consigliate per compilare soluzioni di messaggistica basata su coda scalabili, economiche ed estremamente efficienti nella piattaforma Windows Azure. Il gruppo di destinatari a cui è rivolto l'articolo include architetti e sviluppatori di soluzioni che progettano e implementano soluzioni basate sul cloud che utilizzano i servizi di archiviazione delle code della piattaforma Windows Azure.

In una tradizionale soluzione di messaggistica basata su coda viene impiegato il concetto di un percorso di archiviazione dei messaggi noto come coda di messaggi, un repository per i dati che verranno inviati o ricevuti da uno o più partecipanti, in genere tramite un meccanismo di comunicazione asincrona.

Lo scambio di dati basato su coda rappresenta la base di un'architettura di messaggistica affidabile ed estremamente scalabile in grado di supportare un'ampia gamma di scenari efficaci in un ambiente DCE (Distributed Computing Environment). Indipendentemente dal fatto che si tratti di messaggistica durevole o di recapito di lavori di volumi elevati, una tecnologia di accodamento di messaggi può garantire funzionalità eccellenti per soddisfare i diversi requisiti per una comunicazione asincrona in scala.

In questo articolo verranno esaminate le procedure con cui gli sviluppatori possono usufruire di particolari modelli di progettazione insieme alle funzionalità fornite dalla piattaforma Windows Azure per compilare soluzioni di messaggistica basata su coda ottimizzate ed economiche. Nell'articolo vengono esaminati in dettaglio gli approcci più comuni all'implementazione di interazioni basate su code in soluzioni Windows Azure e forniti consigli per migliorare le prestazioni, aumentare la scalabilità e ridurre le spese operative.

La discussione sottostante include procedure consigliate, suggerimenti e consigli, laddove appropriato. Nello scenario descritto in questo articolo viene illustrata un'implementazione tecnica basata su un progetto reale di un cliente.

Per un esempio concreto, generalizzeremo uno scenario realistico di un cliente nel modo indicato di seguito.

Un provider di soluzioni SaaS avvia un nuovo sistema di fatturazione implementato come applicazione Windows Azure in grado di soddisfare le esigenze aziendali per l'elaborazione delle transazioni del cliente in scala. Il presupposto principale alla base della soluzione risiede nella capacità di ripartire nel cloud carichi di lavoro che richiedono una notevole quantità di calcoli e sfruttare l'elasticità dell'infrastruttura Windows Azure per eseguire le attività particolarmente dispendiose a livello di calcolo.

L'elemento locale dell'architettura end-to-end consolida e recapita regolarmente volumi elevati di transazioni a un servizio ospitato di Windows Azure durante tutto il giorno. I volumi variano da alcune migliaia a centinaia di migliaia di transazioni a invio, raggiungendo milioni di transazioni al giorno. Si presuppone inoltre che la soluzione debba soddisfare un requisito basato su un contratto di servizio per una latenza massima di elaborazione garantita.

L'architettura della soluzione è fondata sul modello di progettazione MapReduce distribuito e comprende un livello cloud basato su ruolo di lavoro a più istanze che utilizza l'archiviazione delle code di Windows Azure per il recapito di lavoro. I batch di transazione vengono ricevuti dall'istanza del ruolo di lavoro Process Initiator, decomposti (rimossi dai batch) in elementi di lavoro di dimensioni inferiori e accodati in una raccolta di code di Windows Azure ai fini di distribuzione del carico.

L'elaborazione del carico di lavoro viene gestita da più istanze del ruolo di lavoro di elaborazione che recuperano gli elementi di lavoro dalle code per sottoporli a procedure di calcolo. Le istanze di elaborazione impiegano listener di coda multithread per implementare l'elaborazione parallela dei dati per ottenere prestazioni ottimali.

Gli elementi di lavoro elaborati vengono instradati in una coda dedicata da cui vengono quindi rimossi da parte dell'istanza del ruolo di lavoro Process Controller, aggregati e resi persistenti in un archivio dati per il data mining, la generazione di report e l'analisi.

L'architettura della soluzione può essere descritta come segue:

Il diagramma in alto rappresenta un'architettura tipica per la scalabilità orizzontale di carichi di lavoro di calcolo complessi o di grandi dimensioni. Il modello di scambio di messaggi basato su coda adottato in quest'architettura è tipico di molti altri servizi e applicazioni Windows Azure che necessitano di comunicare reciprocamente tramite code. In questo modo è possibile adottare un approccio canonico all'esame di specifici componenti fondamentali coinvolti in uno scambio di messaggi basato su coda.

Una tipica soluzione di messaggistica che prevede lo scambio di dati tra i relativi componenti distribuiti utilizzando code di messaggi include server di pubblicazione che depositano messaggi nelle code e uno o più sottoscrittori a cui sono destinati questi messaggi. Nella maggior parte dei casi, i sottoscrittori, talvolta definiti listener della coda, vengono implementati come processi a thread singoli o multipli, costantemente in esecuzione oppure avviati su richiesta in base a un modello di pianificazione.

A un livello superiore sono disponibili due meccanismi principali di recapito per consentire a un listener della coda di ricevere messaggi archiviati in una coda:

  • Polling (modello basato su pull): un listener monitora una coda controllando a intervalli regolari la presenza di nuovi messaggi. Quando la coda è vuota, il listener continua a eseguirne il polling, con backoff periodico passando a uno stato di sospensione.

  • Attivazione (modello basato su push): un listener effettua la sottoscrizione a un evento attivato (dal server di pubblicazione stesso o da un gestore di servizi di accodamento) ogni volta che viene recapitato un messaggio in una coda. Il listener a sua volta può avviare l'elaborazione dei messaggi senza dover pertanto eseguire il polling della coda per determinare la disponibilità di un nuovo lavoro.

È inoltre importante segnalare che esistono diverse varianti per entrambi i meccanismi. Il polling può ad esempio essere bloccante e non bloccante. Con il blocco viene messa in attesa una richiesta finché in una coda non verrà visualizzato un nuovo messaggio (o si verificherà il timeout), mentre una richiesta non bloccante viene immediatamente completata se non sono presenti elementi in una coda. Con un modello di attivazione, una notifica può essere inserita nei listener della coda per ogni nuovo messaggio, solo quando il primo messaggio raggiunge una coda vuota oppure quando la profondità della coda raggiunge un determinato livello.

noteNota
Le operazioni di rimozione dalla coda supportate dall'API del servizio di accodamento di Windows Azure sono non bloccanti. Ciò significa che il risultato di metodi API quali GetMessage o GetMessages viene immediatamente restituito se non è presente alcun messaggio in una coda. Al contrario, le code di Windows Azure Service Bus offrono operazioni di ricezione che bloccano il thread chiamante finché un messaggio non raggiungerà una coda o non sarà trascorso un periodo di timeout specificato.

L'approccio più comune all'implementazione di listener di code nelle soluzioni Windows Azure moderne può essere riepilogato nel modo seguente:

  1. Un listener viene implementato come componente di applicazione di cui viene creata un'istanza e che viene eseguito nell'ambito di un'istanza del ruolo di lavoro.

  2. Il ciclo di vita del componente di listener della coda viene spesso associato al runtime dell'istanza del ruolo di hosting.

  3. La logica di elaborazione principale è costituita da un ciclo in cui i messaggi vengono rimossi dalla coda e recapitati per l'elaborazione.

  4. Se non viene ricevuto alcun messaggio, il thread di ascolto passa a uno stato di sospensione, la cui durata si basa spesso su un algoritmo di backoff specifico di un'applicazione.

  5. Viene eseguito il ciclo di ricezione e viene effettuato il polling di una coda finché il listener non riceverà la notifica di uscire dal ciclo e si interromperà.

Nel diagramma di flusso indicato di seguito viene illustrata la logica di uso comune nell'implementazione di un listener della coda con un meccanismo di polling in applicazioni Windows Azure:

noteNota
Ai fini di questo articolo non verranno utilizzati modelli di progettazione più complessi, ad esempio quelli che richiedono l'impiego di una gestione centrale delle code (Broker).

L'utilizzo di un tradizionale listener della coda con un meccanismo di polling potrebbe non rappresentare la scelta ottimale in caso di utilizzo di code di Windows Azure, in quanto il modello di determinazione dei prezzi di Windows Azure misura le transazioni di archiviazione in termini di richieste di applicazione eseguite nella coda, indipendentemente dal fatto che sia vuota o meno. Nelle sezioni successive verranno affrontate alcune tecniche per ottimizzare le prestazioni e ridurre il costo le soluzioni di messaggistica basata su coda nella piattaforma Windows Azure.

In questa sezione è necessario esaminare come migliorare gli aspetti di progettazione necessari per conseguire prestazioni migliori, maggiore scalabilità e costi ridotti.

Probabilmente, per stabilire nel modo più semplice se un modello di implementazione possa essere considerato una "soluzione più efficiente", deve soddisfare i seguenti obiettivi:

  • Riduzione delle spese operative grazie alla rimozione di una quantità significativa di transazioni di archiviazione che non derivano alcun lavoro utilizzabile.

  • Eliminazione di latenza eccessiva imposta da un intervallo di polling durante il controllo della presenza di nuovi messaggi in una coda.

  • Scalabilità dinamica verticale in modo che la potenza di elaborazione venga aumentata o diminuita in base a volumi volatili di lavoro.

Il modello di implementazione deve inoltre soddisfare questi obiettivi senza introdurre un livello di complessità superiore ai vantaggi associati.

In fase di valutazione del costo totale di proprietà e dell'utile sugli investimenti per una soluzione distribuita nella piattaforma Windows Azure, il volume delle transazioni di archiviazione è una delle variabili principali nell'equazione del costo totale di proprietà. La riduzione del numero di transazioni rispetto alle code di Windows Azure determina anche la riduzione dei costi operativi, in quanto si riferisce alle soluzioni in esecuzione in Windows Azure.

Nel contesto di una soluzione di messaggistica basata su coda, il volume delle transazioni di archiviazione può essere ridotto mediante una combinazione dei metodi seguenti:

  1. In fase di inserimento di messaggi in una coda, raggruppare i messaggi correlati in un singolo batch di dimensioni superiori, comprimere e archiviare l'immagine compressa in un'Archiviazione BLOB e utilizzare la coda per mantenere un riferimento al BLOB contenente i dati effettivi.

  2. In fase di recupero di messaggi da una coda, raggruppare più messaggi in un'unica transazione di archiviazione. Il metodo GetMessages nell'API del servizio di accodamento consente di rimuovere dalla coda il numero di messaggi specificato in un'unica transazione (vedere la nota di seguito).

  3. In fase di verifica della presenza di elementi di lavoro in una coda, evitare intervalli di polling aggressivi e implementare un ritardo di backoff che aumenta il tempo che intercorre tra le richieste di polling se una coda rimane costantemente vuota.

  4. Riduzione del numero di listener della coda: quando si utilizza un modello basato su pull, utilizzare solo un listener della coda per ogni istanza del ruolo quando una coda è vuota. Per ridurre a zero il numero di listener della coda per ogni istanza del ruolo, utilizzare un meccanismo di notifica per creare un'istanza dei listener della coda quando la coda riceve elementi di lavoro.

  5. Se le code rimangono vuote per la maggior parte del tempo, applicare la scalabilità automatica verticale per ridurre il numero di istanze del ruolo e continuare a monitorare la metrica di sistema rilevante per determinare se e quando l'applicazione dovrà applicare la scalabilità verticale per aumentare il numero di istanze al fine di gestire carichi di lavoro in espansione.

La maggior parte dei consigli sopra indicati possono tradursi in un'implementazione piuttosto generica che gestisce i batch di messaggi e incapsula numerose operazioni sottostanti di gestione dei thread e delle code/Archiviazioni BLOB. Affronteremo in dettaglio l'argomento più avanti in questo articolo.

ImportantImportante
In caso di recupero di messaggi tramite il metodo GetMessages, le dimensioni massime dei batch supportate dall'API del servizio di accodamento in una singola operazione di rimozione dalla coda sono limitate a 32.

In generale, il costo delle transazioni delle code di Windows Azure aumenta in modo lineare proporzionalmente al numero dei client del servizio di accodamento, ad esempio quando viene applicata la scalabilità verticale per aumentare il numero di istanze del ruolo o dei thread di rimozione dalla coda. Per illustrare l'impatto potenziale dei costi di una progettazione di soluzione che non tiene conto dei consigli sopra indicati, verrà fornito un esempio supportato da numeri concreti.

Se l'architetto della soluzione non implementa ottimizzazioni rilevanti, l'architettura del sistema di fatturazione descritta in precedenza determinerà molto probabilmente spese operative eccessive una volta che la soluzione verrà distribuita e sarà in esecuzione nella piattaforma Windows Azure. In questa sezione vengono descritti i motivi alla base di potenziali spese eccessive.

Come accennato nella definizione dello scenario, i dati delle transazioni aziendali vengono recapitati a intervalli regolari. Presupponiamo tuttavia che la soluzione sia impegnata nell'elaborazione del carico di lavoro per appena il 25% del tempo di una giornata lavorativa standard di 8 ore. Ciò si traduce in 6 ore (8 ore * 75%) di "tempo di inattività" in cui non si verificherà alcuna transazione nel sistema. Inoltre, la soluzione non riceverà dati durante le 16 ore giornaliere non lavorative.

Durante il periodo di inattività complessivo di 22 ore, la soluzione tenterà comunque di rimuovere lavori dalla coda, in quanto non è in grado di determinare in modo esplicito il momento in cui verranno recapitati nuovi dati. Durante questo periodo, ogni singolo thread di rimozione dalla coda eseguirà fino a 79.200 transazioni (22 ore * 60 minuti * 60 transazioni/min) su una coda di input, presupponendo un intervallo di polling predefinito di 1 secondo.

Come accennato in precedenza, il modello di determinazione dei prezzi nella piattaforma Windows Azure si basa su singole "transazioni di archiviazione". Una transazione di archiviazione è una richiesta effettuata da un'applicazione utente per aggiungere, leggere, aggiornare o eliminare dati di archiviazione. Al momento della creazione di questo white paper, le transazioni di archiviazione sono fatturate a un costo di $ 0,01 per 10.000 transazioni (escluse eventuali offerte promozionali o accordi speciali).

ImportantImportante
In fase di calcolo del numero di transazioni della coda, tenere presente che l'inserimento di un messaggio in una coda verrà considerato come 1 transazione, mentre l'utilizzo di un messaggio consiste spesso in un processo a 2 fasi, costituito dal recupero seguito da una richiesta di rimozione del messaggio dalla coda. Di conseguenza, un'operazione di rimozione dalla coda completata coinvolgerà 2 transazioni di archiviazione. Tenere presente che, anche se una richiesta di rimozione dalla coda non determina il recupero di dati, verrà comunque conteggiata come una transazione fatturabile.

Le transazioni di archiviazione generate da un singolo thread di rimozione dalla coda nello scenario sopra indicato comporteranno un aumento in una fattura mensile di circa $ 2,38 (79.200/10.000 * $ 0,01 * 30 giorni). In confronto, 200 thread di rimozione dalla coda (o, in alternativa, 1 thread di rimozione in 200 istanze del ruolo di lavoro) determineranno un aumento mensile dei costi di $ 457,20, ovvero i costi accumulati nel momento in cui la soluzione non esegue alcun calcolo, ma si limita a verificare la disponibilità di elementi di lavoro nelle code. L'esempio precedente è astratto, in quanto difficilmente verrebbe implementato un servizio in questo modo. Ecco per quale motivo è importante eseguire le ottimizzazioni descritte di seguito.

Per ottimizzare le prestazioni delle soluzioni di messaggistica di Windows Azure basate su coda, un approccio consiste nell'utilizzare il livello di messaggistica di pubblicazione/sottoscrizione fornito con Windows Azure Service Bus, come descritto in questa sezione.

In questo approccio gli sviluppatori dovranno concentrarsi sulla creazione di una combinazione di notifiche basate su push in tempo reale e polling, consentendo ai listener di effettuare la sottoscrizione a un evento di notifica (trigger) generato in determinate condizioni per indicare che un nuovo carico di lavoro viene inserito in una coda. Questo approccio consente di ottimizzare il ciclo tradizionale di polling della coda con un livello di messaggistica di pubblicazione/sottoscrizione per il recapito di notifiche.

In un sistema distribuito complesso questo approccio necessiterebbe dell'impiego di un "bus di messaggi" o di un "middleware orientato a messaggi" per garantire che le notifiche possano essere inoltrate in modo affidabile a uno o più sottoscrittori a regime di controllo libero. Windows Azure Service Bus rappresenta una scelta naturale per soddisfare i requisiti di messaggistica tra servizi di applicazione distribuiti a regime di controllo libero in esecuzione in Windows Azure e in locale. È inoltre ideale per un'architettura di "bus di messaggi" che consentirà lo scambio di notifiche tra i processi coinvolti nelle comunicazioni basate su coda.

I processi coinvolti in uno scambio di messaggi basato su coda potrebbero utilizzare il modello seguente:

In particolare, e poiché si riferiscono all'interazione tra i server di pubblicazione e i sottoscrittori del servizio di accodamento, gli stessi principi applicabili alla comunicazione tra le istanze del ruolo di Windows Azure soddisferebbero la maggior parte dei requisiti per gli scambi di messaggi di notifiche basate su push. Abbiamo già analizzato queste nozioni fondamentali in Come semplificare e scalare le comunicazioni tra ruoli con Windows Azure Service Bus.

ImportantImportante
L'utilizzo di Windows Azure Service Bus è soggetto a un modello di determinazione dei prezzi che considera il volume di operazioni di messaggistica a fronte di un'entità di messaggistica Service Bus, ad esempio una coda o un argomento.

È pertanto importante eseguire un'analisi del rapporto tra costi e benefici per valutare i pro e i contro relativi all'introduzione di Service Bus in un'architettura specificata. In questo ambito è opportuno valutare se l'introduzione del livello di recapito delle notifiche basato su Service Bus determinerebbe in effetti una riduzione dei costi che giustifichi gli investimenti e ulteriori sforzi di sviluppo.

Per ulteriori informazioni sul modello di determinazione dei prezzi di Service Bus, fare riferimento alle relative sezioni in Domande frequenti sulla piattaforma Windows Azure.

Mentre l'impatto sulla latenza è relativamente semplice da risolvere con un livello di messaggistica di pubblicazione/sottoscrizione, è possibile conseguire un'ulteriore riduzione dei costi mediante la scalabilità dinamica (elastica), come descritto nella sezione successiva.

La piattaforma Windows Azure consente ai clienti di applicare la scalabilità verticale in modo più rapido e più semplice che mai. La capacità di adattarsi a carichi di lavoro volatili e traffico variabile è una delle proposte di valore principali della piattaforma cloud. "Scalabilità" quindi non è più un termine IT dispendioso, ma una funzionalità predefinita che può essere abilitata a livello di codice su richiesta in una soluzione cloud con un'architettura adeguata.

La scalabilità dinamica è la capacità tecnica di una specifica soluzione di adattarsi a carichi di lavoro dinamici aumentando e riducendo la capacità del carico di lavoro e la potenza di elaborazione in fase di esecuzione. La piattaforma Windows Azure supporta in modalità nativa la scalabilità dinamica tramite il provisioning di un'infrastruttura di calcolo distribuita in cui le ore di calcolo possono essere acquisite in base alle esigenze.

È importante distinguere tra i 2 tipi di scalabilità dinamica seguenti nella piattaforma Windows Azure:

  • Con scalabilità dell'istanza del ruolo si fa riferimento all'aggiunta e alla rimozione di ulteriori istanze Web o del ruolo di lavoro per gestire il carico di lavoro temporizzato. Ciò include spesso la modifica del numero di istanze nella configurazione del servizio. L'aumento del numero di istanze determinerà l'avvio da parte del runtime di Windows Azure di nuove istanze, mentre la riduzione del numero di istanze determinerà l'arresto delle istanze in esecuzione.

  • Con scalabilità del processo (thread) si fa riferimento alla gestione di una capacità sufficiente in termini di thread di elaborazione in una determinata istanza del ruolo attraverso l'aumento o la diminuzione del numero di thread a seconda del carico di lavoro corrente.

Per la scalabilità dinamica in una soluzione di messaggistica basata su coda è opportuno considerare i consigli generali indicati di seguito.

  1. Monitoraggio degli indicatori di prestazioni chiave, tra cui utilizzo della CPU, profondità della coda, tempi di risposta e latenza di elaborazione dei messaggi.

  2. Aumento o diminuzione dinamica del numero di istanze del ruolo per gestire i picchi stimabili o imprevedibili nel carico di lavoro.

  3. Espansione e riduzione a livello di codice del numero di thread di elaborazione per adattarsi a condizioni di caricamento variabili gestite da una determinata istanza del ruolo.

  4. Partizionamento ed elaborazione contemporanei di carichi di lavoro con granularità fine mediante la Task Parallel Library in .NET Framework 4.

  5. Gestione di una capacità possibile in soluzioni con carico di lavoro estremamente volatile in previsione di punte improvvise per poterle gestire senza l'overhead correlato alla configurazione di istanze aggiuntive.

L'API di gestione dei servizi consente a un servizio ospitato di Windows Azure di variare il numero delle istanze del ruolo modificando la configurazione di distribuzione in fase di esecuzione.

noteNota
Il numero massimo di istanze di calcolo piccole di Windows Azure (o il numero equivalente di altre istanze di calcolo ridimensionate in termini di numero dei core) in una sottoscrizione tipica è limitato a 20 per impostazione predefinita. Eventuali richieste di aumento della quota devono essere sottoposte al team del supporto di Windows Azure. Per ulteriori informazioni, vedere le domande frequenti della piattaforma Windows Azure

La scalabilità dinamica del numero di istanze del ruolo potrebbe non rappresentare sempre la scelta ideale per la gestione di picchi del carico. Una nuova istanza del ruolo può ad esempio richiedere alcuni secondi per lo spin-up e al momento non sono disponibili metriche del contratto di servizio rispetto alla durata di spin-up. Una soluzione potrebbe invece dover aumentare semplicemente il numero di thread di lavoro per gestire l'aumento del carico di lavoro volatile. In fase di elaborazione del carico di lavoro, la soluzione monitorerà la metrica di caricamento pertinente e determina se è necessario ridurre o aumentare in modo dinamico il numero di processi di lavoro.

ImportantImportante
Attualmente l'obiettivo di scalabilità per una singola coda di Windows Azure è "vincolato" a 500 transazioni al secondo. Se un'applicazione tenta di superare questo obiettivo, ad esempio attraverso l'esecuzione di operazioni della coda da più istanze del ruolo che eseguono centinaia di thread di rimozione, il servizio di archiviazione potrebbe restituire una risposta di server occupato. In questo caso, tramite l'applicazione deve essere implementato un meccanismo di ripetizione tentativi con algoritmo di ritardo esponenziale di backoff. Se tuttavia gli errori HTTP 503 si verificano con regolarità, è consigliabile utilizzare più code e implementare una strategia basata su partizionamento per eseguire la scalabilità tra più code.

Nella maggior parte dei casi, la scalabilità automatica dei processi di lavoro è responsabilità di una singola istanza del ruolo. Al contrario, la scalabilità dell'istanza del ruolo include spesso un elemento centrale dell'architettura della soluzione responsabile del monitoraggio della metrica delle prestazioni e dell'esecuzione di azioni di scalabilità appropriate. Nel diagramma seguente viene illustrato un componente del servizio denominato agente di scalabilità dinamica che raccoglie e analizza la metrica di caricamento per determinare se è necessario effettuare il provisioning di nuove istanze o il decomissioning delle istanze inattive.

È interessante notare che il servizio agente di scalabilità può essere distribuito come ruolo di lavoro in esecuzione in Windows Azure o come servizio in locale. Indipendentemente dalla topologia di distribuzione, il servizio sarà in grado di accedere alle code di Windows Azure.

Per implementare una funzionalità di scalabilità dinamica, considerare l'utilizzo del blocco di applicazioni per la scalabilità automatica di Microsoft Enterprise Library che consente un comportamento di scalabilità automatica nelle soluzioni in esecuzione in Windows Azure. Il blocco di applicazioni per la scalabilità automatica fornisce tutte le funzionalità necessarie per definire e monitorare la scalabilità automatica in un'applicazione Windows Azure.

Dopo aver affrontato l'impatto della latenza, i costi relativi alle transazioni di archiviazione e i requisiti di scalabilità dinamica, è giunto il momento di consolidare questi consigli in un'implementazione tecnica.

Nelle sezioni precedenti sono state esaminate le caratteristiche principali attribuite a un'architettura di messaggistica basata sulle code di archiviazione della coda di Windows Azure progettata in modo adeguato. Sono state osservate le tre aree fondamentali che consentono di ridurre la latenza, ottimizzare i costi di transazione delle archiviazioni e migliorare la risposta a carichi di lavoro dinamici.

In questa sezione verrà fornito un punto di partenza per assistere gli sviluppatori di Windows Azure nell'implementazione di alcuni modelli a cui si fa riferimento in questo white paper a livello di codice.

noteNota
Questa sezione è incentrata sulla compilazione di un listener della coda con scalabilità automatica in grado di supportare modelli basati su pull e push. Per le tecniche avanzate nella scalabilità dinamica a livello di istanza di ruolo, fare riferimento al blocco di applicazioni per la scalabilità automatica di Microsoft Enterprise Library.

Inoltre, per brevità, ci concentreremo solo su alcuni elementi funzionali principali ed eviteremo complessità indesiderate omettendo gran parte del codice dell'infrastruttura di supporto dagli esempi di codice indicati di seguito. Ai fini di chiarimento, è inoltre importante precisare che l'implementazione tecnica descritta di seguito non è l'unica soluzione a un determinato problema, ma rappresenta un punto di partenza da cui gli sviluppatori possono derivare le soluzioni più efficaci.

Da questo punto in poi il white paper analizzerà il codice sorgente necessario per implementare i modelli citati in precedenza.

In primo luogo, definiamo un contratto che verrà implementato da un componente listener della coda ospitato da un ruolo di lavoro e rimane in attesa in una coda di Windows Azure.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

L'evento QueueEmpty dovrà essere utilizzato da un host. Fornisce il meccanismo che consente all'host di controllare il comportamento del listener della coda se la coda è vuota. Il rispettivo delegato di evento viene definito nel modo seguente:

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

La gestione degli elementi della coda è più semplice se un listener è in grado di utilizzare generics anziché classi SDK "bare metal" come CloudQueueMessage. Definiamo pertanto una nuova interfaccia che verrà implementata da un listener della coda in grado di supportare l'accesso alle code basato su generics:

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

È stato inoltre abilitato il listener basato su generics per il push degli elementi della coda a uno o più sottoscrittori attraverso l'implementazione del modello di progettazione Observer, utilizzando l'interfaccia IObservable<T> disponibile in .NET Framework 4.

Manterremo un'unica istanza di un componente implementando l'interfaccia ICloudQueueListenerExtension<T>. È tuttavia necessario riuscire a eseguire più thread di rimozione dalla coda (processi di lavoro o attività per semplificare). Pertanto, è stato aggiunto il supporto alla logica di rimozione dalla coda multithread nel componente del listener della coda. È a questo punto che possiamo usufruire dei benefici della Task Parallel Library (TPL). Il metodo StartListener sarà responsabile dello spin-up del numero specificato di thread di rimozione dalla coda nel modo indicato di seguito.


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

Il metodo DequeueTaskMain implementa il corpo funzionale di un thread di rimozione dalla coda. Di seguito sono indicate le operazioni principali.

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

È opportuno segnalare alcuni punti in merito all'implementazione del metodo DequeueTaskMain.

In primo luogo, possiamo usufruire dei benefici di Parallel LINQ (PLINQ) in fase di recapito di messaggi per l'elaborazione. In questa fase il vantaggio principale di PLINQ consiste nella possibilità di velocizzare la gestione dei messaggi eseguendo il delegato di query su thread di lavoro distinti in più processori in parallelo, laddove possibile.

noteNota
Dal momento che la parallelizzazione delle query viene gestita da PLINQ a livello interno, non sussiste alcuna garanzia in merito al fatto che PLINQ utilizzerà più di un solo core per la parallelizzazione del lavoro. PLINQ può eseguire una query in sequenza se determina che l'overhead di parallelizzazione rallenterà la query. Per sfruttare al meglio PLINQ, il lavoro totale nella query dovrà essere sufficientemente esteso da poter trarre vantaggio dall'overhead della pianificazione del lavoro nel pool di thread.

In secondo luogo, non verrà recuperato un solo messaggio alla volta, ma verrà richiesto all'API del servizio di accodamento di recuperare un numero specifico di messaggi da una coda. Questa richiesta viene gestita dal parametro DequeueBatchSize passato al metodo Get<T>. Una volta immesso il livello di astrazione dell'archiviazione implementato nell'ambito della soluzione generale, il parametro verrà trasmesso al metodo dell'API del servizio di accodamento. Verrà inoltre eseguito un controllo di sicurezza per verificare che le dimensioni batch non superino il limite massimo supportato dalle API. L'implementazione viene effettuata nel modo indicato di seguito.

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

Infine, l'attività di rimozione dalla coda non verrà eseguita indefinitamente. È stato eseguito il provisioning di un checkpoint esplicito implementato come evento QueueEmpty generato ogni volta che una coda è vuota. A questo punto, è stato consultato un gestore eventi QueueEmpty per determinare se sia consentito terminare l'attività di rimozione dalla coda in corso. Un'implementazione del gestore eventi QueueEmpty progettata in modo adeguato consente il supporto della funzionalità di scalabilità automatica verticale per ridurre il numero di attività di rimozione dalla coda, come illustrato nella sezione seguente.

Il gestore eventi QueueEmpty presenta due finalità. In primo luogo, è responsabile dell'invio di commenti e suggerimenti all'attività di rimozione dalla coda di origine, con l'istruzione di passare a uno stato di sospensione per un determinato intervallo di tempo (come definito nel parametro di output delay nel delegato di evento). In seconda istanza, indica all'attività di rimozione dalla coda se debba arrestarsi normalmente (come indicato dal parametro Boolean restituito).

L'implementazione seguente del gestore eventi QueueEmpty risolve le due problematiche evidenziate in precedenza in questo white paper. Calcola un intervallo di backoff esponenziale casuale e indica all'attività di rimozione dalla coda di aumentare in modo esponenziale il ritardo tra le richieste di polling della coda. Il ritardo di backoff non supererà 1 secondo come configurato nella soluzione, poiché non è necessario disporre di un ritardo prolungato tra le richieste di polling se la riduzione automatica viene implementata in modo adeguato. Esegue inoltre una query sullo stato del listener della coda per determinare il numero di attività di rimozione dalla coda attive. Nel caso in cui il numero sia superiore a 1, il gestore eventi indicherà all'attività di rimozione dalla coda di origine di completare il ciclo di polling, a condizione che anche l'intervallo di backoff abbia raggiunto il limite massimo specificato. In caso contrario, l'attività di rimozione dalla coda non verrà interrotta e manterrà un solo thread di polling in esecuzione alla volta per ogni singola istanza del listener della coda. Questo approccio consente di ridurre il numero di transazioni di archiviazione e quindi diminuire i costi di transazione come illustrato in precedenza.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

A un livello superiore, la funzionalità di "scalabilità verticale per la riduzione delle attività di rimozione dalla coda" può essere illustrata nel modo indicato di seguito.

  1. Ogni volta che nella coda è presente un elemento, le attività di rimozione dalla coda garantiranno che il carico di lavoro venga elaborato il prima possibile. Non si verificherà alcun ritardo tra le richieste per rimuovere messaggi da una coda.

  2. Nel momento in cui la coda di origine è vuota, ogni attività di rimozione dalla coda genererà un evento QueueEmpty.

  3. Il gestore eventi QueueEmpty calcolerà un ritardo di backoff esponenziale casuale e indicherà all'attività di rimozione dalla coda di sospendere l'attività per un determinato intervallo di tempo.

  4. Le attività di rimozione dalla coda continueranno a eseguire il polling della coda di origine a intervalli calcolati finché la durata di inattività non supererà il limite massimo consentito.

  5. Al raggiungimento dell'intervallo di inattività massimo e a condizione che la coda di origine sia ancora vuota, tutte le attività di rimozione dalla coda attive si arresteranno normalmente: questa operazione non avverrà contemporaneamente per tutti gli elementi, dal momento che il backoff delle attività di rimozione dalla coda si verifica in punti diversi all'interno dell'algoritmo di backoff.

  6. In un determinato momento sarà presente una sola attività di rimozione dalla coda attiva in attesa di lavoro. Di conseguenza non si verificherà alcuna transazione di polling attiva rispetto a una coda, se non dalla singola attività specifica.

Per approfondire il discorso sul processo di raccolta delle caratteristiche di caricamento temporizzate, è opportuno citare gli elementi di codice sorgente correlati. Innanzitutto, esiste una struttura che include la metrica rilevante per misurare il risultato del caricamento applicato alla soluzione. Per semplificare è stato incluso un subset ridotto di metriche che verrà utilizzato più avanti nel codice di esempio.

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

In secondo luogo, è presente un metodo implementato da un listener della coda che restituisce la metrica di caricamento nel modo indicato nell'esempio seguente:

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

Nella sezione precedente è stata introdotta la possibilità di diminuire il numero di attività di rimozione dalla coda attive in una singola istanza per ridurre l'impatto delle transazioni inattive sui costi delle operazioni di archiviazione. In questa sezione verrà illustrato in dettaglio un esempio inverso in cui verrà implementata la funzionalità di "scalabilità automatica verticale" per ripristinare i valori originali della potenza di elaborazione laddove necessario.

In primo luogo verrà definito un delegato di evento che consentirà di tenere traccia delle transizioni di stato da una coda vuota a una coda non vuota al fine di attivare le azioni rilevanti:

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

Verrà quindi estesa la definizione originale dell'interfaccia ICloudQueueServiceWorkerRoleExtension per includere un nuovo evento che verrà generato ogni volta che un listener della coda rileverà nuovi elementi di lavoro, essenzialmente quando la profondità della coda passa da zero a un valore positivo qualsiasi:

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

Determinare inoltre il punto esatto nel codice del listener della coda in cui verrà generato tale evento. L'evento QueueWorkDetected verrà generato nel ciclo di rimozione dalla coda implementato nel metodo DequeueTaskMain che dovrà essere esteso nel modo seguente:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

Nell'ultimo passaggio verrà fornito un gestore per l'evento QueueWorkDetected. L'implementazione di questo gestore eventi verrà fornito da un componente che crea un'istanza e ospita il listener della coda. In questo caso si tratta di un ruolo di lavoro. Il codice responsabile della creazione dell'istanza e dell'implementazione di un gestore eventi è costituito dagli elementi indicati di seguito:

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

Alla luce dell'esempio precedente è opportuno osservare in dettaglio il metodo GetOptimalDequeueTaskCount. Questo metodo consente di calcolare il numero di attività di rimozione dalla coda ritenute ottimali per la gestione del carico di lavoro in una coda. Se richiamato, questo metodo deve determinare (tramite qualsiasi meccanismo decisionale appropriato) il numero di "cavalli vapore" necessari al listener della coda per poter elaborare il volume di lavoro in attesa o previsto in una determinata coda.

Lo sviluppatore potrebbe ad esempio adottare un approccio semplicistico e includere un set di regole statiche direttamente nel metodo GetOptimalDequeueTaskCount. Utilizzando le caratteristiche note di scalabilità e velocità effettiva dell'infrastruttura di accodamento, la latenza media di elaborazione, le dimensioni del payload e altri input rilevanti, il set di regole potrebbe optare per un numero ottimale di attività di rimozione dalla coda

Nell'esempio seguente verrà utilizzata una tecnica volutamente semplificata per determinare il numero di attività di rimozione dalla coda:

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

È opportuno ripetere che il codice di esempio precedente non rappresenta un approccio valido in qualsiasi situazione. Una soluzione più idonea consiste nel richiamare una regola configurabile e gestibile a livello esterno che esegue i calcoli necessari.

A questo punto si disporrà di un prototipo funzionante di un listener della coda in grado di eseguire la scalabilità automatica verticale in base a carichi di lavoro dinamici. Come tocco finale potrebbe essere necessario aggiungere la capacità di adattamento a carichi variabili in fase di elaborazione. Questa funzionalità può essere aggiunta applicando lo stesso modello seguito durante l'aggiunta di supporto all'evento QueueWorkDetected.

Esaminiamo ora un'altra importante ottimizzazione che consentirà di ridurre la latenza nei listener della coda.

In questa sezione verrà ottimizzata l'implementazione precedente di un listener della coda con un meccanismo di notifica basato su push compilato in base alla funzionalità multicast unidirezionale di Service Bus. Il meccanismo di notifica è responsabile dell'attivazione di un evento che indica al listener della coda di eseguire il lavoro di rimozione dalla coda. Questo approccio consente di evitare l'esecuzione del polling della coda per controllare la presenza di nuovi messaggi e quindi eliminare la latenza associata.

In primo luogo verrà definito un evento di attivazione che verrà ricevuto dal listener della coda nel caso in cui venga inserito in una coda un nuovo carico di lavoro:

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

Successivamente si consentirà alle implementazioni del listener della cosa di fungere da sottoscrittori per la ricezione di un evento di attivazione. Il primo passaggio consiste nel definire un listener della coda come observer per l'evento CloudQueueWorkDetectedTriggerEvent:

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

Il secondo passaggio consiste nell'implementare il metodo OnNext definito nell'interfaccia IObserver<T>. Questo metodo viene chiamato dal provider per comunicare all'observer la presenza di un nuovo evento:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

Come indicato nell'esempio di cui sopra, è stato espressamente richiamato lo stesso delegato di evento utilizzato nei passaggi precedenti. Il gestore eventi QueueWorkDetected fornisce già la logica di applicazione necessaria per la creazione di un'istanza del numero ottimale di attività di rimozione dalla coda. Lo stesso gestore eventi verrà pertanto riutilizzato in fase di gestione della notifica CloudQueueWorkDetectedTriggerEvent.

Come indicato nelle sezioni precedenti, non è necessario gestire un'attività di rimozione dalla coda costantemente in esecuzione quando viene impiegata una notifica basata su push. È pertanto possibile ridurre a zero il numero di attività della coda per ogni istanza di listener della coda e utilizzare un meccanismo di notifica per creare un'istanza delle attività di rimozione dalla coda quando la coda riceve elementi di lavoro. Per accertarsi che non siano in esecuzione attività di rimozione dalla coda attive, è necessario applicare la seguente modifica diretta nel gestore eventi QueueEmpty:

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

In breve, non verrà più rilevata la presenza o meno di una singola attività di rimozione dalla coda attiva. Il risultato del gestore eventi di QueueEmpty modificato tiene esclusivamente conto del superamento dell'intervallo massimo di inattività in base al quale tutte le attività di rimozione dalla coda attive verranno arrestate.

Per ricevere le notifiche CloudQueueWorkDetectedTriggerEvent, utilizzare il modello di pubblicazione/sottoscrizione implementato come messaggistica a regime di controllo libero tra istanze di ruolo di Windows Azure. In pratica verrà utilizzato lo stesso livello di comunicazione tra ruoli e gli eventi in ingresso verranno gestiti come segue:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

Il multicasting di un evento di attivazione definito nella classe CloudQueueWorkDetectedTriggerEvent ricade su un server di pubblicazione, ovvero il componente che inserisce elementi di lavoro in una coda. Questo evento può essere attivato precedentemente all'accodamento del primo elemento di lavoro o successivamente all'inserimento dell'ultimo elemento in una coda. Nell'esempio seguente verrà pubblicato un evento di attivazione al termine dell'inserimento di elementi di lavoro nella coda di input:

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

Una volta compilato un listener della coda in grado di supportare il mutithreading, la scalabilità automatica e le notifiche basate su push, consolidare tutti i consigli relativi alla progettazione di soluzioni di messaggistica basate su coda nella piattaforma Windows Azure.

Per ottimizzare l'efficienza e la convenienza delle soluzioni di messaggistica basate su coda in esecuzione nella piattaforma Windows Azure, gli architetti e gli sviluppatori di soluzioni devono tenere presenti le indicazioni seguenti:

Gli architetti di soluzioni dovranno:

  • Eseguire il provisioning di un'architettura di messaggistica basata su coda che utilizza il servizio di archiviazione delle code di Windows Azure per la comunicazione asincrona su ampia scala tra i livelli e i servizi nelle soluzioni ibride o basate sul cloud.

  • Consigliare un'architettura di accodamento partizionata per una scalabilità superiore a 500 transazioni al secondo.

  • Comprendere le nozioni fondamentali sul modello di determinazione dei prezzi di Windows Azure e ottimizzare la soluzione per ridurre i costi di transazione tramite una serie di procedure consigliate e modelli di progettazione.

  • Considerare i requisiti di scalabilità dinamica effettuando il provisioning di un'architettura adattabile a carichi di lavoro volatili e dinamici.

  • Utilizzare le tecniche e gli approcci appropriati di scalabilità automatica per espandere e ridurre in modo elastico la potenza di calcolo al fine di ottimizzare ulteriormente le spese operative.

  • Valutare il rapporto tra costi/benefici alla base di una riduzione della latenza attraverso l'impiego di Windows Azure Service Bus per il recapito di notifiche basate su push in tempo reale.

Gli sviluppatori dovranno:

  • Progettare una soluzione di messaggistica che impieghi l'invio in batch durante l'archiviazione e il recupero di dati dalle code di Windows Azure.

  • Implementare un servizio di listener della coda efficace affinché venga eseguito il polling delle code da un massimo di un thread di rimozione dalla coda se vuote.

  • Applicare scalabilità dinamica verticale per ridurre il numero di istanze di ruoli di lavoro se le code rimangono vuote per un periodo prolungato.

  • Implementare un algoritmo di backoff esponenziale casuale specifico dell'applicazione per ridurre l'effetto del polling delle code inattive sui costi delle transazioni di archiviazione.

  • Adottare le tecniche appropriate che impediranno di superare gli obiettivi di scalabilità per una singola coda in fase di implementazione di consumer e server di pubblicazione di code a più istanze multithread.

  • Impiegare criteri di tentativi efficaci in grado di gestire varie condizioni temporanee quando si pubblicano e utilizzano i dati da code di Windows Azure.

  • Utilizzare la funzionalità di gestione degli eventi unidirezionale fornita da Windows Azure Service Bus per supportare le notifiche basate su push al fine di ridurre la latenza e migliorare le prestazioni della soluzione di messaggistica basata su coda.

  • Esplorare le nuove caratteristiche di .NET Framework 4, tra cui TPL, PLINQ e il modello Observer, per ottimizzare il grado di parallelismo, migliorare la concorrenza e semplificare la progettazione di servizi multithread.

Il codice di esempio è disponibile per il download da MSDN Code Gallery. Include inoltre tutti i componenti dell'infrastruttura necessari, tra cui il livello di astrazione basato su generics per il servizio di accodamento di Windows Azure, non disponibili nei frammenti di codice precedenti. Tutti i file di codice sorgente sono regolati da Microsoft Public License, come illustrato nelle note legali corrispondenti.

Per ulteriori informazioni sull'argomento descritto in questo white paper, fare riferimento ai collegamenti seguenti:

Microsoft sta conducendo un sondaggio in linea per comprendere l'opinione degli utenti in merito al sito Web di MSDN. Se si sceglie di partecipare, quando si lascia il sito Web di MSDN verrà visualizzato il sondaggio in linea.

Si desidera partecipare?
Mostra:
© 2014 Microsoft