共用方式為


使用者定義的資料流運算子

使用者定義的資料流運算子可讓您定義事件資料流的自訂處理。

使用模式

您可以在查詢中使用 CepStreamScan 擴充方法,藉以呼叫使用者定義的資料流運算子。您要提供輸入資料流以及運算子的初始狀態,如以下範例。

var query = input.Scan(new SmoothingOperator(0.7));

運算子的作者會從抽象的 CepPointStreamOperatorCepEdgeStreamOperator 類型衍生新類別。新類型會封裝運算子的狀態機器。系統會將此類型之建構函式的呼叫傳遞到掃描運算子,以建立運算子的初始狀態。

使用者定義之資料流運算子的特性

使用者定義的資料流運算子可讓使用者以程序的方式與事件資料流互動。因此,它代表 StreamInsight 事件處理引擎與 CLR 之間的界限,類似於配接器與使用者定義的運算子或彙總。在所有這些情況下,引擎以及開發人員都要完成資料流之暫時屬性的相關合約。StreamInsight 引擎會對使用者定義的資料流運算子做下列保證,以便針對一連串特定的查詢輸入事件,以決定性的方式運作:

  • 系統會保證使用者定義的資料流運算子收到依其同步處理時間排列的事件 (開始時間用於點與開始邊緣事件,結束時間用於結束邊緣事件)。間隔事件不受到支援,因為它們沒有直接的同步處理時間排列表示法 (每個事件都會指示兩個時間點:開始和結束)。

  • 系統只會將插入事件傳遞到使用者定義的資料流運算子。內送資料流中的目前時間累加事件 (CTI) 對使用者定義的資料流運算子而言是透明的,但是這些事件仍然會判斷使用者定義的資料流運算子如何察覺時間的經過 (請參閱下方的 NextCti)。

  • StreamInsight 可以停用使用者定義的資料流運算子 (如果允許的話) (請參閱下方的 IsEmpty)。StreamInsight 可以回收已停用之使用者定義的資料流運算子。

  • 每個插入事件都會先呼叫 ProcessEvent,然後再輪詢 NextCti 和 IsEmpty 屬性。

使用者定義之資料流運算子的輸入和輸出

使用者定義的資料流運算子一次處理一個輸入事件。它可能會產生 0-* 個輸出事件以回應每個輸入事件。運算子也可能會更新其內部狀態以回應某個輸入。輸入事件可以是 CTI (在要求運算子時產生,表示時間的經過) 或插入。輸入會暫時加上註解。

相較之下,輸出事件只是一個事件裝載。您無法為輸出事件加上時間戳記,或將 CTI 插入輸出資料流中。系統會使用以對應輸入事件時間戳記為基礎的時間戳記,產生輸出事件做為時間點事件。

處理使用者定義之資料流運算子中的時間

當您建立新使用者定義的資料流運算子時,您的程式碼只需要處理事件的裝載。時間是透過 StreamInsight 獨佔處理。系統會依序收到輸入事件。每個輸出事件的時間戳記都是以對應輸入事件的時間戳記為基礎。例如,如果邊緣結束事件觸發某個輸出事件,則該輸出事件會收到邊緣結束事件的時間戳記。因此,運算子可能會受到時間影響,但無法控制時間。

使用者定義的資料流運算子無法使用其 ProcessEvent() 方法,直接從輸入資料流收到 CTI,但是可以透過 NextCti 屬性,對時間的經過有反應。引擎會在每次呼叫 ProcessEvent() 之後,輪詢這個屬性。使用者定義的資料流運算子可以傳回一個時間戳記,這個時間戳記會指出它在呼叫 ProcessEvent() 時,將會收到的下一個 CTI 時間戳記。

系統只會將已經透過設定 NextCti 屬性要求的 CTI 傳遞到 ProcessEvent。系統不會將這些 CTI 傳播到使用者定義的資料流運算子外部。

