Operatori del flusso definito dall'utente

Un operatore del flusso definito dall'utente consente di specificare l'elaborazione personalizzata di flussi di eventi.

Modello di utilizzo

All'interno di una query, è possibile effettuare una chiamata a un operatore del flusso definito dall'utente tramite il metodo di estensione Scan di CepStream. Vengono forniti il flusso di input e lo stato iniziale per l'operatore, come mostrato nell'esempio seguente.

var query = input.Scan(new SmoothingOperator(0.7));

L'autore dell'operatore fa derivare una nuova classe dai tipi CepPointStreamOperator o CepEdgeStreamOperator astratti. Nel nuovo tipo è incapsulata la macchina a stati dell'operatore. Una chiamata al costruttore di questo tipo viene passata all'operatore di analisi per stabilire lo stato iniziale per l'operatore.

Caratteristiche di un operatore del flusso definito dall'utente

Un operatore del flusso definito dall'utente consente all'utente di interagire con il flusso di eventi in modo procedurale. Di conseguenza, rappresenta un limite tra il motore di elaborazione di eventi di StreamInsight e il CLR, simile ad adattatori e aggregazioni oppure operatori definiti dall'utente. In tutti questi casi, un contratto sulle proprietà temporali di un flusso viene soddisfatto dal motore e dallo sviluppatore. Il motore di StreamInsight fornisce le seguenti garanzie all'operatore del flusso definito dall'utente per assicurare un comportamento in modo deterministico per una sequenza specifica di eventi di input della query:

  • Un operatore del flusso definito dall'utente consente di ricevere eventi ordinati in base alla relativa ora di sincronizzazione, ovvero l'ora di inizio per eventi punto ed Edge iniziali e l'ora di fine per eventi Edge finali. Gli eventi intervallo non sono supportati perché non dispongono di una rappresentazione ordinata dell'ora di sincronizzazione chiara, dal momento che tramite ogni evento vengono indicati due momenti, ovvero un inizio e una fine.

  • Gli eventi di inserimento vengono passati solo a un operatore del flusso definito dall'utente. Gli eventi Current Time Increment (CTI) nel flusso in ingresso sono trasparenti all'operatore del flusso definito dall'utente, tuttavia consentono di determinare comunque la modalità di percezione di passaggio del tempo (vedere NextCti di seguito) da parte dell'operatore del flusso definito dall'utente.

  • Un operatore del flusso definito dall'utente può essere disattivato da StreamInsight, se lo consente (vedere IsEmpty di seguito). Un operatore del flusso definito dall'utente disattivato può essere riciclato da StreamInsight.

  • Ogni evento di inserimento determina la chiamata a ProcessEvent, seguita dal polling delle proprietà NextCti e IsEmpty.

Input e output di un operatore del flusso definito dall'utente

Un operatore del flusso definito dall'utente consente di elaborare un evento di input alla volta. In risposta a ogni evento di input, è possibile che consenta di generare 0-* eventi di output. Tramite l'operatore è inoltre possibile aggiornare il relativo stato interno in risposta a un input. Un evento di input può essere un CTI, generato alla richiesta dell'operatore per indicare il passaggio del tempo, o un inserimento. Gli input vengono annotati in modo temporale.

Un evento di output, invece, è semplicemente un payload dell'evento. Non è disponibile alcuna opzione per applicare il timestamp di eventi di output o per inserire CTI nel flusso di output. Gli eventi di output vengono generati come eventi punto, con timestamp basati su quelli degli eventi di input corrispondenti.

Gestione del tempo in un operatore del flusso definito dall'utente

Quando si crea un nuovo operatore del flusso definito dall'utente, tramite il codice deve essere elaborato solo il payload di eventi. Il tempo viene gestito esclusivamente da StreamInsight. Gli eventi di input vengono ricevuti in ordine. Il timestamp di ogni evento di output è basato su quello dell'evento di input corrispondente. Ad esempio, se tramite un evento Edge finale viene attivato un evento di output, il timestamp ricevuto dall'evento di output sarà quello dell'evento Edge finale. Pertanto, l'operatore può essere influenzato dall'ora che non può però essere controllata da quest'ultimo.

L'operatore del flusso definito dall'utente non consente la ricezione di CTI direttamente dal flusso di input nel relativo metodo ProcessEvent(), ma è in grado di reagire al passaggio del tempo tramite la proprietà NextCti. Il polling di questa proprietà viene eseguito dal motore dopo ogni chiamata a ProcessEvent(). L'operatore del flusso definito dall'utente consente di restituire un timestamp tramite cui viene indicato il successivo timestamp CTI che sarà ricevuto come chiamata nel metodo ProcessEvent().

Solo i CTI richiesti impostando la proprietà NextCti verranno passati a ProcessEvent. Tali CTI non verranno propagati esternamente all'operatore del flusso definito dall'utente.

Implementazione di un operatore del flusso definito dall'utente

