Creazione di adattatori di input e di output

In questo argomento vengono fornite le informazioni generali necessarie per creare adattatori di input e di output per l'applicazione CEP (Complex Event Processing, elaborazione di eventi complessi) utilizzando la piattaforma StreamInsight. Gli adattatori sono convertitori software tramite cui vengono recapitati gli eventi a o da un server StreamInsight.

Informazioni sul controllo e sul flusso degli eventi

Quando si creano adattatori, è importante comprendere il flusso degli eventi attraverso il server StreamInsight e il modo in cui gli adattatori di input e di output controllano questo flusso. Come mostrato nell'illustrazione seguente, il flusso degli eventi dall'origine, tramite la query in esecuzione, e al sink è unidirezionale. Gli eventi vengono letti da un'origine dall'adattatore di input mediante il quale vengono recapitati alla query. Gli eventi di input, o i nuovi eventi risultanti dall'elaborazione degli eventi di input, vengono indirizzati da un operatore al successivo nella query. La query recapita gli eventi elaborati all'adattatore di output, che li recapita nel sink. Nella figura viene illustrato uno scenario in cui una query di StreamInsight è associata a due istanze dell'adattatore di input, a1 e a2, e a un'istanza dell'adattatore di output, a4.

Flusso di eventi dall'adattatore di input all'adattatore di output

Mentre il flusso degli eventi è unidirezionale dall'origine al sink, il controllo dell'esecuzione e del flusso per il recupero e il trasferimento degli eventi in corrispondenza di alcuni punti di interazione tra i componenti può essere bidirezionale. Questi punti di interazione sono indicati nella figura come READ, ENQUEUE, DEQUEUE e WRITE.

L'implementazione dell'adattatore di input deve eseguire l'operazione READ utilizzando meccanismi di accesso specifici del dispositivo di origine (ad esempio un file o un database) e deve eseguire l'operazione ENQUEUE utilizzando le API dell'adattatore. Analogamente, l'implementazione dell'adattatore di output deve eseguire l'operazione WRITE utilizzando meccanismi di accesso specifici del dispositivo sink e l'operazione DEQUEUE utilizzando le API dell'adattatore. È necessario implementare le operazioni ENQUEUE e DEQUEUE in base a un modello di progettazione specificato da un diagramma delle transizioni di stato dell'adattatore, descritto più avanti in questo argomento.

Dal punto di vista del controllo del flusso di eventi, è possibile immaginare che venga effettuato il push degli eventi da un provider a un consumer (indicato dalle frecce rivolte da sinistra a destra) o il pull da un provider da parte del consumer (indicato dalle frecce ricurve). In corrispondenza dei punti di interazione READ e WRITE, l'implementazione dell'adattatore può adottare l'approccio push o pull per il controllo del flusso di eventi. Alcuni dei fattori da considerare per questa interazione sono le frequenze degli eventi supportate dall'origine o dal sink, la capacità dell'adattatore di limitare l'origine o il sink ed eventuali funzionalità di buffering che è possibile implementare.

Per dispositivi di origine che eseguono operazioni di pump degli eventi con una latenza molto bassa e che sono difficili da limitare, un approccio tipico consiste nell'implementare un adattatore nel punto in cui il dispositivo di origine effettua il push degli eventi nell'adattatore. Esempi di tali dispositivi sono costituiti da sensori (eventi guidati dal computer), ticker plant e porte di rete. Per i dispositivi con latenze più elevate (file, database), considerare un'implementazione in cui l'adattatore effettua il pull dei dati dall'origine. Analogamente, sul lato di output, è possibile implementare un adattatore di output per un dispositivo in grado di accettare eventi con velocità effettiva molto elevata, per effettuare il push degli eventi nel dispositivo. Per i dispositivi di output più lenti è possibile adottare un approccio in cui il dispositivo esegue il polling dell'adattatore quando è pronto per utilizzare gli eventi.

In corrispondenza del punto di interazione ENQUEUE, il server StreamInsight supporta un modello push. Questo significa che il modello di progettazione dell'adattatore consente di accodare la quantità di eventi che il motore è in grado di utilizzare in qualsiasi momento. In corrispondenza del punto di interazione DEQUEUE, il server StreamInsight supporta un modello pull. Questo significa che il modello di progettazione dell'adattatore prevede che venga effettuato il pull degli eventi dal server alla velocità supportata dal motore.

