StreamInsight 예제: 서버 - 포함된 서버 표시
이 예에서는 포함된 StreamInsight 서버를 만들고 원격 서버로 사용하기 위해 클라이언트 프로그램용으로 표시하는 방법을 보여 줍니다. 서버를 만들고 사용할 수 있게 하는 작업 외에도 이 예에서는 클라이언트 자체로 작동하여 원본 및 쿼리를 만들고 싱크를 바인딩하고 바인딩을 프로세스로 실행합니다. StreamInsight 엔터티에 대한 자세한 내용은 StreamInsight 개념을 참조하십시오.
이 예에서 만든 엔터티는 이 섹션의 다른 예에서 사용되도록 설계되었습니다. 이 섹션의 예와 함께 사용하려면 다음을 수행합니다.
이 서버 예제를 실행합니다.
다음 클라이언트 예제 중 하나 또는 둘 다를 실행합니다.
단계별 지침
일반적으로 StreamInsight 클라이언트는 다음과 같은 기본 단계를 따릅니다.
StreamInsight 서버 인스턴스 만들기
StreamInsight 응용 프로그램 만들기 또는 가져오기
원본 정의 또는 가져오기
원본에 대한 쿼리 작성
싱크 정의 또는 가져오기
쿼리 및 싱크 바인딩 및 실행
이 예제에서는 다른 클라이언트가 사용하여 서버에 배포할 수 있도록 프로그램에서 모든 엔터티를 만듭니다.
서버 인스턴스 만들기
StreamInsight 프로그램을 만드는 과정에서 가장 먼저 수행하는 작업은 StreamInsight 서버 인스턴스 인스턴스화입니다. 이 예에서 서버는 프로그램 내에 포함됩니다.
server = Server.Create("Default");
StreamInsight 설치 프로세스를 통해 컴퓨터에 등록된 인스턴스 이름(위의 예에서는 Default
)을 사용하여 서버를 만들어야 합니다. 자세한 내용은 설치(StreamInsight)를 참조하십시오.
그런 다음 클라이언트 StreamInsight 프로그램을 연결하고 이를 원격 StreamInsight 서버로 사용할 수 있도록 원격 서버의 끝점을 표시합니다.
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");
클라이언트 프로그램은 StreamInsight 설치 시 생성된 StreamInsight Host Windows 서비스를 사용할 수도 있습니다. StreamInsight 서버 연결에 사용할 수 있는 옵션에 대한 자세한 내용은 StreamInsight Server 서버에 게시 및 연결을 참조하십시오.
웹 응용 프로그램 만들기
응용 프로그램은 서버의 범위 지정 단위를 나타냅니다. 기타 모든 엔터티는 응용 프로그램에서 생성됩니다.
var myApp = server.CreateApplication("serverApp");
원본 정의 및 배포
그런 다음 다른 StreamInsight 클라이언트에서 사용할 수 있도록 입력 원본을 정의하고 이름을 지정하여 서버에 배포합니다. 이 예에서 데이터는 1초마다 생성되는 시점 이벤트의 간단한 임시 스트림입니다.
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
mySource.Deploy("serverSource");
원본에 대한 쿼리 작성
입력 원본에 대한 쿼리를 작성합니다. 쿼리는 LINQ를 쿼리 사양 언어로 사용합니다. 이 예에서 쿼리는 모든 짝수 이벤트 값을 반환합니다.
var myQuery = from e in mySource
where e % 2 == 0
select e;
기술적인 측면에서 볼 때 이 정의는 필터 조건자(where e % 2 == 0
)를 충족하지 않는 모든 이벤트를 시퀀스에서 삭제하는 필터 연산자로 변환되고 이벤트 값을 반환합니다. LINQ 쿼리 연산자에 대한 자세한 내용은 StreamInsight LINQ 사용을 참조하십시오.
싱크 정의 및 배포
쿼리에 바인딩할 수 있는 출력 싱크가 생성되고 결과 시퀀스를 처리합니다. 이 예에서는 콘솔에 스트림 값을 작성하는 간단한 함수가 생성됩니다.
var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));
그런 다음 이름이 있는 싱크가 서버에 배포됩니다.
mySink.Deploy("serverSink");
쿼리 및 싱크 바인딩 및 실행
이때 Observable 쿼리를 Observer 출력 싱크에 바인딩한 다음 서버의 프로세스에서 실행할 수 있습니다.
var proc = myQuery.Bind(mySink).Run("serverProcess");
아래의 전체 예에서 사용자가 콘솔에서 입력하여 중지할 때까지 이 프로세스가 계속 실행됩니다.
전체 예
다음 예에서는 앞서 설명한 구성 요소를 조합해 완전한 응용 프로그램을 작성합니다. 간단히 하기 위해 이 예에서는 오류를 일으킬 수 있는 조건을 검사하지 않습니다.
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
namespace StreamInsight21_example_Server
/* This example:
* creates an embedded server instance and makes it available to other clients
* defines, deploys, binds, and runs a simple source, query, and sink
* waits for the user to stop the server
*/
{
class Program
{
static void Main(string[] args)
{
// Create an embedded StreamInsight server
using (var server = Server.Create("Default"))
{
// Create a local end point for the server embedded in this program
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");
host.Open();
/* The following entities will be defined and available in the server for other clients:
* serverApp
* serverSource
* serverSink
* serverProcess
*/
// CREATE a StreamInsight APPLICATION in the server
var myApp = server.CreateApplication("serverApp");
// DEFINE a simple SOURCE (returns a point event every second)
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
// DEPLOY the source to the server for clients to use
mySource.Deploy("serverSource");
// Compose a QUERY over the source (return every even-numbered event)
var myQuery = from e in mySource
where e % 2 == 0
select e;
// DEFINE a simple observer SINK (writes the value to the server console)
var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));
// DEPLOY the sink to the server for clients to use
mySink.Deploy("serverSink");
// BIND the query to the sink and RUN it
using (var proc = myQuery.Bind(mySink).Run("serverProcess"))
{
// Wait for the user stops the server
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine(" ");
Console.ReadLine();
}
host.Close();
}
}
}
}