Поделиться через


Определяемые пользователем операторы потока

Определяемый пользователем оператор потока позволяет определить пользовательскую обработку потоков событий.

Способ использования

В запросе вызывается определяемый пользователем оператор потока с помощью метода расширения Scan объекта CepStream. Для оператора предоставляются входной поток и начальное состояние, как в следующем примере.

var query = input.Scan(new SmoothingOperator(0.7));

Разработчик оператора предусматривает получение нового класса из абстрактных типов CepPointStreamOperator или CepEdgeStreamOperator. Новый тип инкапсулирует конечный автомат оператора. Вызов конструктора этого типа передается оператору просмотра в целях определения начального состояния для оператора.

Характеристики определяемого пользователем оператора потока

Определяемый пользователем оператор потока позволяет пользователю взаимодействовать с потоком событий процедурным способом. Как таковой он представляет собой границу между подсистемой обработки событий StreamInsight и средой CLR по аналогии с адаптерами и определяемыми пользователем операторами или статистическими выражениями. Во всех этих случаях и подсистема, и разработчик выполняют контракт, регламентирующий временные свойства потока. Подсистема StreamInsight предоставляет следующие гарантии определяемому пользователем оператору потока, чтобы можно было организовать детерминированное поведение применительно к конкретной последовательности входных событий запроса.

  • Для определяемого пользователем оператора потока предоставляется гарантия получения событий, упорядоченных по их времени синхронизации (время начала для событий точки и начального граничного события; время окончания для конечных граничных событий). Интервальные события не поддерживаются, поскольку они не имеют простого представления, упорядоченного по времени синхронизации. Дело в том, что каждое событие указывает два момента времени — начало и конец.

  • Определяемому пользователем оператору потока передаются только события вставки. События увеличения текущего времени (CTI) во входящем потоке являются прозрачными для определяемого пользователем оператора потока, но они все еще регламентируют, как определяемый пользователем оператор потока воспринимает течение времени (см. NextCti ниже).

  • Определяемый пользователем оператор потока может быть переведен в неактивное состояние с помощью StreamInsight, в зависимости от того, допускает ли он это (см. IsEmpty ниже). Работа переведенного в неактивное состояние определяемого пользователем оператора потока может быть возобновлена с помощью StreamInsight.

  • Каждое событие вставки влечет за собой вызов ProcessEvent, а за этим следует опрос свойств NextCti и IsEmpty.

Ввод и вывод для определяемого пользователем оператора потока

Определяемый пользователем оператор потока обрабатывает одновременно одно входное событие. В ответ на каждое событие входа он может произвести 0-* событий выхода. Оператор может также обновлять свое внутреннее состояние в ответ на некоторый ввод. Входное событие может представлять собой либо событие CTI (формированное по требованию оператора для обозначения течения времени), либо событие вставки. Входные события аннотируются по времени.

В отличие от этого, событие выхода представляет собой просто полезные данные события. Не существует возможности проставлять отметки времени для событий выхода или вводить CTI в выходной поток. События выхода формируются как точечные события с отметками времени, которые основаны на отметках времени соответствующих событий входа.

Обработка времени в определяемом пользователем операторе потока

При создании нового определяемого пользователем оператора потока необходимо обеспечить обработку в коде только полезных данных событий. Обработка значений времени осуществляется исключительно в StreamInsight. Получение входных событий происходит в определенном порядке. Отметка времени каждого события выхода основана на отметке времени соответствующего события входа. Например, если событие конца границы активирует событие выхода, то данное событие выхода получает отметку времени события конца границы. Поэтому оператор может находиться под влиянием времени, но не может этим управлять.

Определяемый пользователем оператор потока не получает CTI непосредственно из входного потока в свой метод ProcessEvent(), но обладает способностью реагировать на течение времени благодаря наличию свойства NextCti. Это свойство опрашивается подсистемой после каждого вызова ProcessEvent(). Определяемый пользователем оператор потока может возвращать отметку времени, которая указывает следующую отметку времени CTI, полученную им в качестве вызова ProcessEvent().

Только те CTI, которые были затребованы путем задания свойства NextCti, будут переданы в ProcessEvent. Эти CTI не распространяются за пределы определяемого пользователем оператора потока.

Реализация определяемого пользователем оператора потока