Basandosi su questo presupposto, i criteri di limitazione per il server StreamInsight sono molto lineari. Presupponendo una semplice query pass-through senza operazioni di blocco, la velocità con cui un server StreamInsight può utilizzare gli eventi da un adattatore di input in corrispondenza del punto di interazione ENQUEUE è limitata solo dalla velocità con cui l'adattatore di output può utilizzare gli eventi dal server in corrispondenza del punto di interazione DEQUEUE. La misura in cui il server StreamInsight effettua il push di nuovo nell'adattatore di input durante l'operazione ENQUEUE è determinata dalla velocità con cui una query è in grado di rilasciare l'output e dalla velocità con cui l'adattatore di output è in grado di utilizzare questo output. StreamInsight offre un set completo di viste diagnostiche che consentono di misurare le frequenze degli eventi in corrispondenza di ognuno di questi punti di interazione. Per ulteriori informazioni, vedere Monitoraggio delle query e del server StreamInsight.

Attività di sviluppo dell'adattatore

Per sviluppare l'adattatore, utilizzare il seguente elenco di controllo.

  • Determinare il tipo di adattatore (di input o di output) desiderato.

    Un adattatore di input legge gli eventi in entrata nel formato in cui vengono forniti e trasforma questi dati in un formato che possa essere utilizzato dal server StreamInsight.

    Un adattatore di output riceve gli eventi elaborati dal server StreamInsight, trasforma gli eventi in un formato previsto dal dispositivo di output e trasmette i dati a tale dispositivo.

  • Determinare il tipo di evento.

    Per un adattatore di input, definire il tipo di evento che descrive il payload di eventi fornito dall'origine. Per un adattatore di output, specificare il tipo di evento che descrive il payload di eventi utilizzato dal sink. Per ulteriori informazioni sui payload di eventi, vedere Concetti relativi al server StreamInsight.

    Specificare e compilare un adattatore tipizzato per un'origine o un sink che produce o utilizza sempre eventi con un formato di payload fisso, in cui il numero di campi e i relativi tipi sono noti in anticipo. Il vantaggio principale dell'adattatore tipizzato consiste nel fatto che l'implementazione della creazione di eventi per l'accodamento nel server StreamInsight è relativamente semplice. Poiché i tipi di campo sono già noti, è possibile utilizzare IntelliSense in Visual Studio (o una caratteristica equivalente in un altro ambiente di sviluppo integrato) per popolare i campi.

    Specificare e compilare un adattatore non tipizzato se l'origine o il sink produce o utilizza formati di payload diversi. Il vantaggio principale di un adattatore non tipizzato è costituito dalla flessibilità fornita, che consente di specificare il tipo di evento al momento dell'associazione della query, anziché legare l'implementazione dell'adattatore a un tipo di evento specifico. L'implementazione dell'adattatore non tipizzato è maggiormente complicata rispetto a quella dell'adattatore tipizzato. L'adattatore di input non tipizzato deve essere scritto in modo che sia possibile determinare il tipo di ogni campo dai parametri di configurazione forniti durante l'associazione della query, popolare i campi uno alla volta e quindi accodare l'evento. Analogamente, l'adattatore di output non tipizzato deve essere in grado di recuperare il risultato dell'elaborazione della query da un evento rimosso dalla coda, in base alle informazioni di configurazione fornite nell'output.

    È importante notare che un'istanza di un adattatore, tipizzato o non tipizzato, associata alla query, trasmette sempre eventi contenenti payload di un tipo specifico. Per ulteriori informazioni, vedere Creazione di tipi di evento.

  • Determinare il modello di eventi.

    Determinare il modello di eventi per gli eventi di input e output. StreamInsight supporta tre modelli di eventi: punto, intervallo e limite. Se l'origine fornisce eventi di un modello di eventi fisso, è possibile progettare un adattatore di input solo per tale modello di eventi. Analogamente, se per il sink sono necessari eventi di un particolare modello, è possibile progettare un adattatore di output solo per tale modello di eventi. Per la maggior parte delle applicazioni potrebbero tuttavia essere necessari tutti i modelli di eventi per un particolare tipo di evento È consigliabile compilare un adattatore tipizzato o non tipizzato per ognuno dei modelli di eventi. Per ulteriori informazioni sui modelli di eventi, vedere Concetti relativi al server StreamInsight.

    Le classi AdapterFactory di input e output consentono di creare un pacchetto di questi adattatori. È possibile creare un'istanza dell'adattatore appropriato al momento dell'associazione della query in base ai parametri di configurazione.

  • Scegliere la classe di base dell'adattatore corrispondente.

    In base al modello e al tipo di evento, selezionare la classe di base dell'adattatore appropriata. La nomenclatura della classe è basata sul modello [Typed][Point | Interval | Edge][Input | Output]. Per gli adattatori non tipizzati non è presente il prefisso Typed.

    Tipo di adattatore

    Classe di base dell'adattatore di input

    Classe di base dell'adattatore di output

    Punto tipizzato

    TypedPointInputAdapter

    TypedPointOutputAdapter

    Punto non tipizzato

    PointInputAdapter

    PointOutputAdapter

    Intervallo tipizzato

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    Intervallo non tipizzato

    IntervalInputAdapter

    IntervalOutputAdapter

    Limite tipizzato

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    Limite non tipizzato

    EdgeInputAdapter

    EdgeOutputAdapter

    Per ulteriori informazioni, vedere Microsoft.ComplexEventProcessing.Adapters.

  • Progettare le classi AdapterFactory di input e di output.

    Un oggetto AdapterFactory è una classe contenitore per gli adattatori. È necessario implementare una classe factory. Le classi factory di base sono organizzate come illustrato di seguito.

    Tipo di adattatore

    Classe di base dell'adattatore di input

    Classe di base dell'adattatore di output

    Tipizzato

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    Non tipizzato

    IInputAdapterFactory

    IOutputAdapterFactory

    Tipizzato con supporto della resilienza

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    Non tipizzato con supporto della resilienza

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    La classe factory si utilizza per gli scopi seguenti:

    • Consente la condivisione delle risorse tra diverse implementazioni dell'adattatore per una determinata classe di dispositivi (file CSV, database di SQL Server, formato di log comune dei server Web) o un determinato requisito dell'applicazione. Inoltre, consente di semplificare il passaggio dei parametri di configurazione al costruttore dell'adattatore. Un'applicazione potrebbe ad esempio richiedere tutti e tre i modelli di eventi (punto, intervallo e limite). Un singolo factory può supportare tre implementazioni dell'adattatore, una per ogni modello di eventi. Per un altro esempio, è possibile che l'applicazione abbia la stessa origine evento di una tabella di database, ma l'origine genera più strutture del payload di evento dalla stessa origine sulla base delle query eseguite. In questo caso, un singolo factory può supportare le implementazioni dell'adattatore per gestire ogni struttura di payload.

    • Fornisce un gateway per l'adattatore al runtime del server. Lo sviluppatore dell'adattatore deve implementare i metodi Create() e Dispose() nel factory dell'adattatore per la classe dell'adattatore. Questi metodi vengono richiamati dal server durante l'avvio e l'arresto delle query.

    • Fornisce un gateway per l'adattatore alle informazioni di configurazione pre-runtime. Questo aspetto è particolarmente importante per gli adattatori non tipizzati, che devono determinare il tipo di ogni campo nella struttura dai parametri di configurazione forniti durante l'associazione delle query. È possibile definire la struttura di configurazione nella classe factory e passare questa struttura di configurazione tramite il metodo Create() al metodo del costruttore della classe dell'adattatore. Questa struttura di configurazione viene serializzata tramite DataContractSerialization. A parte questo vincolo, la metodologia di sviluppo offre flessibilità completa nella definizione e nell'utilizzo di questa struttura di configurazione in termini di modalità di popolamento e utilizzo nel costruttore dell'adattatore.

    • Fornisce una modalità di produzione di CTI (Current Time Increment) senza che questi vengano accodati in modo esplicito attraverso l'adattatore di input. Implementando le interfacce ITypedDeclareAdvanceTimePolicy (per un factory di adattatori tipizzati) e IDeclareAdvanceTimePolicy (per un factory di adattatori non tipizzati) nella classe factory di adattatori, l'utente può specificare timestamp e frequenza CTI. Questo aspetto semplifica il codice dell'adattatore e può influire su ogni flusso di eventi prodotto dal factory tramite le istanze dell'adattatore. Per ulteriori informazioni, vedere [AdvanceTimeSettingsClass].

    • Nelle applicazioni resilienti, supporta la resilienza fornendo il limite massimo all'adattatore di input per la riproduzione degli eventi mancanti e il limite massimo e l'offset all'adattatore di output per l'eliminazione degli eventi duplicati. Per ulteriori informazioni, vedere Resilienza di StreamInsight.

  • Compilare e testare l'adattatore.

    Compilare e creare l'adattatore come assembly .NET. Testare le operazioni di base dell'adattatore in relazione a una query pass-through semplice che legge eventi da un adattatore di input e li fornisce all'adattatore di output senza elaborazione di query complesse. In questo modo sarà possibile verificare che l'adattatore sia in grado di leggere e scrivere dai dispositivi e di accodare e rimuovere dalla coda gli eventi.

