Поделиться через


Определяемые пользователем статистические функции и операторы

В примерах этого раздела показано, как расширить набор оконных операций в операторах StreamInsight LINQ с помощью определяемых пользователем агрегатных функций и операторов. Эти модули определены на основе окон событий и возвращают нуль или больше результирующих событий. Определяемая пользователем агрегатная функция или определяемый пользователем оператор должен быть откомпилирован в сборку, доступ к которой может быть получен сервером StreamInsight таким же образом, как и к адаптерам, предоставляемым и используемым во время выполнения.

StreamInsight также предлагает определяемые пользователем операторы потока в качестве еще одного механизма расширяемости. Определяемые пользователем операторы потока определены непосредственно на основе потока событий, а не на основе окон событий.

Определяемые пользователем агрегатные функции

Определяемая пользователем агрегатная функция используется на базе спецификации окна для выполнения статистической обработки событий в этом окне и создания одиночного значения. Определяемая пользователем агрегатная функция принимает в качестве входного параметра окно обработки сложных событий (являющееся результатом оператора на основе окна переключения, окна моментального снимка или окна количества), которое содержит набор событий обработки сложных событий и возвращает одиночное значение (тип CLR, соответствующий одному из типов-примитивов StreamInsight). Дополнительные сведения об окнах см. в разделе Использование окон событий.

Можно реализовать определяемые пользователем агрегатные функции, которые будут более сложными, чем предоставляемые StreamInsight простые агрегаты, такие как count, sum и average. Пример такой функции, которая вычисляет средневзвешенные по времени значения, описывается в одном из следующих разделов.

Определяемые пользователем операторы

Определяемый пользователем оператор используется на базе спецификации окна для обработки событий в окне и создания набора из одного или нескольких результирующих событий. Определяемый пользователем оператор принимает в качестве входного параметра окно обработки сложных событий (являющееся результатом оператора на основе окна переключения, окна моментального снимка или окна количества), которое содержит набор событий обработки сложных событий и выводит набор событий обработки сложных событий или набор полезных данных обработки сложных событий.

Определяемый пользователем оператор можно использовать, если нужно, чтобы вычисление создавало или обрабатывало целые события, включая отметки времени, для каждого окна. Примером является настройка поля состояния события, в дополнение к расчету статистических данных, где состояние зависит от результатов статистической обработки и другого параметра. Например, определяемый пользователем оператор может создать одно событие для каждого окна, которое содержит поле полезных данных, имеющее результат статистической обработки, и поле состояния, которое указывает, нарушил ли результат статистической обработки некоторое ограничение.

Зависимость от времени в UDA и UDO

Можно задавать определяемые пользователем агрегатные функции и операторы с учетом времени или без учета времени, в зависимости от выбранного базового класса для реализации.

Определяемые пользователем агрегатные функции и операторы без учета времени не ожидают, что события будут передаваться полностью, включая отметки времени. Вместо этого они обрабатывают в событиях в определенном окне только набор из одного или нескольких полей полезных данных. Кроме того, им не передаются время начала и время окончания для текущего окна.

Определяемым пользователем агрегатным функциям и операторам с учетом времени передается набор событий для каждого окна, включая отметки времени и время начала и окончания для окна. Зависит ли UDA или UDO от времени, определяется соответствующим базовым классом, от которого разработчик UDA или UDO производит реализацию.

Реализация определяемых пользователем агрегатов

Разработчик UDA должен:

  • Предоставить фактическую реализацию UDA.

  • Предоставить метод расширения для LINQ, чтобы составитель запроса мог использовать UDA.

Для реализации определяемой пользователем агрегатной функции пользователь создает класс, производный от соответствующего базового класса: CepAggregate для определяемых пользователем агрегатных функций без учета времени и CepTimeSensitiveAggregate для определяемых пользователем агрегатных функций с учетом времени.

Для создания производного класса требуется создание экземпляров параметров входного и выходного типа. Входной тип представляет все полезные данные (если определяемая пользователем агрегатная функция должна иметь возможность просматривать весь набор полей полезных данных в процессе вычисления) или тип CLR, соответствующий типу-примитиву в системе типов StreamInsight (в случае, когда входным аргументом определяемой пользователем агрегатной функции является одиночное поле). Выходной тип в обоих случаях должен быть типом CLR, который соответствует типу-примитиву.

