쿼리 바인더 사용

쿼리 바인더는 StreamInsight 응용 프로그램 작성 시 유연성과 재사용 가능성을 최대 수준으로 높일 수 있도록 하는 클라이언트 쪽 개발 모델입니다. 이 모델에서 어댑터 및 쿼리 템플릿은 별도의 메타데이터 개체로 등록되며, 나중에 함께 바인딩하여 쿼리를 인스턴스화할 수 있습니다. 따라서 개발자가 개체 모델 API를 기반으로 하는 명시적 쿼리 바인딩을 사용하여 응용 프로그램 및 개발 환경을 완전하게 제어할 수 있습니다.

일반적인 명시적 서버 개발 모델 사용 사례에 포함되는 StreamInsight 응용 프로그램에는 다음이 필요합니다.

  • StreamInsight 서버에 대한 모든 제어 및 액세스 권한

  • 정적/동적 쿼리 작성을 통한 쿼리 재사용 또는 타사에서 정의한 어댑터, 이벤트 유형 및 쿼리 템플릿 재사용

쿼리 바인더 개발 모델의 주요 특징

쿼리 바인더 모델의 주요 특징은 다음과 같습니다.

  • 개발자가 모든 메타데이터 개체를 명시적으로 만든 다음 StreamInsight 서버에 등록해야 합니다.

  • 이 모델에서는 여러 개체(쿼리 템플릿, 쿼리, 응용 프로그램, 어댑터)를 만들어 사용할 수 있습니다. 모든 개체는 한 응용 프로그램에서 등록해야 합니다.

    쿼리 템플릿 및 쿼리 인스턴스를 서버에 명시적으로 등록해야 쿼리를 실행할 수 있습니다. 입력 어댑터와 출력 어댑터도 명시적으로 등록해야 쿼리 템플릿 또는 쿼리가 이러한 개체를 참조할 수 있습니다. 또한 모든 개체는 한 응용 프로그램에서 등록해야 합니다. 어댑터 및 쿼리 템플릿에서 사용하는 이벤트 유형은 암시적으로 등록됩니다.

다음 예에서는 StreamInsight 서버 개체를 만들고 이 서버에 myApp라는 응용 프로그램 개체를 만듭니다. 그런 다음 이벤트 스트림 가져오기, 처리 및 내보내기에 필요한 모든 StreamInsight 개체를 만들어 등록합니다.

먼저 서버 및 응용 프로그램 개체를 만듭니다.

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

다음으로 입력 및 출력 어댑터를 응용 프로그램에 등록합니다.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

쿼리 템플릿은 바인딩 해제된 스트림을 기반으로 지정됩니다. 바인딩 해제된 스트림을 만드는 데 필요한 매개 변수는 스트림 이름뿐입니다. 이 이름은 나중에 어댑터 바인딩 시에 필요합니다.

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • 마지막 호출에서는 응용 프로그램에 쿼리 템플릿이 등록됩니다. 등록된 쿼리 템플릿은 여러 바인딩에서 재사용할 수 있으므로, 각각 다른 입력 및 출력 어댑터에 바인딩될 수 있는 여러 쿼리에서 인스턴스화됩니다. 등록된 쿼리 템플릿에 대한 이러한 바인딩은 QueryBinder 개체를 통해 정의됩니다.
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

BindProducer() 메서드는 입력 어댑터 개체(응용 프로그램에서 등록해야 함)를 지정한 이름(여기서는 "filterInput")의 스트림에 바인딩합니다. 그러면 쿼리 템플릿의 여러 진입점을 구분할 수 있습니다. 입력 어댑터와 함께 바인딩 관련 매개 변수(어댑터 구성 및 원하는 이벤트 셰이프)도 필요합니다.

AddConsumer() 메서드는 출력 어댑터 개체(응용 프로그램에서 등록해야 함)를 쿼리 템플릿의 단일 나가는 스트림에 바인딩합니다. 제공된 출력 스트림 이름(여기서는 "validated")을 사용하여 진단용으로 스트림을 식별할 수 있습니다. 입력 어댑터와 마찬가지로 출력 어댑터에 대해서도 바인딩 관련 매개 변수가 제공됩니다.

쿼리 개체는 쿼리 바인더, 쿼리 식별자 및 설명 텍스트를 기반으로 작성됩니다. 마지막 단계에서는 쿼리를 시작합니다.

query.Start();

입력 스트림이 여러 개인 쿼리

다음 예에서는 여러 입력 스트림을 사용하는 쿼리 템플릿을 만드는 방법을 보여 줍니다. 쿼리 템플릿에는 여러 진입점이 있을 수 있으며, 각 진입점에는 두 스트림을 조인해야 하는 등의 경우와 같이 서로 다른 데이터 원본의 데이터가 공급될 수 있습니다. 스트림을 올바르게 연결하려면 다음 예와 같이 스트림 이름을 지정해야 합니다.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

기존 응용 프로그램 수정

쿼리 바인더 모델에서 사용하는 쿼리 템플릿과 어댑터 개체를 같은 응용 프로그램에서 만들 필요는 없습니다. 다음 예에서는 기존 서버에 연결되어 있다고 가정하고 엔터티를 새로 만드는 대신 StreamInsight 개체 모델 API를 통해 기존 메타데이터 엔터티를 검색합니다.

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

지속형 메타데이터 저장소 사용

StreamInsight 서버를 만들 때는 사용할 메타데이터 저장소 유형을 Server.Create() 메서드에 대해 선택적 매개 변수로 사용할 수 있습니다. 기본적으로 메타데이터는 메모리에 저장됩니다. 필요한 경우에는 SQL Server Compact 3.5 데이터베이스를 통해 디스크에 메타데이터를 보관할 수도 있습니다. 다음 예에서는 SQL Server Compact 3.5 데이터베이스를 메타데이터 저장소로 지정하는 방법을 보여 줍니다.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

서버를 만들 때 기존 메타데이터 데이터베이스를 지정하면 지정된 파일에서 모든 메타데이터를 읽습니다. 그러면 StreamInsight 개체 모델 API를 통해 메타데이터 엔터티를 검색할 수 있습니다.

전체 예

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

참고 항목

개념

StreamInsight 서버 개념