實作使用者定義的資料流運算子

若要建立新使用者定義的資料流運算子,請從抽象的 CepPointStreamOperatorCepEdgeStreamOperator 基底類別衍生一個新的類別。

  • 如果您要衍生自抽象的 CepPointStreamOperator 基底類別,則運算子會將輸入事件視為時間點事件。不過,如果這些事件事實上不是時間點事件,就不是一個錯誤。運算子僅會查看其開始時間。

  • 如果您要衍生自抽象的 CepEdgeStreamOperator 基底類別,則運算子會同時查看輸入事件的開始和結束邊緣。

在您衍生的類別中,可以覆寫下列屬性和方法:

  • ProcessEvent 方法:產生輸出,並更新運算子的內部狀態以回應每個輸入事件。ProcessEvent 會收到一個輸入事件,而且可以傳回零個或多個輸出裝載。

  • IsEmpty 屬性:指出運算子的內部狀態是否為空白。如果為 true,StreamInsight 查詢引擎可能會捨棄運算子執行個體,以減少記憶體使用量。

  • (選擇性) NextCti 方法:指出將 CTI 事件傳送至運算子的下一個時間點。覆寫這個屬性可讓使用者定義的運算子在未來的特定時間點產生輸出,或是在經過一段應用時間間隔後指出其內部狀態為空白。

衍生的類別也必須實作 WCF 序列化。如需詳細資訊,請參閱HOW TO:建立類別或結構的基本資料合約

StreamInsight 引擎如何與運算子互動

對於運算子的每個執行個體,系統會以同步處理時間順序,透過事件呼叫 ProcessEvent 方法。對於時間點事件或 CTI,同步處理時間就是有效的開始時間。對於邊緣事件,同步處理時間是開始邊緣的有效開始時間,或者是結束邊緣的有效結束時間。

在每次呼叫 ProcessEvent 方法之後,會輪詢 IsEmptyNextCti 屬性。

當運算子覆寫 NextCti 時,此引擎會保證透過運算子處理的下一個事件將是同步處理時間小於 NextCti 之值,或 NextCti 值為其開始時間之 CTI 的插入事件。如果運算子傳回小於或等於上次處理之事件同步處理時間的 NextCti 值,就會遭到忽略。NextCti 屬性可讓運算子將輸入資料流的時間進展「轉譯」成自己的節奏 (以這些內部 CTI 的形式),並據此對這個進展做出反應。

運算子的啟用只為回應插入事件。CTI 不會觸發啟用。當運算子從 IsEmpty 傳回 true 時,會停用該運算子。

此引擎隨時可以選擇序列化運算子並釋放其參考。之後還原序列化運算子時,應該會從停止運算子的地方收取它。

使用者定義之資料流運算子的範例

指數平滑

這個使用者定義的資料流運算子會將時間點事件的資料流視為一連串的值,並套用指數平滑。請注意,System.Runtime.Serialization 的參考是必要的。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

模式比對

這個簡單的模式比對範例說明 IsEmptyNextCti 的替代用法。在此範例中,運算子會尋找其值為 1.0 的事件,而且在 30 秒之內,這個事件後面沒有其值為 2.0 的事件 (提供此範例的用意,在於說明使用者定義之資料流運算子的實用概念。在實際的應用程式中,此模式夠簡單,而可以透過使用 StreamInsight 中的內建運算子實作)。

上一個範例使用 NextCti 控制運算子的存留時間。此範例也使用 NextCti 來達到這個目的,但除此之外,還使用了 NextCti 來產生輸出以回應時間的經過。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

定義 Helper 方法來簡化使用方式

您可以簡化在查詢中使用運算子的方式。例如,如果查詢作者可以寫入 input.Smooth(0.5) 而非 input.Scan(new SmoothingOperator(0.5)),將會更方便。

您可以建立自訂的擴充方法來啟用這個簡化的模式,如下所示:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }