Share via


사용자 정의 스트림 연산자

사용자 정의 스트림 연산자를 사용하여 원하는 방식으로 이벤트 스트림을 처리하도록 정의할 수 있습니다.

사용 패턴

쿼리 내에서 CepStreamScan 확장 메서드를 사용하여 사용자 정의 스트림 연산자를 호출합니다. 다음 예와 같이 입력 스트림과 연산자의 초기 상태를 제공합니다.

var query = input.Scan(new SmoothingOperator(0.7));

연산자를 만든 이는 추상 CepPointStreamOperator 또는 CepEdgeStreamOperator 유형에서 새 클래스를 파생합니다. 새 유형은 연산자의 상태 시스템을 캡슐화합니다. 이 유형의 생성자에 대한 호출이 scan 연산자에 전달되어 연산자의 초기 상태를 설정합니다.

사용자 정의 스트림 연산자의 특성

사용자 정의 스트림 연산자를 통해 사용자가 이벤트 스트림과 단계적으로 상호 작용하도록 지원할 수 있습니다. 이와 같이 사용자 정의 스트림 연산자는 StreamInsight 이벤트 처리 엔진과 CLR 간의 경계를 나타내며 이는 어댑터 및 사용자 정의 연산자 또는 집계와 비슷합니다. 이러한 모든 경우에 엔진과 개발자는 스트림의 임시 속성에 관한 계약을 이행합니다. StreamInsight 엔진은 특정 쿼리 입력 이벤트 시퀀스에 대해 결정적 방식으로 동작하기 위해 사용자 정의 스트림 연산자에 대해 다음과 같은 사항을 보장합니다.

  • 사용자 정의 스트림 연산자는 동기화 시간(시점 및 가장자리 시작 이벤트의 경우 시작 시간, 가장자리 종료 이벤트의 경우 종료 시간)별로 정렬된 이벤트를 받도록 보장됩니다. 간격 이벤트는 지원되지 않습니다. 간격 이벤트에는 동기화 시간별로 정렬된 직관적인 표현이 없으며 각 이벤트가 시작과 종료라는 두 개의 시점을 나타내기 때문입니다.

  • 삽입 이벤트만 사용자 정의 스트림 연산자에 전달됩니다. 사용자 정의 스트림 연산자는 들어오는 스트림에 있는 CTI(현재 시간 증분) 이벤트를 인식하지 못하지만 CTI 이벤트는 사용자 정의 스트림 연산자가 시간의 경과를 인지하는 방법을 결정합니다(아래의 NextCti 참조).

  • 사용자 정의 스트림 연산자에서 허용하는지 여부에 따라 StreamInsight는 연산자를 비활성화할 수 있습니다(아래의 IsEmpty 참조). StreamInsight는 비활성화된 사용자 정의 스트림 연산자를 재활용할 수 있습니다.

  • 각 삽입 이벤트는 ProcessEvent가 호출되도록 하며 그런 다음에는 NextCti 및 IsEmpty 속성이 폴링됩니다.

사용자 정의 스트림 연산자의 입력 및 출력

사용자 정의 스트림 연산자는 입력 이벤트를 한 번에 하나씩 처리하며, 각 입력 이벤트에 응답하여 0-*개의 출력 이벤트를 생성할 수 있습니다. 또한 입력에 응답하여 연산자의 내부 상태를 업데이트할 수 있습니다. 입력 이벤트는 시간의 경과를 나타내기 위해 연산자가 요청할 경우 생성되는 CTI 이벤트이거나 삽입 이벤트일 수 있습니다. 입력에는 임시로 주석이 추가됩니다.

반면 출력 이벤트는 단순히 이벤트 페이로드입니다. 출력 이벤트에는 타임스탬프를 할당하거나 출력 스트림에 CTI를 삽입할 기회가 없습니다. 출력 이벤트는 해당 입력 이벤트의 타임스탬프에 기반을 둔 타임스탬프를 포함하는 시점 이벤트로 생성됩니다.

사용자 정의 스트림 연산자의 시간 처리

사용자 정의 스트림 연산자를 새로 만들 경우 코드는 이벤트의 페이로드를 처리하기만 하면 됩니다. 시간은 StreamInsight에서 단독으로 처리합니다. 입력 이벤트는 순서대로 수신됩니다. 각 출력 이벤트의 타임스탬프는 해당 입력 이벤트의 타임스탬프에 기반을 둡니다. 예를 들어 가장자리 종료 이벤트가 트리거되고 출력 이벤트가 트리거되면 해당 출력 이벤트는 가장자리 종료 이벤트의 타임스탬프를 받습니다. 따라서 연산자는 시간의 영향을 받을 수는 있지만 시간을 제어할 수는 없습니다.

사용자 정의 스트림 연산자는 입력 스트림의 ProcessEvent() 메서드에서 직접 CTI를 받지는 않지만 NextCti 속성을 통해 시간의 경과에 반응할 수 있습니다. 이 속성은 ProcessEvent()가 호출될 때마다 엔진에 의해 폴링됩니다. 사용자 정의 스트림 연산자는 ProcessEvent()에 대한 호출로 받게 될 다음 CTI 타임스탬프를 나타내는 타임스탬프를 반환할 수 있습니다.