Macchina a stati dell'adattatore

La macchina a stati che definisce l'interazione tra un adattatore e il server StreamInsight è uguale per gli adattatori di input e di output. Questo aspetto è significativo in quanto la macchina a stati fornisce un modello di sviluppo coerente. La macchina a stati è illustrata nella figura seguente.

Diagramma di stato di accodamento e rimozione dalla coda dell'adattatore

Le caratteristiche e i requisiti principali per il funzionamento di questa macchina sono i seguenti:

  • Start() e Resume() sono metodi chiamati dal server StreamInsight e devono essere implementati dallo sviluppatore dell'adattatore. È inoltre necessario implementare anche il metodo del costruttore per la classe dell'adattatore e il metodo Dispose(), ereditato dalla classe di base.

  • A sua volta, l'implementazione dell'adattatore deve chiamare i metodi seguenti forniti dall'SDK dell'adattatore:

    • Enqueue() per l'adattatore di input. Questo metodo restituisce il valore EnqueueOperationResult.Success o EnqueueOperationResult.Full.

    • Dequeue() per l'adattatore di output. Questo metodo restituisce il valore DequeueOperationResult.Success o DequeueOperationResult.Empty.

    • Ready(). Questo metodo restituisce un valore booleano TRUE o FALSE.

    • Stopped(). Questo metodo restituisce un valore booleano TRUE o FALSE.

  • Il server StreamInsight chiama in modo asincrono il metodo interno (indicato come StopQuery()) per conto dell'utente quando un amministratore o uno sviluppatore di query arresta l'esecuzione delle query tramite metodi nell'API del server.

  • Le chiamate a Enqueue() e Dequeue() restituiscono lo stato Full e Empty, quando l'adattatore si trova, rispettivamente, in uno degli stati seguenti:

    • Suspended

    • Stopping

  • Le chiamate a Enqueue() e Dequeue() comportano la generazione di un'eccezione quando l'adattatore si trova in uno degli stati seguenti:

    • Created

    • Stopped

  • Le chiamate a Ready() causano la generazione di un'eccezione quando l'adattatore è in uno degli stati seguenti:

    • Created

    • Running

    • Stopped

  • Un adattatore passa per alcuni dei cinque stati (Created, Running, Suspended, Stopping e Stopped), o per tutti tali stati, durante il funzionamento. Una transizione di stato si verifica prima che il server StreamInsight chiami il metodo Start() o Resume() e dopo che l'adattatore ha chiamato i metodi Enqueue(), Dequeue(), Ready() e Stopped().

  • Il server StreamInsight e l'adattatore non condividono mai lo stesso thread. Il server chiama sempre Start() o Resume() in un thread di lavoro distinto. Il server ottiene questo thread da un pool di thread del sistema operativo per conto dell'adattatore. In questo modo, i metodi Start() e Resume() dispongono di un elevato livello di flessibilità e potenzialità di utilizzo del thread di lavoro in base alle necessità (ad esempio, per la generazione di più thread per operazioni di lettura o scrittura asincrone). Per questo motivo, è necessario prestare attenzione e applicare le procedure consigliate per quanto riguarda l'utilizzo di risorse di sistema da questo thread.

  • L'API elimina la necessità della sincronizzazione inerente tra le operazioni Start() e Resume() (thread). Il server chiama sempre Resume() esclusivamente dopo che Ready() viene chiamato dall'adattatore. Tuttavia, tenere presente che la sincronizzazione potrebbe essere richiesta per le attività che utilizzano il dispositivo di lettura, scrittura o memorizzazione degli eventi nel buffer, specialmente negli scenari di I/O asincrono. Come procedura consigliata, è preferibile utilizzare I/O non di blocco.

  • Se l'adattatore può essere inattivo, è necessario che controlli periodicamente lo stato per determinare se ne è stato richiesto l'arresto.

