응용 프로그램 시간 이동

 

StreamInsight 개발자는 순서가 잘못된 데이터를 포함할 수 있는 데이터 원본에 대한 수요와 적극적인 이벤트 처리 요구 사항 간의 균형을 조정해야 합니다. 응용 프로그램 시간을 보다 빠르게 이동하면 대기 시간을 줄이는 데는 도움이 되지만, 늦게 도착하는 데이터용 창(즉, 잘못된 순서로 도착하는 데이터에 대한 처리 능력)이 축소됩니다. StreamInsight에서는 응용 프로그램 시간이 적절한지를 파악하는 다양한 방식을 제공합니다. 이 항목에서는 어댑터 수준에서 쿼리 바인딩을 사용하여 설정할 수 있는 응용 프로그램 시간 이동에 대한 다양한 수준 및 정책을 설명합니다.

임시 모델 이해

StreamInsight의 임시 모델은 응용 프로그램 시간만 기준으로 사용하며 시스템 시간은 기준으로 사용하지 않습니다. 즉, 모든 임시 연산자는 이벤트 타임스탬프를 참조하며 호스트 컴퓨터의 시스템 시계는 참조하지 않습니다. 따라서 응용 프로그램이 현재 응용 프로그램 시간을 StreamInsight 서버로 보내야 합니다. 지정된 응용 프로그램의 시간은 응용 프로그램 컨텍스트의 다양한 측면에 따라 달라집니다. 궁극적으로는 응용 프로그램 개발자가 적절한 응용 프로그램 시간을 StreamInsight 서버에 제공해야 합니다. 응용 프로그램 시간에 대한 주요 고려 사항은 다음과 같습니다.

  • 데이터 원본

    데이터 원본이 임시 정보를 전달할 때는 해당 데이터를 사용하여 데이터 원본의 모든 이벤트를 받은 지정 시간을 식별할 수 있습니다. 이 지정 시간은 해당 데이터 원본과 관련하여 현재 응용 프로그램 시간을 구성합니다. 각 데이터 원본의 진행 속도는 서로 다를 수 있습니다.

  • 순서가 잘못된 데이터

    일부 데이터 원본의 이벤트는 해당 타임스탬프 순서대로 도착하지 않습니다. 이 경우에는 데이터 순서가 잘못된 것입니다. StreamInsight는 순서가 잘못된 데이터를 수용할 수 있으며 StreamInsight 서버에서 이벤트가 도착하는 순서에 따라 결과가 달라지지 않도록 합니다. StreamInsight 개발자는 약간의 여유를 두고 응용 프로그램 시간을 이동하여 늦게 도착하는 이벤트가 포함된 데이터 원본에 대해 순서가 잘못된 이벤트를 공급하도록 허용할 수 있습니다.

  • 결과 활성화 여부

    StreamInsight 쿼리는 현재 응용 프로그램 시간까지 정확한 것으로 알려진 결과를 출력합니다. 즉, 결과는 전체 응용 프로그램 시간 진행을 통해 StreamInsight 쿼리가 완료될 때 쿼리에서 생성됩니다.

CTI(현재 시간 증분)

쿼리 처리 중에 응용 프로그램 시간은 CTI(현재 시간 증분) 이벤트를 통해 생성됩니다. CTI는 StreamInsight 임시 모델의 중앙 구성 요소인 문장 부호 이벤트입니다. CTI는 시간대의 특정 부분이 더 이상 변경되지 않음을 StreamInsight 서버에 알림으로써 계산된 결과를 쿼리 출력으로 출력하고 이벤트 시퀀스를 커밋하는 데 사용됩니다. 따라서 결과를 생성하고 상태 저장 연산자의 상태를 플러시하려면 CTI를 이벤트와 함께 입력 이벤트 스트림의 큐에 넣어야 합니다.

CTI를 큐에 넣어 입력에서 CTI 타임스탬프 이전의 기간에 영향을 주는 후속 이벤트를 생성하지 않는다는 것이 보장됩니다. 따라서 CTI가 입력의 큐에 넣어진 후 다음과 같습니다.

  • 셰이프 시점, 간격 또는 가장자리 시작 이벤트의 경우: 이벤트의 시작 시간은 CTI와 같거나 그 이후여야 합니다.

  • 셰이프 가장자리 종료 이벤트의 경우: 이벤트의 종료 시간은 CTI와 같거나 그 이후여야 합니다.

이러한 규칙을 위반할 경우 CTI 위반이라고 합니다. 아래에서는 이러한 위반이 처리되는 방법에 대해 설명합니다.

세 가지 방법으로 CTI를 입력 스트림에 삽입할 수 있습니다.

  1. 입력 어댑터를 통해 프로그래밍 방식으로 CTI를 큐에 넣습니다(이벤트를 큐에 넣는 것과 유사함).

  2. 지정된 빈도를 사용하여 CTI를 선언적으로 생성합니다. 이 작업은 어댑터 팩터리의 AdvanceTimeGenerationSettings를 통해 지정하거나 쿼리 바인딩의 일부분으로 지정할 수 있습니다.

  3. 별도의 입력 스트림을 CTI 원본으로 정의합니다. 이 작업은 쿼리 바인딩에서만 지정할 수 있습니다.

두 번째와 세 번째 방법을 구현하는 경우에는 항상 CTI 위반에 대한 정책도 구현해야 합니다. 다음 섹션에서는 AdvanceTimeGenerationSettings 및 위반 정책에 대해 설명합니다. 이후 섹션에서는 어댑터 팩터리와 쿼리 바인딩에서 이전 시간 설정을 사용하는 방법에 대해 설명합니다.

CTI 생성

CTI 생성(앞서 두 번째 및 세 번째 방법에서 설명함)에는 두 가지 차원이 있습니다.

  1. 양의 정수 N 또는 시간 범위 T로 지정하는 생성 빈도. 생성 빈도 정책은 이벤트 수(N) 또는 시간 범위(T)가 발생한 후에 CTI를 삽입합니다.

  2. 마지막으로 받은 이벤트에 대한 지연으로 지정되는 생성된 CTI의 타임스탬프

또한 부울 플래그를 사용하여 쿼리 종료 시 양의 무한대 타임스탬프를 포함하는 최종 CTI를 삽입할지 여부를 표시할 수 있습니다. 이를 통해 쿼리 연산자의 모든 나머지 이벤트를 플러시합니다.

CTI 생성 작업은 AdvanceTimeGenerationSettings 클래스를 통해 정의됩니다. 이 클래스의 생성자는 다음 예와 같이 빈도, 지연 및 플래그를 사용합니다.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);  

이 예에서는 이벤트 원본에서 발생하는 이벤트 10개마다 CTI를 하나씩 삽입하도록 엔진에 지시합니다. CTI에는 마지막 이벤트 시간에 5초를 뺀 값이 타임스탬프로 적용됩니다. 이 지연 메커니즘은 유예 기간을 효율적으로 구현하므로, 이벤트가 5초보다 오래 지연되지 않는다면 이벤트 원본이 CTI 의미 체계를 위반하지 않고 지연된 이벤트를 큐에 넣을 수 있습니다. 해당하는 쿼리를 종료할 때 제한 시간이 없는 CTI를 큐에 넣습니다.

AdvanceTimeSettings를 통해 CTI 생성의 빈도를 지정할 때는 종료 가장자리가 고려되지 않습니다. 기간을 빈도로 사용할 때도 고려되지 않습니다. 빈도 및 기간에 대한 가장자리 이벤트의 경우에는 시작 가장자리만 고려됩니다.

CTI 위반 정책

이벤트 원본이 삽입된 CTI보다 이전의 타임스탬프를 포함하는 이벤트를 보내서 CTI 의미 체계를 위반할 수 있습니다. 이전 시간 설정을 사용하면 이러한 발생을 처리하는 정책을 지정할 수 있습니다. 정책에는 다음의 두 값을 포함할 수 있습니다.

  • Drop

    삽입된 CTI를 위반하는 이벤트를 삭제하고 쿼리의 큐에 넣지 않습니다.

  • Adjust

    삽입된 CTI를 위반하는 이벤트의 수명이 CTI 타임스탬프와 겹치는 경우 이벤트를 수정합니다. 즉, 이벤트의 시작 타임스탬프를 가장 최근의 CTI 타임스탬프로 설정하면 이벤트가 유효해집니다. 이벤트의 시작 시간과 종료 시간이 모두 CTI 타임스탬프 이전이면 이벤트는 삭제됩니다.

어댑터 이전 시간 설정

참고


입력 및 출력 어댑터는 StreamInsight 이전 버전에서 도입되었습니다. 현재 개발 모델로 대체되었다고 해도 레거시 코드를 유지 관리하는 개발자들은 계속 사용할 수 있습니다. 현재 개발 모델에 대한 자세한 내용은 개발자 가이드(StreamInsight)를 참조하십시오.

응용 프로그램 시간 이동 설정은 어댑터 팩터리 정의에서 지정할 수 있습니다. 어댑터를 인스턴스화할 때마다 팩터리의 Create() 메서드를 호출하는 것과 같은 방식으로, 어댑터 인스턴스의 고급 시간 설정을 정의하는 해당 메서드가 호출됩니다. 이렇게 하려면 다음 예와 같이 형식화된 어댑터에는 ITypedDeclareAdvanceTimeProperties를, 형식화되지 않은 어댑터에는 IDeclareAdvanceTimeProperties를 사용합니다.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,  
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>  

이 인터페이스를 사용하려면 팩터리의 일부분으로 다음 메서드를 구현해야 합니다.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)  
{  
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);  
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);  
    return ats;  
}  

DeclareAdvanceTimeProperties() 메서드는 새로 인스턴스화되는 각 어댑터에 대해 호출되며, 이때 해당 Create() 메서드 호출에 지정된 것과 같은 구성 구조 및 이벤트 셰이프 매개 변수를 사용합니다. 따라서 어댑터 만든 이는 쿼리 작성기 및 바인더가 구체적인 이전 시간 설정을 인식하도록 할 필요 없이 구성 정보에서 올바른 CTI 생성 설정을 파생시킬 수 있습니다.

AdapterAdvanceTimeSettings 생성자에는 앞서 설명한 위반 정책과 AdvanceTimeGenerationSettings 개체가 모두 필요합니다.

CTI 생성

AdapterAdvanceTimeSettings와 마찬가지로, CTI 실행 작업은 다음 예와 같이 쿼리 바인딩에서 선언적으로 지정할 수 있습니다. 따라서 쿼리를 바인딩하는 사용자가 어댑터 구현과 독립적으로 CTI 응용 프로그램 시간 동작을 정의할 수 있습니다.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);  
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);  

AdvanceTimeSettings 생성자는 다음의 세 인수를 사용합니다.

  1. AdvanceTimeGenerationSettings 개체

  2. AdvanceTimeImportSettings 개체

  3. 위반 정책

생성 설정 또는 가져오기 설정 인수 중 하나를 Null로 설정할 수는 있지만 둘 다 Null로 설정할 수는 없습니다. 이들 인수를 함께 지정할 수도 있습니다. 다음 섹션에서는 AdvanceTimeImportSettings 클래스에 대해 설명합니다.

위의 예에서는 이벤트의 타임스탬프(지연 없음)를 사용하여 모든 이벤트에 대해 CTI를 생성하여 삽입하도록 지정합니다. 다음 예와 같이 AdvanceTimeSettings 개체를 CepStream.Create() 메서드에 대한 마지막 선택적 매개 변수로 전달할 수 있습니다.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);  
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);  
  
var inputstream = CepStream<MyPayloadType>.Create("inputStream",  
                                                  typeof(MyInputAdapterFactory),  
                                                  new MyConfiguration(),  
                                                  EventShape.Point,  
                                                  ats);  

이 개체는 쿼리 바인더 개발 모델에서도 사용할 수 있습니다.

queryBinder.BindProducer<MyPayloadType>("filterInput",  
                                        inputAdapter,  
                                        new MyConfiguration(),  
                                        EventShape.Point,  
                                        ats);  

다른 스트림과 동기화

쿼리 바인딩 중에 사용하는 경우에는 빈도를 기준으로 CTI를 생성함과 동시에(또는 생성하는 대신) AdvanceTimeImportSettings를 사용하여 CTI를 다른 입력 스트림에서 쿼리로 복사할 수도 있습니다. 이 기능을 사용하면 다음 예와 같이 두 스트림을 동기화할 수 있습니다.

var dataStream = CepStream<DataType>.Create("dataStream ",  
                                            typeof(DataInputAdapterFactory),  
                                            new MyDataAdapterConfiguration(),  
                                            EventShape.Point);  
  
var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);  
  
var lookupStream = CepStream<LookupType>.Create("lookupStream",  
                                                typeof(ReferenceInputAdapterFactory),  
                                                new MyReferenceConfiguration(),  
                                                EventShape.Edge,  
                                                ats);  
  
var joined = from eLeft in dataStream  
             join eRight in lookupStream  
             where ...  

이 예에서는 “고속” 데이터 스트림을 “저속” 참조 스트림과 조인해야 하는 일반적인 사용 사례를 보여 줍니다. 저속 스트림은 고속 스트림에 비해 변경 빈도가 훨씬 낮은 조회 데이터일 수 있습니다. 조인을 통해 최고속 입력만큼 빠른 출력을 생성할 수 있도록 저속 입력 스트림의 CTI를 가져와서 이 스트림을 고속 스트림에 동기화합니다. 이 예에서는 고속 스트림의 응용 프로그램 시간 처리를 어댑터에서 수행한다고 간주합니다.

결과 활성화 여부

이전 시간 생성 설정의 지연 매개 변수는 삽입된 CTI의 타임스탬프를 지정합니다. 출력의 활성화 여부를 원하는 대로 지정하려면 StreamInsight 프레임워크에서 CTI의 정확한 의미 체계를 이해해야 합니다. CTI는 시간대에서 CTI 타임스탬프의 정확히 앞에 있는 모든 항목을 커밋하도록 엔진에 지시합니다. 이로 인해 결과 활성화에 대한 여러 문제점이 있습니다.

예를 들어 시점 이벤트의 입력 스트림이 있으며 CTI 생성 설정에서 빈도는 1이고(모든 이벤트) 지연은 0인 경우를 가정해 봅니다. 이 경우 각 시점 이벤트의 정확히 동일한 타임스탬프를 사용하여 CTI를 생성합니다. 그러나 맨 마지막 시점 이벤트는 다음 CTI로만 커밋되는데 이는 이벤트의 타임스탬프가 정확히 해당하는 CTI 앞에 있지 않기 때문입니다. 어댑터에서 시점 이벤트를 실행하는 즉시 모든 이벤트를 커밋하려면 CTI 타임스탬프를 시점 이벤트 바로 뒤로 지정해야 합니다. 그러면 타임스탬프가 다음 예와 같이 음수 지연(1개 틱)으로 변환됩니다.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);  

CTI 및 쿼리 연산자

CTI는 위 설명에 따라 주입됩니다. 쿼리를 통해 전파되며 특정 연산자에 의해 다르게 처리됩니다. 예를 들어 Join 연산자는 양쪽으로부터 더 오래된 CTI까지의 결과를 출력합니다. Union 연산자는 양쪽의 가장 최근 CTI 중에서 더 오래된 결과를 출력합니다. 전체 쿼리는 가장 최근 CTI까지의 결과만 출력합니다.

반면 특정 연산자는 CTI 타임스탬프에 영향을 미칩니다. 창 내에 아직 이벤트가 들어오는 동안 창 위에서 연산 결과가 변경될 수 있기 때문에 도약 창은 창 내의 CTI를 창의 시작으로 되돌립니다. AlterEventStartTime()AlterEventLifeTime() 메서드는 모두 이벤트의 시작 시간을 변경하며 동일한 변환이 CTI에 적용됩니다.

참고 항목

입력 어댑터 및 출력 어댑터 만들기
Conceitos do servidor StreamInsight