Помимо данных события, конструктору класса определяемой пользователем агрегатной функции, если это предусмотрено разработчиком, во время начала запроса можно передавать дополнительную структуру конфигурации. Если автор определяемой пользователем агрегатной функции предоставляет такой конструктор, то система будет вызывать его во время выполнения с использованием конфигурации, предоставленной вызывающим объектов определяемой пользователем агрегатной функции в LINQ.

Определяемые пользователем агрегатные функции с учетом времени и без учета времени получают полезные данные в виде неупорядоченного набора. В случае определяемой пользователем агрегатной функции с учетом времени с каждым элементом полезных данных дополнительно связываются отметки времени событий. Кроме того, в определяемую пользователем агрегатную функцию передается дескриптор окна, который определяет время начала и время окончания для окна.

Примеры определяемых пользователем агрегатов

В следующем примере реализуется определяемая пользователем агрегатная функция без учета времени. Она принимает набор целочисленных полей события. Для этого примера реализации не указывается дополнительная структура конфигурации, поскольку классу не требуется специальный конструктор.

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

Кроме реализации определяемой пользователем агрегатной функции, необходимо предоставить метод расширения для LINQ, чтобы создатель запроса мог использовать определяемую пользователем агрегатную функцию. Методом расширения является подпись, которая дает возможность автору запроса использовать агрегат и компилировать запрос. Через атрибут поставщик StreamInsight LINQ может ссылаться на фактический класс, который содержит реализацию определяемой пользователем агрегатной функции, как показано в следующем примере.

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

Здесь реализация определяемой пользователем агрегатной функции должна выполняться посредством класса Median, в котором реализуется определяемая пользователем агрегатная функция, работающая с одиночным полем типа int и возвращающая значение типа int. Выражение в сигнатуре функции представляет сопоставление между типом события входного потока и одиночным целочисленным значением. Обратите внимание, что метод расширения никогда не будет выполняться, поэтому в его тексте имеется CepUtility.DoNotCall(). UDA, основанный на этой спецификации, может использоваться в LINQ, как показано в следующем примере.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

Аргумент лямбда-выражение сопоставляет полезные данные события с целочисленным значением, которое будет служить входным аргументом определяемой пользователем агрегатной функции. В этом случае для каждого окна будет вычисляться медиана значений поля события val.

Затем рассмотрим пример независимой от времени определяемой пользователем агрегатной функции, которая содержит информацию о конфигурации. Она принимает в качестве входных все полезные данные типа Trade и возвращает значения типа double. Этот пример также включает соответствующий метод расширения:

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

Входными являются все полезные данные, поэтому методом расширения не задано какое-либо лямбда-выражение. Единственным параметром для определяемой пользователем агрегатной функции является значение, относящиеся к конфигурации (которое здесь имеет тип числа с плавающей точкой двойной точности):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

Затем рассмотрим пример зависящей от времени определяемой пользователем агрегатной функции с информацией о конфигурации. Определяемая пользователем агрегатная функция вычисляет средневзвешенные по времени значения с интервальными событиями и представляется как ступенчатая функция (в которой каждый интервал является допустимым до следующего интервала). Аналогично предыдущему примеру, функция не принимает в качестве входного аргумента все полезные данные, а только значения типа double.

Учтите, что, хотя полезные данные события ограничиваются значениями типа double, входной набор все равно определяется в виде набора интервальных событий, а не набора полезных данных, как в случае определяемой пользователем агрегатной функции без учета времени. Это необходимо, чтобы включать отметки времени, поскольку определяемая пользователем агрегатная функция указана с учетом времени. Более того, само окно задано в форме объекта WindowDescription, которое имеет свойства времени начала и времени окончания. Эти отметки времени указываются в формате времени UTC. Также обратите внимание, что класс или структура UdaConfig должны поддерживать сериализацию через DataContractSerializer.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

Где UDAConfig — это

public class UDAConfig
{
    public double Multiplier { get; set; }
}

Теперь в метод расширения также входит следующая структура конфигурации:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

Конфигурация становится еще одним параметром в методе расширения:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

В рассмотренных ранее примерах событие было типизированным, то есть типы полезных данных были известны на момент реализации определяемой пользователем агрегатной функции. В следующем примере реализуется определяемая пользователем агрегатная функция, имеющая универсальный тип входных данных. Входной тип передается в определяемую пользователем агрегатную функцию только во время выполнения.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

Реализация определяемых пользователем операторов

Создатель определяемого пользователем оператора должен выполнить следующие действия.

  • Предоставить фактическую реализацию UDO.

  • Предоставить метод расширения для LINQ, чтобы составитель запроса мог использовать UDO.