Per creare un nuovo operatore del flusso definito dall'utente, far derivare una nuova classe dalle classi di base CepPointStreamOperator o CepEdgeStreamOperator astratte.

  • Se si fa derivare dalla classe di base CepPointStreamOperator astratta, gli eventi di input vengono visualizzati dall'operatore come eventi punto. Tuttavia, non è un errore se gli eventi non sono di fatto eventi punto. Dall'operatore vengono visualizzate solo le relative ore di inizio.

  • Se si fa derivare dalla classe di base CepEdgeStreamOperator astratta, dall'operatore vengono visualizzati sia gli Edge iniziali sia quelli finali per gli eventi di input.

Nella classe derivata in uso, è possibile eseguire l'override delle proprietà e dei metodi seguenti:

  • Metodo ProcessEvent. Consente di generare un output e di aggiornare lo stato interno dell'operatore in risposta a ogni evento di input. Tramite ProcessEvent viene ricevuto un evento di input e possono essere restituiti zero o più payload di output.

  • Proprietà IsEmpty. Consente di indicare se lo stato interno dell'operatore è vuoto. Se true, tramite il motore query di StreamInsight è possibile rimuovere l'istanza dell'operatore per ridurre l'utilizzo della memoria.

  • Facoltativamente, il metodo NextCti. Consente di indicare la temporizzazione successiva dell'invio dell'evento CTI all'operatore. L'override di questa proprietà consente all'operatore definito dall'utente di generare output in un momento futuro specifico o di indicare che lo stato interno è vuoto dopo che è trascorso un certo intervallo di tempo dell'applicazione.

Tramite la classe derivata deve essere implementata anche la serializzazione WCF. Per ulteriori informazioni, vedere Procedura: creare un contratto dati di base per una classe o una struttura.

Modalità di interazione del motore di StreamInsight con l'operatore

Per ogni istanza dell'operatore, il metodo ProcessEvent viene chiamato con gli eventi in ordine di ora di sincronizzazione. Per un evento punto o CTI, l'ora di sincronizzazione è l'ora di inizio valida. Per un evento Edge, l'ora di sincronizzazione è l'ora di inizio valida per gli Edge iniziali o l'ora di fine valida per gli Edge finali.

Dopo ogni chiamata al metodo ProcessEvent, viene eseguito il polling delle proprietà IsEmpty e NextCti.

Quando tramite l'operatore viene eseguito l'override di NextCti, il successivo evento elaborato dall'operatore garantito dal motore sarà un evento di inserimento con un'ora di sincronizzazione minore del valore di NextCti o un CTI con il valore di NextCti come ora di inizio. Se tramite l'operatore viene restituito un valore NextCti minore o uguale all'ora di sincronizzazione dell'ultimo evento elaborato, tale valore viene ignorato. La proprietà NextCti consente all'operatore di 'tradurre' l'avanzamento del tempo nel flusso di input in un ritmo proprio (nel formato di questi CTI interni) e di reagire quindi di conseguenza a tale avanzamento.

Gli operatori vengono attivati solo in risposta agli eventi di inserimento. L'attivazione non viene effettuata tramite CTI. Un operatore viene disattivato quando viene restituito True da IsEmpty dall'operatore stesso.

In qualsiasi momento, tramite il motore è possibile scegliere di serializzare l'operatore e di rilasciare il relativo riferimento. Quando l'operatore viene deserializzato in un secondo momento, è prevista la scelta del punto in cui eseguire l'interruzione.

Esempi di operatori del flusso definiti dall'utente

Livellamento esponenziale

Questo operatore del flusso definito dall'utente consente di utilizzare un flusso di eventi punto come sequenza di valori e di applicare il livellamento esponenziale. Si noti che è necessario un riferimento a System.Runtime.Serialization.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Criteri di ricerca

In questo esempio semplice di criteri di ricerca viene illustrato un utilizzo alternativo di IsEmpty e NextCti. Nell'esempio, l'operatore consente di cercare un evento con valore 1.0 che non viene seguito da un evento con valore 2.0 entro 30 secondi. Questo esempio viene fornito per illustrare concetti utili negli operatori del flusso definiti dall'utente. In un'applicazione effettiva, questo modello è semplice abbastanza per essere implementato tramite operatori predefiniti in StreamInsight.

Nell'esempio precedente è stato utilizzato NextCti per controllare la durata di un operatore. Anche in questo esempio viene utilizzato NextCti per questo scopo, ma in aggiunta viene utilizzato NextCti per generare output in risposta al passaggio del tempo.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Definizione di un metodo di supporto per la semplificazione dell'utilizzo

È possibile semplificare l'utilizzo dell'operatore in una query. Ad esempio, è più conveniente per l'autore della query scrivere input.Smooth(0.5) anziché input.Scan(new SmoothingOperator(0.5)).

È possibile abilitare questo modello semplificato creando un metodo di estensione personalizzato come indicato di seguito:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }