StreamInsight 예제: 서버 - 포함된 서버 표시

 

이 예에서는 포함된 StreamInsight 서버를 만들고 원격 서버로 사용하기 위해 클라이언트 프로그램용으로 표시하는 방법을 보여 줍니다. 서버를 만들고 사용할 수 있게 하는 작업 외에도 이 예에서는 클라이언트 자체로 작동하여 원본 및 쿼리를 만들고 싱크를 바인딩하고 바인딩을 프로세스로 실행합니다. StreamInsight 엔터티에 대한 자세한 내용은 StreamInsight 개념을 참조하십시오.

이 예에서 만든 엔터티는 이 섹션의 다른 예에서 사용되도록 설계되었습니다. 이 섹션의 예와 함께 사용하려면 다음을 수행합니다.

  1. 이 서버 예제를 실행합니다.

  2. 다음 클라이언트 예제 중 하나 또는 둘 다를 실행합니다.

단계별 지침

일반적으로 StreamInsight 클라이언트는 다음과 같은 기본 단계를 따릅니다.

  • StreamInsight 서버 인스턴스 만들기

  • StreamInsight 응용 프로그램 만들기 또는 가져오기

  • 원본 정의 또는 가져오기

  • 원본에 대한 쿼리 작성

  • 싱크 정의 또는 가져오기

  • 쿼리 및 싱크 바인딩 및 실행

이 예제에서는 다른 클라이언트가 사용하여 서버에 배포할 수 있도록 프로그램에서 모든 엔터티를 만듭니다.

서버 인스턴스 만들기

StreamInsight 프로그램을 만드는 과정에서 가장 먼저 수행하는 작업은 StreamInsight 서버 인스턴스 인스턴스화입니다. 이 예에서 서버는 프로그램 내에 포함됩니다.

server = Server.Create("Default");  

StreamInsight 설치 프로세스를 통해 컴퓨터에 등록된 인스턴스 이름(위의 예에서는 Default)을 사용하여 서버를 만들어야 합니다. 자세한 내용은 설치(StreamInsight)를 참조하십시오.

그런 다음 클라이언트 StreamInsight 프로그램을 연결하고 이를 원격 StreamInsight 서버로 사용할 수 있도록 원격 서버의 끝점을 표시합니다.

  
var host = new ServiceHost(server.CreateManagementService());  
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
  

클라이언트 프로그램은 StreamInsight 설치 시 생성된 StreamInsight Host Windows 서비스를 사용할 수도 있습니다. StreamInsight 서버 연결에 사용할 수 있는 옵션에 대한 자세한 내용은 StreamInsight Server 서버에 게시 및 연결을 참조하십시오.

웹 응용 프로그램 만들기

응용 프로그램은 서버의 범위 지정 단위를 나타냅니다. 기타 모든 엔터티는 응용 프로그램에서 생성됩니다.

var myApp = server.CreateApplication("serverApp");  

원본 정의 및 배포

그런 다음 다른 StreamInsight 클라이언트에서 사용할 수 있도록 입력 원본을 정의하고 이름을 지정하여 서버에 배포합니다. 이 예에서 데이터는 1초마다 생성되는 시점 이벤트의 간단한 임시 스트림입니다.

  
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
mySource.Deploy("serverSource");  
  

원본에 대한 쿼리 작성

입력 원본에 대한 쿼리를 작성합니다. 쿼리는 LINQ를 쿼리 사양 언어로 사용합니다. 이 예에서 쿼리는 모든 짝수 이벤트 값을 반환합니다.

  
var myQuery = from e in mySource  
              where e % 2 == 0  
              select e;  
  

기술적인 측면에서 볼 때 이 정의는 필터 조건자(where e % 2 == 0)를 충족하지 않는 모든 이벤트를 시퀀스에서 삭제하는 필터 연산자로 변환되고 이벤트 값을 반환합니다. LINQ 쿼리 연산자에 대한 자세한 내용은 StreamInsight LINQ 사용을 참조하십시오.

싱크 정의 및 배포

쿼리에 바인딩할 수 있는 출력 싱크가 생성되고 결과 시퀀스를 처리합니다. 이 예에서는 콘솔에 스트림 값을 작성하는 간단한 함수가 생성됩니다.

var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  

그런 다음 이름이 있는 싱크가 서버에 배포됩니다.

mySink.Deploy("serverSink");  

쿼리 및 싱크 바인딩 및 실행

이때 Observable 쿼리를 Observer 출력 싱크에 바인딩한 다음 서버의 프로세스에서 실행할 수 있습니다.

var proc = myQuery.Bind(mySink).Run("serverProcess");  

아래의 전체 예에서 사용자가 콘솔에서 입력하여 중지할 때까지 이 프로세스가 계속 실행됩니다.

전체 예

다음 예에서는 앞서 설명한 구성 요소를 조합해 완전한 응용 프로그램을 작성합니다. 간단히 하기 위해 이 예에서는 오류를 일으킬 수 있는 조건을 검사하지 않습니다.

  
using System;  
using System.ServiceModel;  
using Microsoft.ComplexEventProcessing;  
using Microsoft.ComplexEventProcessing.Linq;  
using Microsoft.ComplexEventProcessing.ManagementService;  
using System.Reactive;  
using System.Reactive.Linq;  
  
namespace StreamInsight21_example_Server  
  
    /* This example:  
     * creates an embedded server instance and makes it available to other clients  
     * defines, deploys, binds, and runs a simple source, query, and sink  
     * waits for the user to stop the server  
     */  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            // Create an embedded StreamInsight server  
            using (var server = Server.Create("Default"))  
            {  
                // Create a local end point for the server embedded in this program  
                var host = new ServiceHost(server.CreateManagementService());  
                host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "https://localhost/MyStreamInsightServer");  
                host.Open();  
  
                /* The following entities will be defined and available in the server for other clients:  
                 * serverApp  
                 * serverSource  
                 * serverSink  
                 * serverProcess  
                 */  
  
                // CREATE a StreamInsight APPLICATION in the server  
                var myApp = server.CreateApplication("serverApp");  
  
                // DEFINE a simple SOURCE (returns a point event every second)  
                var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);  
  
                // DEPLOY the source to the server for clients to use  
                mySource.Deploy("serverSource");  
  
                // Compose a QUERY over the source (return every even-numbered event)  
                var myQuery = from e in mySource  
                              where e % 2 == 0  
                              select e;  
  
                // DEFINE a simple observer SINK (writes the value to the server console)  
                var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  
  
                // DEPLOY the sink to the server for clients to use  
                mySink.Deploy("serverSink");  
  
                // BIND the query to the sink and RUN it  
                using (var proc = myQuery.Bind(mySink).Run("serverProcess"))  
                {  
                    // Wait for the user stops the server  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine(" ");  
                    Console.ReadLine();  
                }  
                host.Close();  
            }  
        }  
    }  
}  
  

참고 항목

StreamInsight 예제
StreamInsight 개념