При реализации UDO производится наследование от подходящего базового класса: CepOperator для не зависящих от времени UDO или CepTimeSensitiveOperator. При создании производного класса требуется создание экземпляра типа входного или выходного параметров. Входной тип всегда представляет полные полезные данные. Входной тип является либо набором полезных данных, либо набором событий, в зависимости от выбранного базового класса.

Помимо данных события, конструктору класса определяемого пользователем оператора, если это предусмотрено разработчиком, во время начала запроса можно передавать дополнительную структуру конфигурации. Если автор определяемого пользователем оператора предоставляет такой конструктор, то система будет вызывать его во время выполнения с использованием конфигурации, предоставленной вызывающим объектом определяемого пользователем оператора в LINQ.

Определяемые пользователем операторы с учетом времени и без учета времени получают полезные данные в виде неупорядоченного набора. В случае определяемого пользователем оператора с учетом времени с каждым элементом полезных данных дополнительно связываются отметки времени событий. Кроме того, в определяемый пользователем оператор передается дескриптор окна, который определяет время начала и время окончания для окна.

Поведение CTI в определяемых пользователем операторах

UDO изменяют приращения текущего времени (CTI) следующим образом. Если окно остается открытым, то есть не получено событие CTI с отметкой времени после времени окончания окна, то все CTI, которые попадают в это окно, изменяются на время его начала. Это гарантирует, что выход UDO, который может содержать пользовательские отметки времени, может изменяться, если окно все еще открыто.

Пример реализации определяемых пользователем операторов

В следующем примере реализуется не зависящий от времени оператор UDO, который не имеет данных конфигурации.

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

В следующем примере показано, как изменить сигнатуру, чтобы получить определяемый пользователем оператор с учетом времени, который принимает данные конфигурации.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

Пример методов расширения для определяемых пользователем операторов

Наряду с реализацией UDO, разработчик UDO должен предоставить метод расширения для LINQ, чтобы создатель запроса мог использовать UDO. Метод расширения — это сигнатура, которая позволяет разработчику запроса использовать оператор и компилировать запрос. С помощью атрибута поставщик LINQ может ссылаться на класс, который фактически содержит реализацию определяемого пользователем оператора, как показано в следующем примере.

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

Этот UDO может теперь использоваться следующим образом.

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

В следующем примере показан метод расширения и использование определяемого пользователем оператора, который имеет структуру конфигурации, ссылаясь на реализацию, содержащуюся в классе с именем SampleUDOwithConfig.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

Свойства полей события, относящиеся к культуре

Расширения, такие как определяемые пользователем операторы, функции и агрегатные функции, можно рассматривать в качестве интерфейсов между доменом обработки сложных событий с собственной системой типов и средой .NET CLR. Для некоторых приложений желательно иметь возможность передавать через этот интерфейс данные о культуре. Для определяемых пользователем агрегатных функций и операторов автор расширения может реализовать дополнительный интерфейс IDeclareEventProperties, который позволяет просматривать или задавать свойства культуры в полях событий. Для реализации этого интерфейса необходимо предоставить функцию DeclareEventProperties, которая возвращает объект типа CepEventType, содержащий данные о культуре для полей, как показано в следующем примере.

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

В этом примере определяемый пользователем оператор принимает входные события типа Input и создает события типа Output. Тип Output содержит строковые поля, для которых создатель определяемого пользователем оператора явно задает заметки с данными о культуре. Культура с именем zh-CN применяется к выходному полю firstName, а для выходного поля location задается культура, которая связана с полем loc в типе входного события определяемого пользователем оператора. Для каждого события, созданного определяемым пользователем оператором во время выполнения, эти культуры применяются к полям до того, как событие вставляется в выходной поток определяемого пользователем оператора.

Такой же интерфейс существует и для определяемых пользователем агрегатных функций. Поскольку агрегатные функции имеют только одно возвращаемое значение, то для применения к такому полю данных о культуре интерфейс IDeclareEventProperties заключает возвращаемое значение в единственное поле типа CepEventType, чтобы предоставить возможность создать для этого поля заметки со свойствами событий, относящимися к обработке сложных событий.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

Здесь строка, представляющая результат агрегатной функции, заключается в оболочку CepEventType, чтобы автор определяемой пользователем агрегатной функции мог задать для этого поля свойство CultureInfo. Эти данные о культуре будут передаваться в фактическое поле события, которое принимает результат статистической обработки в запросе LINQ, где используется определяемая пользователем агрегатная функция.

См. также

Основные понятия

Использование окон событий

Другие ресурсы

Написание шаблонов запроса на языке LINQ