NextCti 속성을 설정함으로써 요청된 이러한 CTI만 ProcessEvent에 전달됩니다. 이러한 CTI는 사용자 정의 스트림 연산자 외부로 전파되지 않습니다.

사용자 정의 스트림 연산자 구현

사용자 정의 스트림 연산자를 새로 만들려면 추상 CepPointStreamOperator 또는 CepEdgeStreamOperator 유형에서 새 클래스를 파생시킵니다.

  • 추상 CepPointStreamOperator 기본 클래스에서 파생될 경우 연산자는 입력 이벤트를 시점 이벤트로 인식합니다. 그러나 이벤트가 실제로 시점 이벤트가 아니어도 오류가 아닙니다. 연산자는 이벤트의 시작 시간만 인식합니다.

  • 추상 CepEdgeStreamOperator 기본 클래스에서 파생될 경우 연산자는 입력 이벤트의 시작 가장자리와 종료 가장자리를 모두 인식합니다.

파생된 클래스에서 다음 속성과 메서드를 재정의합니다.

  • ProcessEvent 메서드. 각 입력 이벤트에 응답하여 출력을 생성하고 연산자의 내부 상태를 업데이트합니다. ProcessEvent는 한 개의 입력 이벤트를 받아 0개 이상의 출력 페이로드를 반환할 수 있습니다.

  • IsEmpty 속성. 연산자의 내부 상태가 비어 있는지 여부를 나타냅니다. True일 경우 StreamInsight 쿼리 엔진은 메모리 사용률을 줄이기 위해 연산자 인스턴스를 삭제할 수 있습니다.

  • NextCti 메서드(선택 사항). CTI 이벤트가 연산자로 전송되는 다음 시점을 나타냅니다. 이 속성을 재정의하면 사용자 정의 연산자에서 이후에 특정 시점에 출력을 생성하거나 일정 응용 프로그램 시간 간격이 경과된 후 내부 상태가 비어 있음을 나타낼 수 있습니다.

파생된 클래스는 WCF 직렬화도 구현해야 합니다. 자세한 내용은 방법: 클래스 또는 구조체에 대한 기본 데이터 계약 만들기를 참조하십시오.

StreamInsight 엔진과 연산자의 상호 작용 방식

각 연산자 인스턴스에 대해 동기화 시간 순서로 정렬된 이벤트와 함께 ProcessEvent 메서드가 호출됩니다. 시점 이벤트나 CTI 이벤트의 경우 동기화 시간은 유효한 시작 시간입니다. 가장자리 이벤트의 경우 동기화 시간은 가장자리 시작에 대한 유효한 시작 시간이거나 가장자리 종료에 대한 유효한 종료 시간입니다.

ProcessEvent 메서드가 호출될 때마다 IsEmptyNextCti 속성이 폴링됩니다.

연산자가 NextCti를 재정의하면 엔진은 연산자가 처리하는 다음 이벤트가 동기화 시간 값이 NextCti보다 작은 삽입 이벤트이거나 시작 시간 값이 NextCti인 CTI가 되도록 보장합니다. 연산자가 처리된 마지막 이벤트의 동기화 시간보다 작거나 같은 NextCti 값을 반환하면 값이 무시됩니다. NextCti 속성은 연산자가 입력 스트림의 시간 진행을 고유의 리듬(내부 CTI 형식)으로 "변환"한 다음 그에 맞게 적절히 반응할 수 있게 합니다.

연산자는 입력 이벤트에 대응해서만 활성화됩니다. CTI는 활성화를 트리거하지 않습니다. 연산자가 IsEmpty에서 true를 반환하면 비활성화됩니다.

언제든지 엔진은 연산자를 직렬화하고 연산자에 대한 참조를 해제할 수 있습니다. 나중에 연산자가 역직렬화되면 원래 있었던 위치로 돌아갑니다.

사용자 정의 스트림 연산자의 예

지수 평활법

이 사용자 정의 스트림 연산자는 시점 이벤트 스트림을 값 시퀀스로 처리하고 지수 평활법을 적용합니다. System.Runtime.Serialization에 대한 참조가 필요합니다.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

패턴 일치

이 단순한 패턴 일치 예에서는 IsEmptyNextCti의 다른 용도를 보여 줍니다. 이 예에서 연산자는 값이 1.0이며 30초 내에 값이 2.0인 이벤트가 뒤따르지 않는 이벤트를 찾습니다. 이 예는 사용자 정의 스트림 연산자의 유용한 개념을 잘 보여 줍니다. 실제 응용 프로그램에서 이 예는 StreamInsight의 기본 제공 연산자만 사용하면 구현할 수 있는 간단한 패턴입니다.

위의 예에서는 NextCti를 사용하여 연산자의 수명을 제어했습니다. 이 예에서도 같은 용도로 NextCti를 사용하지만 시간 경과에 대한 응답으로 출력을 생성할 용도로도 NextCti를 사용합니다.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

사용상 편의를 위해 도우미 메서드 정의

쿼리에서 연산자 사용 패턴을 단순화할 수 있습니다. 예를 들어 쿼리 만든 이에게는 input.Scan(new SmoothingOperator(0.5)) 대신 input.Smooth(0.5)를 작성하는 것이 더 편리할 것입니다.

다음과 같은 사용자 지정 확장 메서드를 만들어 이와 같은 단순한 패턴을 지원할 수 있습니다.

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }