런타임에 쿼리 작성

 

런타임에 StreamInsight 쿼리를 작성하면 유연한 쿼리를 만들고 다시 사용할 수 있으며 리소스를 효율적으로 사용하고 유지 관리를 쉽게 수행할 수 있습니다. 다음과 같은 작업을 수행할 수 있습니다.

  • 같은 서버에서 한 쿼리의 결과를 다른 쿼리로 제공할 수 있습니다.

  • 입력 어댑터에서 이벤트를 사용하는 것처럼 실행 중인 다른 쿼리의 출력을 사용할 수 있습니다.

작성된 두 쿼리(예: 쿼리 1이 쿼리 2로 공급됨)는 격리되어 실행됩니다. 따라서 쿼리 1이 실패해도 쿼리 2의 상태에는 영향이 없으며 그 반대의 경우도 마찬가지입니다. 쿼리 1과 쿼리 2는 독립적으로 시작 및 중지할 수 있습니다. 예를 들어 쿼리 1을 중지한 다음 다른 쿼리로 바꿔서 다시 시작할 수 있습니다.

이 항목에서는 런타임에 동적으로 쿼리를 작성하는 다양한 사용 사례 및 예제에 대해 설명합니다.

데이터를 전처리하여 출력 어댑터로 보내는 기본 쿼리를 디자인 및 배포해야 하는 쿼리도 많은 한편, 이 쿼리의 결과를 사용하고 자체 결과를 다른 출력 어댑터로 보내는 쿼리도 있습니다. 다음 그림에는 이러한 시나리오가 나와 있습니다.

7c8869cc-c70c-412c-a50c-8e8b780dfe1c

다음 예에서는 StreamInsight 서버의 기존 응용 프로그램인 myApp에서 만든 쿼리를 보여 줍니다.

var inputstream = CepStream<MyDataType>.Create("inputStream",  
                                               typeof(MyInputAdapterFactory),  
                                               new InputAdapterConfig { someFlag = true },  
                                               EventShape.Point);  
  
var filtered = from e in inputstream  
               where e.Value > 95  
               select e;  
  
var query = filtered.ToQuery(myApp,  
                             "filterQuery",  
                             "Filter out Values over 95",  
                             typeof(MyOutputAdapterFactory),  
                             new OutputAdapterConfig { someString = "foo" },  
                             EventShape.Point,  
                             StreamEventOrder.FullyOrdered);  
  
query.Start();  

이 쿼리의 결과를 두 번째 쿼리로 스트리밍하려면 Query.ToStream() 메서드를 사용합니다. 다음 예와 같이 기본 쿼리의 출력 페이로드와 일치하는 형식이 제네릭 매개 변수로 지정됩니다.

var filteredStream = query.ToStream<MyDataType>();  
  
var validated = from e in filteredStream  
                select new  
                {  
                    SourceId = e.SourceId,  
                    Value = e.Value,  
                    Status = e.Value > 75 ? false : true  
                };  
  
var validationQuery = validated.ToQuery("validationQuery",  
                                        "Validating Query",  
                                        typeof(MyOutputAdapterFactory),  
                                        new OutputAdapterConfig { someString = "foo" },  
                                        EventShape.Point,  
                                        StreamEventOrder.FullyOrdered);  
  
validationQuery.Start();  

이 예에서는 기본 쿼리의 출력 스트림에 액세스한 다음 프로젝션 연산자를 적용해 Status라는 새 필드를 생성합니다. ToQuery()를 두 번째로 호출할 때 응용 프로그램 개체는 기본 쿼리에서 유추할 수 있으므로 더 이상 필요하지 않습니다.

ToStream() 메서드는 CTI(현재 시간 증분)를 해당 지점에 주입해야 하는 경우 선택적 AdvanceTimeSettings 개체를 사용합니다. CTI를 삽입하면 특정 쿼리 구성의 활성화 가능성을 높일 수 있습니다.

기본 쿼리 개체를 만드는 방법은 관계가 없습니다. 위의 모델에서는 CepStream.ToQuery() API 사용 예를 보여 줍니다. 다음과 같은 방법으로 쿼리를 만들 수도 있습니다.

  • 쿼리 바인더를 사용합니다. 예: myApp.CreateQuery("filterQuery", queryBinder, "description");

  • 서버에서 개체 모델 API를 통해 쿼리를 검색합니다. 예: myApp.Queries["filterQuery"]

바인딩 해제된 쿼리 출력

위의 예에서는 해당 출력이 이미 출력 어댑터에 바인딩된 기존 쿼리의 결과를 다시 사용하는 방법을 보여 줍니다. 하나 이상의 다른 쿼리가 결과를 사용하는 경우가 아니면 출력이 생성되지 않도록 바인딩 해제된 출력 스트림을 쿼리에 포함할 수도 있습니다. 다음 그림에는 이러한 시나리오가 나와 있습니다.

eaec4238-aa80-4f01-b39d-c28f1da5a541

이러한 방식을 사용하려면 어댑터가 필요하지 않은 CepStream.ToQuery()의 오버로드를 사용합니다.

var query = filtered.ToQuery(myApp,  
                             "filterQuery",  
                             "Filter out Values over 95",  
                             EventShape.Point,   
                             StreamEventOrder.FullyOrdered);  

이 쿼리를 시작할 수 있습니다. 두 번째 쿼리는 위의 validationQuery 쿼리 예에서와 같이 결과 스트림을 지정해 나중에 사용할 수 있습니다. 소비자가 없으면 기본 쿼리의 결과는 삭제됩니다.

이 패턴을 사용하면 쿼리 결과를 여러 출력 어댑터로 스트리밍할 수도 있습니다. 이러한 방식을 가장 간단하게 사용하려면 바인딩 해제된 쿼리를 기반으로 통과 쿼리를 각 출력 어댑터마다 하나씩 사용하면 됩니다(위 그림의 쿼리 2 및 3).

게시된 스트림

지금까지 살펴본 예에서는 다른 쿼리에 대해 새 입력 스트림을 만들기 위해 실제 쿼리 개체를 사용했습니다. 클라이언트 쪽 개체에 대해 추상적인 방식을 사용하려면 아래 그림과 같이 게시된 스트림 URI를 하나 이상의 다른 쿼리에 대해 입력으로 사용하면 됩니다.

fb3e8533-2783-42fa-bf5d-7a5b0afef1a6

각 쿼리에는 기본적으로 게시된 스트림의 URI(Uniform Resource Identifier), 즉 쿼리 이름 자체가 있습니다. 또한 CepStream 클래스의 적절한 멤버를 통해 사용자 지정 게시된 스트림 이름을 쿼리에 명시적으로 할당할 수도 있습니다.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),  
                                             myApp,  
                                             "filterQuery",  
                                             "Filter out Values over 95",  
                                             EventShape.Point,  
                                             StreamEventOrder.FullyOrdered);  

이렇게 하면 명시적인 명명된 출력(바인딩 해제됨)이 포함된 쿼리가 작성됩니다. 게시된 스트림의 이름은 "<application_name>/PublishedStream/<stream_name>" 규칙을 따라야 합니다.

다음 예에서와 같이 이제는 다른 쿼리도 이 URI를 해당 입력 스트림으로 참조할 수 있습니다.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),  
                                                EventShape.Point);  
var validated = from e in filterStream  
                ...  

게시된 스트림 소비자는 입력 이벤트 셰이프를 지정해야 하는데, 이 셰이프는 참조되는 쿼리의 출력 셰이프와 일치해야 합니다.

게시된 스트림 이름을 통해 기본 쿼리에 연결하는 것이 쿼리 개체를 통해 연결하는 것보다는 단순합니다. 따라서 보조 쿼리를 정의할 때는 응용 프로그램에 다음 정보를 제공해야 합니다.

var validationQuery = validated.ToQuery(myApp,  
                                        "validationQuery",  
                                        "Validating Query",  
                                        typeof(MyOutputAdapterFactory),  
                                        new OutputAdapterConfig { someString = "foo" },  
                                        EventShape.Point,  
                                        StreamEventOrder.FullyOrdered);  

게시된 스트림 어댑터

작성된 쿼리의 어댑터를 검색할 때는(예: Query.InputStreamBindings를 통해 검색) 특수한 기본 제공 어댑터를 사용하여 어댑터를 연결합니다. 이러한 기본 제공 어댑터를 기반으로 하여 앞서 살펴본 것처럼 CepStream.ToQuery, Query.ToStream() 등을 통한 쿼리 작성 기능을 편리하게 활용할 수 있습니다. 또한 이러한 기능은 다음 예와 같이 자체 구성 구조(게시된 스트림 이름 포함)를 가진 일반 어댑터처럼 명시적으로 사용할 수도 있습니다.

// primary query, with custom input and output adapters  
var inputstream = CepStream<MyDataType>.Create("inputStream",  
                                               typeof(MyInputAdapterFactory),  
                                               new InputAdapterConfig { someFlag = true },  
                                               EventShape.Point);  
  
var filtered = from e in inputstream  
               where e.Value > 95  
               select e;  
  
var query = filtered.ToQuery(myApp,  
                             "filterQuery",  
                             "Filter out Values over 95",  
                             typeof(MyOutputAdapterFactory),  
                             new OutputAdapterConfig { someString = "foo" },  
                             EventShape.Point,  
                             StreamEventOrder.FullyOrdered);  
  
// secondary query, composed on top of the first one using the  
// built-in published stream input adapter and the default published  
// stream name of the primary query  
var filterStream = CepStream<MyDataType>.Create("filteredStream",  
                                                typeof(PublishedStreamAdapterFactory),  
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },  
                                                EventShape.Point);  
  
var validated = from e in filterStream  
                ...  
  
var validationQuery = validated.ToQuery(myApp,  
                                        "validationQuery",  
                                        "Validating Query",  
                                        typeof(MyOutputAdapterFactory),  
                                        new OutputAdapterConfig { someString = "foo" },  
                                        EventShape.Point,  
                                        StreamEventOrder.FullyOrdered);  
  

같은 방식으로 쿼리는 CepStream.toPublishedStreamQuery()와 같은 기능을 가진 게시된 스트림 출력 어댑터를 사용할 수 있습니다.

var filterQuery = filtered.ToQuery(myApp,  
                                   "filterQuery",  
                                   "desc",  
                                   typeof(PublishedStreamAdapterFactory),  
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },  
                                   EventShape.Point,  
                                   StreamEventOrder.FullyOrdered);  

쿼리 바인더 개발 모델에서는 다양한 StreamInsight 메타데이터 개체를 완벽하게 제어할 수 있으며 쿼리 템플릿 디자인 단계에서 쿼리 바인딩 및 사용을 명확하게 구분할 수 있습니다. 또한 이 모델에서는 입력 바인딩 측과 출력 바인딩 측 모두에서 동적 쿼리 작성도 가능합니다. 자세한 내용은 쿼리 바인더 사용을 참조하십시오.

다른 쿼리에 입력으로 바인딩

쿼리 바인더는 쿼리 템플릿을 이벤트 생성자로 입력 어댑터에 바인딩할 수 있는 것처럼 기존 쿼리에도 바인딩할 수 있습니다. 첫 번째 예와 같이 기본 쿼리(바인딩된 출력 또는 바인딩 해제된 출력 포함)가 있다고 가정해 보겠습니다.

var query = filtered.ToQuery(myApp, ...);  

이 경우 BindProducer의 적절한 오버로드에서 이전 쿼리를 참조하여 쿼리 바인더를 다음과 같이 사용할 수 있습니다.

var newStream = CepStream<RawData>.Create("validationInput");  
var validated = from e in newStream  
                select new  
                {  
                    SourceId = e.SourceId,  
                    Value = e.Value,  
                    Status = e.Value > 75 ? false : true  
                };  
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);  
QueryBinder queryBinder = new QueryBinder(validateQT);  
queryBinder.BindProducer("validationInput", filterQuery);  
queryBinder.AddConsumer(...);  

쿼리 바인더가 게시된 스트림을 이벤트 생성자로 참조할 수도 있습니다.

queryBinder.BindProducer("validationInput",  
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),  
                         EventShape.Point);  

Query.ToStream() 서명과 마찬가지로 선택적 AdvanceTimeSettings 개체를 BindProducer()에 지정할 수 있습니다.

게시된 스트림에 출력으로 바인딩

출력 측에서 쿼리 바인더는 명시적으로 정의되어 있는 게시된 스트림으로의 스트리밍을 허용합니다.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),  
                                        EventShape.Point,  
                                        StreamEventOrder.FullyOrdered);  

이 쿼리 바인더를 기반으로 하는 쿼리가 시작되는 즉시 다른 쿼리가 이전 예에서 설명한 것처럼 게시된 스트림에 바인딩하여 해당 결과 이벤트를 사용할 수 있습니다.

게시된 스트림 어댑터에 바인딩

게시된 스트림 어댑터는 쿼리 바인더 모델에서도 사용할 수 있습니다. 응용 프로그램 개체에서 이러한 어댑터를 검색해 일반 어댑터처럼 BindProducerAddConsumer에서 사용할 수 있습니다.

queryBinder.BindProducer("validationInput",  
                         myApp.GetPublishedStreamInputAdapter(),  
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },  
                         EventShape.Point);  
queryBinder.AddConsumer("validated",  
                         myApp.GetPublishedStreamOutputAdapter(),  
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },  
                         EventShape.Point,  
                         StreamEventOrder.FullyOrdered);  
  

StreamInsight 어댑터 전체 예
응용 프로그램 시간 이동

표시: