StreamInsight 예제: 클라이언트 B - 주체 만들기

이 예에서는 서버에서 정의된 원격 서버와 엔터티를 사용하는 StreamInsight 클라이언트를 만드는 방법을 보여 줍니다. 이 예에서는 특히 여러 원본 및 싱크에 바인딩되는 주체를 만드는 방법을 보여 줍니다. 주체는 데이터가 두 원본 스트림에서 도착하고 두 싱크로 배포될 때 데이터를 받습니다. StreamInsight 엔터티에 대한 자세한 내용은 StreamInsight 개념을 참조하십시오.

이 예에서는 이 섹션의 서버 예제에서 만든 원격 서버와 엔터티를 사용합니다. 이 섹션의 예와 함께 사용하려면 다음을 수행합니다.

  1. 서버 예제를 실행합니다.StreamInsight 예제: 서버 - 포함된 서버 표시

  2. 다음 클라이언트 예제 중 하나 또는 둘 다를 실행합니다.

단계별 지침

일반적으로 StreamInsight 클라이언트는 다음과 같은 기본 단계를 따릅니다.

  • StreamInsight 서버 인스턴스 만들기

  • StreamInsight 응용 프로그램 만들기 또는 가져오기

  • 원본 정의 또는 가져오기

  • 원본에 대한 쿼리 작성

  • 싱크 정의 또는 가져오기

  • 쿼리 및 싱크 바인딩 및 실행

이 예에서 클라이언트는 서버에서 기존 응용 프로그램과 엔터티를 가져오지만 프로그램도 추가 원본, 싱크 및 주체를 만듭니다.

서버에 연결

StreamInsight 클라이언트 프로그램을 만드는 과정에서 가장 먼저 수행하는 작업은 StreamInsight 서버 인스턴스 인스턴스화입니다. 이 예에서 클라이언트는 “MyStreamInsightServer”라는 원격 서버에 연결합니다.

var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))

StreamInsight 서버 연결에 사용할 수 있는 옵션에 대한 자세한 내용은 StreamInsight Server 서버에 게시 및 연결을 참조하십시오.

서버 응용 프로그램 가져오기

이 예에서 클라이언트는 원격 서버에서 생성된 StreamInsight 응용 프로그램을 사용합니다. 이 클라이언트가 사용할 모든 서버가 이 응용 프로그램에 정의되었으며 생성된 새 엔터티는 동일한 응용 프로그램에서 만들어집니다.

myApp = server.Applications["serverApp"];

서버 원본 가져오기 및 새 원본 정의

서버에 정의된 원본을 가져오고 새 원본을 정의합니다. 이 예에서 두 번째 원본의 데이터는 1초마다 생성되는 시점 이벤트의 간단한 임시 스트림입니다.

var mySource = myApp.GetObservable<int>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

원본에 대한 쿼리 작성

두 원본에 대한 쿼리를 작성합니다. 이 예에서 첫 번째 쿼리는 서버 원본에서 모든 짝수 데이터 값을 검색하고 값 + 2000을 반환하며 두 번째 쿼리는 두 번째 원본에서 모든 홀수 데이터 값을 검색하고 값 + 3000을 반환합니다.

var myQuery = from e in mySource
              where e % 2 == 0
              select e + 2000;
var myQueryB = from e in mySourceB
               where e % 2 == 1
               select e + 3000;

서버 싱크 가져오기 및 새 싱크 정의

서버에 정의된 싱크를 가져오고 새 싱크를 정의합니다. 이 예에서 두 번째 싱크는 콘솔에 스트림 값을 작성하는 간단한 함수입니다.

var mySink = myApp.GetObserver<int>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

주체 만들기

주체를 만듭니다. 주체는 원본에서 데이터를 사용하고 싱크에 데이터를 전달하는 원본과 싱크에 모두 바인딩할 수 있는 서버의 개체입니다.

var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

쿼리 및 싱크 바인딩 및 실행

이제 주체를 이전에 서버에 정의된 싱크와 이 클라이언트에 정의된 싱크에 각각 바인딩합니다. 각각의 바인딩은 별도의 프로세스로 실행됩니다. 주체를 싱크에 바인딩할 때 원본이 바인딩되지 않기 때문에 데이터 흐름이 없습니다. 싱크를 바인딩하기 전에 원본을 주체에 바인딩한 경우 원본으로 데이터 흐름이 즉시 시작되고 데이터가 손실됩니다.

var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");

주체를 쿼리에 바인딩합니다. 이러한 프로세스를 실행하면 각 싱크에 대한 쿼리를 통해 원본에서 데이터 흐름이 시작됩니다

var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");

이제 이 클라이언트는 서버에 여러 엔터티를 정의했습니다.

  • serverSubject_Client_B

  • serverProcess_Client_B_1

  • serverProcess_Client_B_2

  • serverProcess_Client_B_3

  • serverProcess_Client_B_4

전체 예

다음 예에서는 앞서 설명한 구성 요소를 조합해 완전한 응용 프로그램을 작성합니다. 간단히 하기 위해 이 예에서는 오류를 일으킬 수 있는 조건을 검사하지 않지만 예제 StreamInsight 예제: 서버 - 포함된 서버 표시의 서버가 실행 중이고 예상 엔터티가 생성된 것으로 가정합니다.

using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace StreamInsight21_example_Client_B
    /* This example:
     * connects to a remote server
     * gets the app and source defined in the server
     * defines a second source
     * creates simple queries over the 2 sources
     * gets the sink defined in the server
     * defines a second sink
     * binds and runs the subject to both sinks
     * binds and runs the subject to both queries
     * waits for the user to stop the program
     */
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the StreamInsight server
            using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
            {
                /* The following entities are expected to be defined in the server:
                 * serverApp
                 * serverSource
                 * serverSink
                 */
                /* The following entities will be defined in the server by this client:
                 * serverSubject_Client_B
                 * serverProcess_Client_B_1
                 * serverProcess_Client_B_2
                 * serverProcess_Client_B_3
                 * serverProcess_Client_B_4
                 */

                // Get the existing StreamInsight APPLICATION
                var myApp = server.Applications["serverApp"];
                   
                // GET the SOURCE from the server
                var mySource = myApp.GetStreamable<long>("serverSource");

                // DEFINE a second SOURCE (returns a point event every second)
                var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

                // COMPOSE a QUERY on the server source (return every even-numbered item + 2000)
                var myQuery = from e in mySource
                              where e % 2 == 0
                              select e + 2000;

                // COMPOSE a QUERY on the second source (return every odd-numbered item + 3000)
                var myQueryB = from e in mySourceB
                               where e % 2 == 1
                               select e + 3000;
                    
                // GET the SINK from the server
                var mySink = myApp.GetObserver<long>("serverSink");

                // DEFINE a second SINK
                var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

                // CREATE a SUBJECT
                var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

                // BIND the SINKS to the SUBJECT
                var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
                var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
                
                // BIND the SOURCES to the SUBJECT
                var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
                var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
                
                // Wait for the user to stop the program
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine("Client B is running, press Enter to exit the client");
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine(" ");
                Console.ReadLine();

                // Remove the entities we created
                myApp.Entities["serverSubject_Client_B"].Delete();
                procB1.Dispose();
                procB2.Dispose();
                procB3.Dispose();
                procB4.Dispose();
            }
        }
    }
}

참고 항목

관련 자료

StreamInsight 예제: 서버 - 포함된 서버 표시

StreamInsight 예제

StreamInsight 개념