Avanzamento del tempo applicazione

Gli sviluppatori di StreamInsight devono trovare un punto di equilibrio tra le necessità di origini dati che possono contenere dati non ordinati e i requisiti di elaborazione di eventi in modo estremamente dinamico. Mentre un più rapido avanzamento del tempo applicazione consente di ridurre la latenza, si riduce anche la finestra per i dati che arrivano in ritardo, ovvero la possibilità di arrivo dei dati in modo non ordinato. StreamInsight offre diversi modi per prendere in considerazione il tempo applicazione. In questo argomento vengono descritti i diversi livelli e criteri di avanzamento del tempo applicazione che possono essere impostati a livello di adattatore e con le associazioni di query.

Informazioni sul modello temporale

Il modello temporale di StreamInsight è basato solo sul tempo applicazione e mai sull'ora di sistema. Questo significa che tutti gli operatori temporali fanno riferimento al timestamp degli eventi e mai al clock di sistema del computer host. Di conseguenza, le applicazioni devono comunicare il tempo applicazione corrente al server StreamInsight. Il tempo applicazione per un'applicazione specificata dipende da molti aspetti diversi nel contesto dell'applicazione. In definitiva, lo sviluppatore dell'applicazione deve fornire il tempo applicazione appropriato al server StreamInsight. Le principali considerazioni relative al tempo applicazione sono le seguenti:

  • Origini dati

    Quando le origini dati comunicano informazioni temporali, tali dati possono essere utilizzati per identificare il punto nel tempo in cui sono stati ricevuti tutti gli eventi dall'origine dati. Questo punto nel tempo costituisce il tempo applicazione corrente rispetto all'origine dati. Si noti che origini dati diverse possono procedere a velocità diverse.

  • Dati non ordinati

    Con alcune origini dati gli eventi non arrivano sempre nell'ordine dei relativi timestamp. I dati sono pertanto non ordinati. StreamInsight è in grado di supportare i dati non ordinati e di garantire che i risultati non dipendano dall'ordine con cui gli eventi arrivano al server StreamInsight. Gli sviluppatori di StreamInsight possono consentire un avanzamento del tempo applicazione con un margine di flessibilità che consente l'inserimento di eventi non ordinati per le origini dati con eventi che arrivano in ritardo.

  • Dinamicità dei risultati

    Le query di StreamInsight restituiscono risultati di cui è nota l'accuratezza fino al tempo applicazione corrente. Questo significa che i risultati vengono restituiti dalle query di StreamInsight man mano che vengono finalizzati dall'avanzamento del tempo applicazione globale.

Current Time Increment (CTI)

Durante l'elaborazione di query, il tempo applicazione è definito dagli eventi CTI (Current Time Increment). Un evento CTI è un evento di punteggiatura che costituisce un componente centrale del modello temporale di StreamInsight. Gli eventi CTI sono utilizzati per eseguire il commit di sequenze di eventi e rilasciare risultati calcolati all'output della query indicando al server StreamInsight che determinate parti della sequenza temporale non cambieranno più. È pertanto molto importante accodare CTI insieme agli eventi nel flusso di eventi di input per produrre qualsiasi risultato e scaricare lo stato degli operatori con stato.

Accodando un CTI, l'input accetta di non produrre alcun evento successivo che influirebbe sul periodo prima del timestamp del CTI. Di conseguenza, dopo l'accodamento di un CTI nell'input:

  • Per eventi di forma punto, intervallo o Edge iniziali, l'ora di inizio dell'evento deve essere in corrispondenza del CTI o successiva al CTI.

  • Per eventi di forma Edge finali, l'ora di fine dell'evento deve essere in corrispondenza del CTI o successiva al CTI.

Se queste regole vengono violate, si verifica una violazione CTI. Di seguito vengono descritte le modalità di gestione di tali violazioni.

Sono disponibili tre metodi per inserire CTI in un flusso di input.

  1. Accodare i CTI a livello di programmazione tramite l'adattatore di input, in modo analogo a come avviene per l'accodamento di eventi.

  2. Generare CTI in modo dichiarativo con una frequenza specifica. Questo valore può essere specificato tramite AdvanceTimeGenerationSettings nel factory dell'adattatore o come parte dell'associazione di query.

  3. Definire un flusso di input separato come origine del CTI. Questo valore può essere specificato solo nell'associazione di query.

Quando vengono implementati i metodi 2 e 3, è necessario implementare anche criteri per le violazioni CTI. Nella sezione seguente vengono descritti l'opzione AdvanceTimeGenerationSettings e i criteri di violazione. Nelle sezioni successive viene descritto come utilizzare le impostazioni di tempo avanzato nel factory dell'adattatore e nell'associazione di query.

Generazione di CTI

La generazione di CTI (descritta in precedenza nei metodi 2 e 3) dispone di due dimensioni:

  1. La frequenza di generazione, specificata come un numero intero positivo N o come un intervallo di tempo T. Tramite i criteri di frequenza della generazione viene inserito un CTI dopo l'occorrenza del conteggio di eventi (N) o dell'intervallo di tempo (T).

  2. Il timestamp dei CTI generati, specificato come ritardo rispetto all'ultimo evento ricevuto.

È inoltre possibile utilizzare un flag booleano per indicare se deve essere inserito un CTI finale con un timestamp infinito positivo quando la query viene chiusa. Questo consente di scaricare tutti gli eventi rimanenti dagli operatori della query.

La generazione CTI viene definita tramite la classe AdvanceTimeGenerationSettings, il cui costruttore accetta la frequenza, il ritardo e il flag, come illustrato nell'esempio seguente.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

Nell'esempio viene indicato al motore di inserire un CTI dopo ogni 10 eventi provenienti dall'origine evento. Il CTI dispone di un timestamp dell'ora dell'ultimo evento meno cinque secondi. Questo meccanismo di ritardo consente di implementare in modo efficace un periodo di tolleranza, in modo che l'origine evento sia in grado di accodare gli eventi in ritardo senza violare la semantica dei CTI, a condizione che il ritardo degli eventi non sia mai superiore a cinque secondi. Quando la query corrispondente viene chiusa, viene accodato un CTI con tempo infinito.

Si noti che quando si specifica una frequenza per la generazione di CTI tramite AdvanceTimeSettings, gli Edge finali non vengono presi in considerazione. Gli Edge finali non vengono considerati neanche quando si utilizza la durata come frequenza. Nel caso di eventi Edge per frequenza e durata, vengono considerati solo gli Edge iniziali.

Criteri di violazione CTI

È possibile che un'origine evento violi la semantica dei CTI inviando eventi con un timestamp precedente ai CTI inseriti. Le impostazioni di tempo avanzato consentono di specificare criteri per la gestione di tali occorrenze. I criteri possono avere i due valori seguenti:

  • Rimozione

    Gli eventi che violano il CTI inserito vengono rimossi e non vengono accodati nella query.

  • Modifica

    Gli eventi che violano il CTI inserito vengono modificati se la loro durata si sovrappone al timestamp del CTI. Di conseguenza, il timestamp di inizio degli eventi viene impostato sul timestamp del CTI più recente, in modo da rendere validi tali eventi. Se sia l'ora di inizio sia l'ora di fine di un evento precedono il timestamp del CTI, l'evento viene rimosso.

Impostazioni di tempo avanzate dell'adattatore

È possibile specificare impostazioni per l'avanzamento del tempo applicazione nella definizione del factory dell'adattatore. Nello stesso modo in cui il metodo Create() del factory viene chiamato ogni volta che viene creata un'istanza di un adattatore, viene chiamato un metodo corrispondente per definire le impostazioni di tempo avanzato dell'istanza dell'adattatore. A questo scopo, utilizzare l'interfaccia ITypedDeclareAdvanceTimeProperties per un adattatore tipizzato (o IDeclareAdvanceTimeProperties per un adattatore non tipizzato), come illustrato nell'esempio seguente.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

Questa interfaccia richiede l'implementazione del metodo seguente come parte del factory.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

Il metodo DeclareAdvanceTimeProperties() viene chiamato per ogni nuovo adattatore di cui viene creata un'istanza con lo stesso parametro relativo a struttura di configurazione e forma di evento specificato nella chiamata al metodo Create() corrispondente. Questo consente all'autore dell'adattatore di dedurre le impostazioni di generazione di CTI corrette dalle informazioni di configurazione senza che sia necessario che l'autore della query e il binder siano consapevoli delle specifiche delle impostazioni di tempo avanzato.

Il costruttore AdapterAdvanceTimeSettings richiede sia l'oggetto AdvanceTimeGenerationSettings sia i criteri di violazione descritti in precedenza.

Generazione di CTI nell'associazione di query

Analogamente a AdapterAdvanceTimeSettings, la pubblicazione di CTI può essere specificata in modo dichiarativo nell'associazione di query, come illustrato nell'esempio seguente. Questo consente all'utente che associa la query di definire il comportamento del tempo applicazione CTI indipendentemente dall'implementazione dell'adattatore.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

