사용자 정의 집계 및 연산자

이 항목의 예에서는 UDA(사용자 정의 집계) 및 UDO(사용자 정의 연산자)를 통해 StreamInsight LINQ 연산자의 창 기반 연산을 확장하는 방법을 보여 줍니다. 이러한 확장은 이벤트 창에 대해 정의되며 0개 이상의 결과 이벤트를 반환합니다. 사용자 정의 집계 또는 사용자 정의 연산자는 런타임에 어댑터가 제공 및 사용되는 것과 같은 방식으로 StreamInsight 서버에서 액세스 가능한 어셈블리로 컴파일해야 합니다.

또한 StreamInsight에서는 또 다른 확장성 메커니즘으로 사용자 정의 스트림 연산자를 제공합니다. 사용자 정의 스트림 연산자는 이벤트 창에 대해서가 아니라 이벤트 스트림에 대해 직접 정의됩니다.

사용자 정의 집계

사용자 정의 집계는 창 사양을 기반으로 해당 창의 이벤트를 집계하고 단일 결과 값을 생성하는 데 사용됩니다. UDA는 CEP 이벤트 집합이 포함된 CEP 창(도약/스냅숏 창의 결과 또는 개수 기반 창 연산자)을 입력으로 사용하여 단일 반환 값(StreamInsight 기본 유형 중 하나에 매핑되는 CLR 형식)을 출력합니다. 창에 대한 자세한 내용은 이벤트 창 사용을 참조하십시오.

StreamInsight에서 제공하는 count, sum 및 average와 유사한 단순한 집계에 비해 기능면에서 보다 복잡한 UDA를 구현할 수 있습니다. 이러한 UDA의 한 예인 시간 가중치 평균 계산 작업은 이후 섹션에서 설명합니다.

사용자 정의 연산자

사용자 정의 연산자는 창 사양을 기반으로 해당 창의 이벤트를 처리하고 하나 이상의 결과 이벤트 집합을 생성하는 데 사용됩니다. UDO는 CEP 이벤트 집합이 포함된 CEP 창(도약/스냅숏 창의 결과 또는 개수 창 연산자)을 입력으로 사용하여 CEP 이벤트 집합 또는 CEP 페이로드 집합을 출력합니다.

각 창에 대해 전체 이벤트(해당 타임스탬프 포함)를 생성하거나 전체 이벤트에 적용할 계산을 수행해야 하는 경우 UDO를 사용할 수 있습니다. 집계를 계산하면서 이벤트 상태 필드를 설정하는 경우를 예로 들 수 있습니다. 이 경우 상태는 집계 결과 및 다른 매개 변수에 따라 달라집니다. 예를 들어 UDO는 집계 결과가 들어 있는 페이로드 필드 및 집계 결과가 일부 제약 조건을 위반했는지 여부를 나타내는 상태 필드가 포함된 단일 이벤트를 각 창에 대해 생성할 수 있습니다.

UDA 및 UDO의 시간 중요성

UDA 및 UDO를 구현하도록 선택한 기본 클래스에 따라 UDA 및 UDO에서 시간이 중요한지 중요하지 않은지를 정의할 수 있습니다.

시간이 중요하지 않은 UDA 및 UDO로는 해당 타임스탬프를 포함한 전체 이벤트를 전달할 필요가 없습니다. 이러한 UDA 및 UDO는 정의된 창에 있는 이벤트에 대한 하나 이상의 페이로드 필드 집합만 고려합니다. 또한 현재 창의 시작 시간과 종료 시간도 이러한 UDA 및 UDO로 전달되지 않습니다.

시간이 중요한 UDA 및 UDO는 각 창에 대해 타임스탬프 및 창 시작/종료 시간을 포함한 이벤트 집합을 전달받습니다. UDA 또는 UDO에서 시간이 중요한지 여부는 해당 UDA 또는 UDO 만든 이가 구현을 파생하는 개별 기본 클래스에 따라 결정됩니다.

사용자 정의 집계 구현

UDA 만든 이는 다음을 수행해야 합니다.

  • 실제 UDA 구현 제공

  • 쿼리 작성자가 UDA를 사용할 수 있도록 LINQ에 대한 확장 메서드 제공

UDA를 구현하려면 사용자가 적절한 기본 클래스(시간이 중요하지 않은 UDA의 경우 CepAggregate, 시간이 중요한 UDA의 경우 CepTimeSensitiveAggregate)를 파생합니다.

클래스를 파생하려면 입력 및 출력 유형 매개 변수를 인스턴스화해야 합니다. 입력 유형은 StreamInsight 형식 시스템의 해당 기본 유형으로 매핑되는 CLR 형식(UDA에 대한 입력이 단일 필드인 경우) 또는 전체 페이로드(UDA가 해당 계산 과정에서 전체 페이로드 필드 집합을 확인할 수 있어야 하는 경우)를 나타냅니다. 두 경우 모두 출력 유형은 해당 기본 유형으로 매핑되는 CLR 형식이어야 합니다.