Ciclo di vita dell'interazione dell'adattatore con il server

L'handshake tra il server StreamInsight e l'adattatore è sempre sincrono. Pertanto, in corrispondenza di qualsiasi punto nell'esecuzione, tramite l'adattatore è possibile controllare lo stato e reagire di conseguenza. Il ciclo di vita dell'interazione dell'adattatore con il server StreamInsight è costituito dalle operazioni seguenti, che corrispondono alla macchina a stati illustrata nella figura precedente.

  • Stato Created

    Un'istanza dell'adattatore inizia con l'interazione con il server StreamInsight quando la query viene avviata (effettuando una chiamata corrispondente nell'API del server StreamInsight).

  • Stato Running

    Il server imposta l'adattatore in uno stato Running e chiama il metodo Start() nell'adattatore in modo asincrono, garantendo che questa chiamata venga effettuata una sola volta. Quando l'adattatore si trova nello stato Running, può accodare eventi nel server o rimuoverli dalla coda.

    Idealmente, l'adattatore si trova in questo stato per la maggior parte del tempo. Il modello di progettazione consigliato consiste nel richiamare la routine di lettura o scrittura, preferibilmente in un thread separato, dal metodo Start() e restituire un output dalla routine Start(), rilasciando così in modo rapido il thread di lavoro.

    La routine di lettura (si presupponga, ad esempio, che sia denominata ProduceEvents ()) legge gli eventi dall'origine e chiama Enqueue() per effettuare il push degli eventi nel server. Nel caso di un adattatore di output, una routine di scrittura (si presupponga, ad esempio, che sia denominata ConsumeEvents ()) chiama Dequeue() per effettuare il pull degli eventi dal server e scriverli nel sink.

  • Stato Suspended

    Quando il server non è in grado di ricevere un evento accodato, o di restituire un evento da rimuovere dalla coda, l'adattatore di input o di output viene impostato nello stato Suspended. In questo modo, le chiamate a Enqueue() e Dequeue() restituiscono, rispettivamente, lo stato FULL e EMPTY. Nello stato Suspended, è possibile implementare operazioni di manutenzione, come il salvataggio della posizione dell'ultimo record letto dal database o dell'ultima riga letta dal file. Al termine di questa sezione facoltativa, è necessario richiamare il metodo Ready() per comunicare al server che l'adattatore è pronto per riprendere l'esecuzione. Se la routine è in esecuzione nello stesso thread di lavoro di Start(), è necessario restituire l'output dalla routine Start() stessa.

  • In risposta a una chiamata a Ready(), il server reimposta lo stato Running dell'adattatore e chiama sempre in modo asincrono Resume() in un thread di lavoro diverso. È possibile progettare Resume() in modo da accodare o rimuovere dalla coda l'ultima iterazione non riuscita, quindi chiamare ProduceEvents() o ConsumeEvents(). Questo modello può continuare fino a quando l'adattatore passa a uno stato Stopped o Stopping.

  • Stato Stopping

    In qualsiasi momento in cui l'adattatore si trova in uno stato Running o Suspended, il server può impostarlo in uno stato Stopping in risposta a una richiesta asincrona per arrestare la query. In questo stato, richiamando un metodo Enqueue() o Dequeue() viene anche reimpostato, rispettivamente, lo stato FULL o EMPTY.

    Lo stato Stopping fornisce all'implementazione dell'adattatore un'area di gestione temporanea che consente una corretta preparazione per l'arresto. È possibile implementare l'adattatore in modo da rilasciare tutte le risorse ottenute (thread, memoria) e quindi richiamare il metodo Stopped(). Fino a quando questo metodo non viene chiamato, l'adattatore non viene arrestato dal server.

    Si noti che l'adattatore potrebbe passare in uno stato Stopping in modo asincrono. L'adattatore richiede metodi per rilevare il passaggio allo stato Stopped. Come illustrato in precedenza, il modello di progettazione prevede che l'adattatore richiami Ready() quando viene sospeso. In risposta, il server richiama di nuovo il metodo Resume(), abilitando in questo modo il rilevamento dello stato Stopping nel metodo Resume(). Come procedura consigliata, è preferibile posizionare il controllo per lo stato Stopping come primo blocco di codice nell'implementazione di Start() e Resume().

  • Stato Stopped

    Il codice dell'adattatore può chiamare Stopped() in qualsiasi punto. In questo modo, l'adattatore viene impostato nello stato Stopped. Come buona pratica di progettazione, si consiglia di cancellare le risorse che l'adattatore ha ottenuto prima di chiamare Stopped().

    Nota importanteImportante

    Se il metodo Stopped() non viene chiamato correttamente, l'ultima pagina di memoria associata alla query rimane allocata. Questo può provocare piccole perdite di memoria che possono accumularsi nel tempo nel caso in cui siano presenti numerosi cicli di avvio e arresto di query in un processo.

    Nello stato Stopped, l'adattatore non può fare riferimento ad alcuna memoria di evento o ad alcun costrutto specifico del server StreamInsight, né eseguire operazioni di accodamento o rimozione dalla coda. Tali azioni generano un'eccezione. Le attività di pulizia del sistema operativo e che utilizzano il dispositivo possono tuttavia continuare.

Esempi

Per esempi di un'ampia gamma di adattatori di input e di output e di factory di adattatori, vedere le informazioni disponibili nel sito Web di esempi su StreamInsight.

Vedere anche

Concetti

Concetti relativi al server StreamInsight

Architettura del server StreamInsight