Для создания нового определяемого пользователем оператора потока необходимо получить новый класс из абстрактного базового класса CepPointStreamOperator или CepEdgeStreamOperator.

  • Если для получения нового класса применяется абстрактный базовый класс CepPointStreamOperator, то оператор видит входные события как точечные события. Но это не ошибка, если события фактически не являются точечными. Оператор видит только значения времени их начала.

  • Если для получения нового класса применяется абстрактный базовый класс CepEdgeStreamOperator, то оператор видит и начальные, и конечные границы для входных событий.

В полученном производном классе необходимо переопределить следующие свойства и методы.

  • Метод ProcessEvent. Формирует вывод и обновляет внутреннее состояние оператора в ответ на каждое входное событие. В ProcessEvent происходит получение одного входного события и может быть возвращено нуль или более выходных полезных данных.

  • Свойство IsEmpty. Указывает, является ли внутреннее состояние оператора пустым. При значении true подсистема запросов StreamInsight может отбросить экземпляр оператора для уменьшения потребности в памяти.

  • Метод NextCti (необязательно). Указывает следующий момент времени, в который событие CTI будет отправлено оператору. Переопределение этого свойства позволяет определяемому пользователем оператору производить вывод в конкретный момент времени в будущем или указывать, что его внутреннее состояние является пустым по истечении некоторого интервала времени приложения.

Производный класс должен также реализовывать сериализацию WCF. Дополнительные сведения см. в разделе Как создать основной контракт данных для класса или структуры

Как подсистема StreamInsight взаимодействует с оператором

Для каждого экземпляра оператора вызывается метод ProcessEvent с событиями в порядке времени синхронизации. Для точечного события или CTI временем синхронизации является допустимое время начала. Для граничного события временем синхронизации является допустимое время начала для начальных границ или допустимое время окончания для конечных границ.

После каждого вызова метода ProcessEvent опрашиваются свойства IsEmpty и NextCti.

Если оператор переопределяет значение NextCti, то подсистема гарантирует, что следующим событием, обработанным оператором, будет либо событие вставки со значением времени синхронизации меньше значения NextCti, либо CTI со значением NextCti в качестве его времени начала. Если оператор возвращает значение NextCti, которое меньше или равно времени синхронизации последнего обработанного события, это значение пропускается. Свойство NextCti позволяет оператору «переводить» ход времени во входном потоке в свой собственный ритм (в форме этих внутренних CTI), а затем реагировать на этот ход времени соответствующим образом.

Операторы активируются в ответ только на события вставки. CTI не вызывают активацию. Оператор переводится в неактивное состояние, если он возвращает true после опроса свойства IsEmpty.

В любой момент подсистема может выбрать, следует ли сериализовать оператор и освободить свою ссылку на него. Предполагается, что после последующей десериализации оператора его работа будет продолжена с того момента, где она закончилась.

Примеры определяемых пользователем операторов потока

Экспоненциальное сглаживание

В этом определяемом пользователем операторе потока любой поток точечных событий рассматривается как последовательность значений и к нему применяется экспоненциальное сглаживание. Следует учитывать, что требуется ссылка на System.Runtime.Serialization.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Сопоставление с шаблоном

Этот простой пример сопоставления с шаблоном иллюстрирует альтернативный вариант использования свойств IsEmpty и NextCti. В этом примере в операторе проводится поиск события со значением 1.0, за которым не следует событие со значением 2.0 в течение тридцати секунд. (Этот пример предоставлен в качестве иллюстрации полезных понятий, связанных с определяемыми пользователем операторами потока. Если речь идет о реальном приложении, то этот шаблон достаточно прост, чтобы его можно было реализовать с помощью встроенных операторов в StreamInsight.)

В предыдущем примере свойство NextCti использовалось для управления временем существования оператора. В данном примере свойство NextCti также используется с указанной целью, но кроме того, в нем свойство NextCti применяется для формирования вывода в ответ на течение времени.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Определение вспомогательного метода для упрощения использования

Иногда может потребоваться упростить использование этого оператора в запросе. Например, для разработчика запроса было бы удобнее написать input.Smooth(0.5), а не input.Scan(new SmoothingOperator(0.5)).

Этот упрощенный шаблон можно включить путем создания пользовательского метода расширения, подобного следующему:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }