StreamInsight 예제: 클라이언트 B - 주체 만들기
이 예에서는 서버에서 정의된 원격 서버와 엔터티를 사용하는 StreamInsight 클라이언트를 만드는 방법을 보여 줍니다. 이 예에서는 특히 여러 원본 및 싱크에 바인딩되는 주체를 만드는 방법을 보여 줍니다. 주체는 데이터가 두 원본 스트림에서 도착하고 두 싱크로 배포될 때 데이터를 받습니다. StreamInsight 엔터티에 대한 자세한 내용은 StreamInsight 개념을 참조하십시오.
이 예에서는 이 섹션의 서버 예제에서 만든 원격 서버와 엔터티를 사용합니다. 이 섹션의 예와 함께 사용하려면 다음을 수행합니다.
서버 예제를 실행합니다.StreamInsight 예제: 서버 - 포함된 서버 표시
다음 클라이언트 예제 중 하나 또는 둘 다를 실행합니다.
단계별 지침
일반적으로 StreamInsight 클라이언트는 다음과 같은 기본 단계를 따릅니다.
StreamInsight 서버 인스턴스 만들기
StreamInsight 응용 프로그램 만들기 또는 가져오기
원본 정의 또는 가져오기
원본에 대한 쿼리 작성
싱크 정의 또는 가져오기
쿼리 및 싱크 바인딩 및 실행
이 예에서 클라이언트는 서버에서 기존 응용 프로그램과 엔터티를 가져오지만 프로그램도 추가 원본, 싱크 및 주체를 만듭니다.
서버에 연결
StreamInsight 클라이언트 프로그램을 만드는 과정에서 가장 먼저 수행하는 작업은 StreamInsight 서버 인스턴스 인스턴스화입니다. 이 예에서 클라이언트는 “MyStreamInsightServer”라는 원격 서버에 연결합니다.
var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"))
StreamInsight 서버 연결에 사용할 수 있는 옵션에 대한 자세한 내용은 StreamInsight Server 서버에 게시 및 연결을 참조하십시오.
서버 응용 프로그램 가져오기
이 예에서 클라이언트는 원격 서버에서 생성된 StreamInsight 응용 프로그램을 사용합니다. 이 클라이언트가 사용할 모든 서버가 이 응용 프로그램에 정의되었으며 생성된 새 엔터티는 동일한 응용 프로그램에서 만들어집니다.
myApp = server.Applications["serverApp"];
서버 원본 가져오기 및 새 원본 정의
서버에 정의된 원본을 가져오고 새 원본을 정의합니다. 이 예에서 두 번째 원본의 데이터는 1초마다 생성되는 시점 이벤트의 간단한 임시 스트림입니다.
var mySource = myApp.GetObservable<int>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
원본에 대한 쿼리 작성
두 원본에 대한 쿼리를 작성합니다. 이 예에서 첫 번째 쿼리는 서버 원본에서 모든 짝수 데이터 값을 검색하고 값 + 2000을 반환하며 두 번째 쿼리는 두 번째 원본에서 모든 홀수 데이터 값을 검색하고 값 + 3000을 반환합니다.
var myQuery = from e in mySource
where e % 2 == 0
select e + 2000;
var myQueryB = from e in mySourceB
where e % 2 == 1
select e + 3000;
서버 싱크 가져오기 및 새 싱크 정의
서버에 정의된 싱크를 가져오고 새 싱크를 정의합니다. 이 예에서 두 번째 싱크는 콘솔에 스트림 값을 작성하는 간단한 함수입니다.
var mySink = myApp.GetObserver<int>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));
주체 만들기
주체를 만듭니다. 주체는 원본에서 데이터를 사용하고 싱크에 데이터를 전달하는 원본과 싱크에 모두 바인딩할 수 있는 서버의 개체입니다.
var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());
쿼리 및 싱크 바인딩 및 실행
이제 주체를 이전에 서버에 정의된 싱크와 이 클라이언트에 정의된 싱크에 각각 바인딩합니다. 각각의 바인딩은 별도의 프로세스로 실행됩니다. 주체를 싱크에 바인딩할 때 원본이 바인딩되지 않기 때문에 데이터 흐름이 없습니다. 싱크를 바인딩하기 전에 원본을 주체에 바인딩한 경우 원본으로 데이터 흐름이 즉시 시작되고 데이터가 손실됩니다.
var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
주체를 쿼리에 바인딩합니다. 이러한 프로세스를 실행하면 각 싱크에 대한 쿼리를 통해 원본에서 데이터 흐름이 시작됩니다
var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
이제 이 클라이언트는 서버에 여러 엔터티를 정의했습니다.
serverSubject_Client_B
serverProcess_Client_B_1
serverProcess_Client_B_2
serverProcess_Client_B_3
serverProcess_Client_B_4
전체 예
다음 예에서는 앞서 설명한 구성 요소를 조합해 완전한 응용 프로그램을 작성합니다. 간단히 하기 위해 이 예에서는 오류를 일으킬 수 있는 조건을 검사하지 않지만 예제 StreamInsight 예제: 서버 - 포함된 서버 표시의 서버가 실행 중이고 예상 엔터티가 생성된 것으로 가정합니다.
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace StreamInsight21_example_Client_B
/* This example:
* connects to a remote server
* gets the app and source defined in the server
* defines a second source
* creates simple queries over the 2 sources
* gets the sink defined in the server
* defines a second sink
* binds and runs the subject to both sinks
* binds and runs the subject to both queries
* waits for the user to stop the program
*/
{
class Program
{
static void Main(string[] args)
{
// Connect to the StreamInsight server
using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
{
/* The following entities are expected to be defined in the server:
* serverApp
* serverSource
* serverSink
*/
/* The following entities will be defined in the server by this client:
* serverSubject_Client_B
* serverProcess_Client_B_1
* serverProcess_Client_B_2
* serverProcess_Client_B_3
* serverProcess_Client_B_4
*/
// Get the existing StreamInsight APPLICATION
var myApp = server.Applications["serverApp"];
// GET the SOURCE from the server
var mySource = myApp.GetStreamable<long>("serverSource");
// DEFINE a second SOURCE (returns a point event every second)
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
// COMPOSE a QUERY on the server source (return every even-numbered item + 2000)
var myQuery = from e in mySource
where e % 2 == 0
select e + 2000;
// COMPOSE a QUERY on the second source (return every odd-numbered item + 3000)
var myQueryB = from e in mySourceB
where e % 2 == 1
select e + 3000;
// GET the SINK from the server
var mySink = myApp.GetObserver<long>("serverSink");
// DEFINE a second SINK
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));
// CREATE a SUBJECT
var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());
// BIND the SINKS to the SUBJECT
var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
// BIND the SOURCES to the SUBJECT
var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");
// Wait for the user to stop the program
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine("Client B is running, press Enter to exit the client");
Console.WriteLine("----------------------------------------------------------------");
Console.WriteLine(" ");
Console.ReadLine();
// Remove the entities we created
myApp.Entities["serverSubject_Client_B"].Delete();
procB1.Dispose();
procB2.Dispose();
procB3.Dispose();
procB4.Dispose();
}
}
}
}