Il costruttore AdvanceTimeSettings accetta i tre argomenti seguenti:

  1. Oggetto AdvanceTimeGenerationSettings.

  2. Oggetto AdvanceTimeImportSettings.

  3. I criteri di violazione.

Si noti che è possibile impostare su Null le impostazioni di generazione o gli argomenti delle impostazioni di importazione, ma non entrambi. Questi valori possono inoltre essere specificati congiuntamente. Nella sezione successiva viene introdotta la classe AdvanceTimeImportSettings.

Nell'esempio precedente viene specificato di generare e inserire un CTI con ogni evento, con il timestamp dell'evento (senza ritardo). L'oggetto AdvanceTimeSettings può essere passato come ultimo parametro facoltativo al metodo CepStream.Create() come illustrato nell'esempio seguente.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

Può inoltre essere utilizzato anche nel modello di sviluppo dello strumento di associazione di query:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Sincronizzazione con un altro flusso

In caso di utilizzo durante l'associazione di query, oltre a, o al posto di, generare CTI basati sulla frequenza, è anche possibile copiarli da un altro flusso di input nella query tramite AdvanceTimeImportSettings. Questa caratteristica consente la sincronizzazione di due flussi come illustrato nell'esempio seguente.

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

In questo esempio viene illustrato un caso di utilizzo tipico in cui un flusso di dati "veloce" deve essere unito in join con un flusso di riferimento "lento". Il flusso lento può essere costituito da dati di ricerca che cambiano meno frequentemente rispetto al flusso veloce. Per consentire al join di produrre output veloce come l'input più veloce, il flusso di input lento viene sincronizzato con il flusso veloce importando i relativi CTI. In questo esempio, si considera che la gestione del tempo applicazione del flusso veloce avvenga nell'adattatore.

Dinamicità dei risultati

Il parametro relativo al ritardo delle impostazioni di generazione del tempo avanzato specifica il timestamp dei CTI inseriti. È importante comprendere la semantica precisa dei CTI nel framework di StreamInsight per ottenere l'effetto desiderato per la dinamicità dell'output. Un CTI indica al motore che viene eseguito il commit di tutti gli oggetti nella sequenza temporale rigorosamente precedenti al timestamp del CTI. Ciò ha diverse implicazioni per la dinamicità del risultato.

Si consideri, ad esempio, un flusso di input di eventi punto e un'impostazione di generazione CTI con frequenza 1 (ogni evento) e ritardo 0. In questo caso i CTI vengono prodotti esattamente con lo stesso timestamp di ogni evento punto. Questo significa tuttavia che il commit dell'ultimo evento punto verrà eseguito solo con il CTI successivo in quanto il relativo timestamp non è rigorosamente precedente al CTI corrispondente. Per eseguire il commit di ogni evento punto non appena viene generato dall'adattatore, è necessario che per i CTI vengano impostati i timestamp immediatamente dopo gli eventi punto. Viene pertanto prodotto un ritardo negativo di un tick, come illustrato nell'esempio seguente.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

CTI e operatori di query

I CTI vengono accodati dall'adattatore di input o inseriti come descritto sopra. Si propagano nella query e vengono elaborati in modo diverso da alcuni operatori. Gli operatori di join, ad esempio, rilasciano i risultati fino al CTI più obsoleto da entrambi i lati. Gli operatori Union rilasciano il risultato più obsoleto dei CTI più recenti da entrambi i lati. L'intera query rilascerà il risultato solo fino al CTI più recente.

Alcuni operatori hanno invece effetto sui timestamp CTI. Le finestre di salto trattengono i CTI all'interno di una finestra all'inizio della finestra perché il risultato dell'operazione sopra la finestra può cambiare mentre gli eventi rientrano ancora in quella finestra. I metodi ShiftEventTime() e AlterEventLifeTime() modificano entrambi l'ora di inizio degli eventi e ai CTI verrà applicata la stessa trasformazione.

Vedere anche

Concetti

Creazione di adattatori di input e di output

Concetti relativi al server StreamInsight

Cronologia modifiche

Contenuto aggiornato

È stata aggiunta la sezione "CTI e operatori di query".

Sono state aggiunge informazioni nella sezione "Generazione di CTI" relative al fatto che gli Edge finali non vengono presi in considerazione quando si specifica una frequenza CTI tramite AdvanceTimeSettings.