UDA 만든 이가 지정한 경우 이벤트 데이터 외에 쿼리 시작 시간의 선택적 구성 구조를 UDA 클래스 생성자로 전달할 수 있습니다. UDA 만든 이가 이러한 생성자를 제공하는 경우 엔진은 LINQ에서 UDA 호출자가 제공하는 구성을 사용하여 해당 생성자를 런타임에 호출합니다.

시간이 중요하지 않은 UDA와 시간이 중요한 UDA는 모두 페이로드를 정렬되지 않은 집합으로 받습니다. 시간이 중요한 UDA의 경우 이벤트 타임스탬프가 각 페이로드와 추가적으로 연결됩니다. 또한 창 시작 및 종료 시간을 정의하는 창 설명자가 UDA로 전달됩니다.

사용자 정의 집계 예

다음 예에서는 정수 이벤트 필드 집합을 사용하여 시간이 중요하지 않은 UDA를 구현합니다. 이 예 구현에 대해서는 선택적 구성 구조를 지정하지 않으므로 클래스에 특정 생성자가 필요하지 않습니다.

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

UDA를 구현하는 것 외에, 쿼리 작성자가 UDA를 사용할 수 있도록 LINQ에 대한 확장 메서드도 제공해야 합니다. 확장 메서드는 쿼리 만든 이가 집계를 사용하고 쿼리를 컴파일할 수 있도록 하는 서명입니다. StreamInsight LINQ 공급자는 특성을 통해 다음 예와 같이 UDA 구현이 포함된 실제 클래스를 참조할 수 있습니다.

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

여기서는 Median 클래스를 통해 UDA를 구현해야 합니다. 그러면 int 형식의 단일 필드에서 작동하는 UDA가 구현되며 int 형식 값이 반환됩니다. 함수 서명의 식은 입력 스트림의 이벤트 유형에서 단일 정수 값으로의 매핑을 나타냅니다. 확장 메서드는 실행되지 않으므로 본문에 CepUtility.DoNotCall()이 있습니다. 이 지정을 기준으로 다음 예와 같이 LINQ에서 UDA를 사용할 수 있습니다.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

람다 식 인수는 이벤트 페이로드를 UDA에 대한 입력으로 사용될 정수 값에 매핑합니다. 여기서는 각 창에 대해 val 이벤트 필드 값의 중앙값이 계산됩니다.

다음으로는 구성 정보가 있으며 시간이 중요하지 않은 UDA의 예를 살펴 봅니다. 이 예는 Trade 형식의 전체 페이로드를 입력으로 사용하며 double 형식 값을 반환합니다. 이 예에는 해당 확장 메서드도 포함되어 있습니다.

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

전체 페이로드를 입력으로 사용하므로 람다 식이 확장 메서드를 통해 지정되지 않습니다. UDA의 유일한 매개 변수는 구성 값(이 예에서는 double 형식임)입니다.

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

다음으로는 구성 정보가 있으며 시간이 중요한 UDA의 예를 살펴 봅니다. 이 UDA는 계단 함수로 해석되는 간격 이벤트(각 간격이 다음 간격까지 유효함)가 포함된 시간 가중치 평균입니다. 이전 예와 마찬가지로 이 예에서도 전체 페이로드를 입력으로 사용하지 않으며 double 형식 값만 사용합니다.

이벤트 페이로드가 double 값으로 감소하더라도 입력 집합은 시간이 중요한 UDA의 경우와 같은 페이로드 집합이 아니라 간격 이벤트 집합으로 정의됩니다. UDA에서 시간이 중요한 것으로 지정되므로 타임스탬프를 포함하려면 이렇게 정의해야 합니다. 또한 창 자체는 창 시작 시간 및 종료 시간 속성을 가진 WindowDescription 개체의 형식으로 지정됩니다. 이 타임스탬프는 UTC 시간에 따라 다릅니다. 또한 UdaConfig는 DataContractSerializer를 통해 직렬화할 수 있어야 하는 클래스 또는 구조입니다.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

여기서 UDAConfig는 다음과 같습니다.

public class UDAConfig
{
    public double Multiplier { get; set; }
}

이제 확장 메서드에는 다음 구성 구조도 포함됩니다.

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

구성은 확장 메서드의 또 다른 매개 변수가 됩니다.

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

지금까지 설명한 예에서는 이벤트가 형식화되는 경우를 고려했습니다. 즉, UDA 구현 시 페이로드 유형이 이미 알려져 있습니다. 다음 예에서는 입력 유형이 런타임에만 UDA로 전달되는 일반 입력 유형을 포함하는 UDA를 구현합니다.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

사용자 정의 연산자 구현

UDO 만든 이는 다음을 수행해야 합니다.

  • 실제 UDO 구현 제공

  • 쿼리 작성자가 UDO를 사용할 수 있도록 LINQ에 대한 확장 메서드 제공

UDO를 구현하려면 사용자가 적절한 기본 클래스(시간이 중요하지 않은 UDO의 경우 CepOperator, 또는 CepTimeSensitiveOperator)에서 클래스를 파생합니다. 클래스를 파생하려면 입력 및 출력 유형 매개 변수를 인스턴스화해야 합니다. 입력 유형은 항상 전체 페이로드를 나타내며, 출력 유형은 선택한 기본 클래스에 따라 페이로드 집합이거나 이벤트 집합입니다.

UDO 만든 이가 지정한 경우 이벤트 데이터 외에 쿼리 시작 시간의 선택적 구성 구조를 UDO 클래스 생성자로 전달할 수 있습니다. UDO 만든 이가 생성자를 제공하는 경우 엔진은 LINQ에서 UDO 호출자가 제공하는 구성을 사용하여 해당 생성자를 런타임에 호출합니다.

시간이 중요하지 않은 UDO와 시간이 중요한 UDO는 모두 페이로드를 정렬되지 않은 집합으로 받습니다. 시간이 중요한 UDO의 경우 이벤트 타임스탬프가 각 페이로드와 추가적으로 연결됩니다. 또한 창 시작 및 종료 시간을 정의하는 창 설명자가 UDO로 전달됩니다.

사용자 정의 연산자의 CTI 동작

UDO는 다음과 같은 방식으로 CTI(현재 시간 증분)를 변경합니다. 즉, 창이 계속 “열린” 상태여서 창 종료 시간 이후에 타임스탬프가 포함된 CTI가 수신되지 않았으면 해당 창에 포함되는 모든 CTI가 창 시작 시간으로 변경됩니다. 따라서 창이 계속 열려 있는 한 사용자 정의 타임스탬프를 포함할 수 있는 UDO 출력이 변경될 수 있습니다.

사용자 정의 연산자 구현 예

다음 예에서는 구성 정보가 없으며 시간이 중요하지 않은 UDO를 구현합니다.

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

다음 예에서는 구성 정보를 허용하는 시간이 중요한 UDO로 서명을 변경하는 방법을 보여 줍니다.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

사용자 정의 연산자용 확장 메서드 예

UDO 만든 이는 UDO를 구현해야 할 뿐 아니라 쿼리 작성자가 UDO를 사용할 수 있도록 LINQ에 대한 확장 메서드도 제공해야 합니다. 확장 메서드는 쿼리 만든 이가 연산자를 사용하고 쿼리를 컴파일할 수 있도록 하는 서명입니다. LINQ 공급자는 특성을 통해 다음 예와 같이 UDO 구현이 포함된 실제 클래스를 참조할 수 있습니다.

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

그러면 이 UDO 클래스를 다음과 같이 사용할 수 있습니다.

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

다음 예에서는 구성 구조가 있는 UDO의 확장 메서드 및 사용법을 보여 줍니다. 구성 구조는 SampleUDOwithConfig 클래스에 포함된 구현을 참조합니다.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

culture 관련 이벤트 필드 속성

UDO, UDA, UDF 등의 확장 프로그램은 형식 시스템을 포함하는 CEP 도메인과 .NET CLR 간의 인터페이스로 간주할 수 있습니다. 일부 응용 프로그램의 경우에는 이 인터페이스를 통해 culture 정보를 전달할 수 있도록 하는 것이 좋습니다. UDA 및 UDO의 경우 확장 프로그램 만든 이가 이벤트 필드에 대해 culture 관련 속성 검사 또는 설정을 허용하는 추가 인터페이스인 IDeclareEventProperties를 구현할 수 있습니다. 이 인터페이스를 구현하려면 CepEventType 개체를 반환하는 DeclareEventProperties 함수를 제공해야 합니다. 이 개체는 다음 예와 같이 해당 필드의 culture 정보를 전달할 수 있습니다.

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

이 예에서 UDO는 Input 유형의 입력 이벤트를 사용하여 Output 유형의 이벤트를 생성합니다. Output 유형에는 UDO 만든 이가 특정 culture 정보를 사용하여 명시적으로 주석을 추가하려는 문자열 필드가 있습니다. zh-CN culture는 firstName 출력 필드에 적용되고, location 출력 필드에는 UDO의 입력 이벤트 유형에 있는 loc 필드에 연결된 것과 같은 culture를 사용하여 주석이 추가됩니다. 런타임에 UDO가 생성하는 모든 이벤트에 대해 이러한 culture는 UDO의 출력 스트림으로 이벤트가 삽입되기 전에 해당 필드에 적용됩니다.

사용자 정의 집계에 대해서도 동일한 인터페이스가 있습니다. 집계에는 단일 반환 값만 포함되므로 이 필드에 culture 관련 정보를 적용하기 위해 IDeclareEventProperties 인터페이스가 단일 필드를 포함하는 CepEventType으로 반환 값을 래핑합니다. 이를 통해 해당 필드에 CEP 관련 이벤트 속성을 주석으로 추가할 수 있습니다.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

여기서는 집계 결과를 나타내는 문자열을 CepEventType으로 래핑하므로 UDA 만든 이가 해당 필드에 대해 CultureInfo 속성을 설정할 수 있습니다. 이 culture 정보는 UDA가 사용되는 LINQ 쿼리에서 집계 결과를 받는 실제 이벤트 필드로 전파됩니다.

참고 항목

개념

이벤트 창 사용

관련 자료

LINQ에서 쿼리 템플릿 작성