다음을 통해 공유


열거 가능 이벤트 원본 및 이벤트 싱크의 전체 예(StreamInsight)

이 간단한 엔드 투 엔드에서는 IEnumerable 인터페이스를 구현하여 완전한 StreamInsight 응용 프로그램을 만드는 이벤트 원본 및 이벤트 싱크의 사용을 보여 줍니다.

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

이 예에서는 SQL Server 및 ADO.NET Entity Framework와 함께 StreamInsight를 사용하여 Northwind 예제 데이터베이스의 기록 데이터에 대한 시간 관련 쿼리에 응답합니다. 쿼리는 지역 내에서 4개 이상의 주문이 활성 상태였던 시간 간격을 찾습니다.

이 예에서는 IEnumerable 이벤트 원본을 사용합니다. IObservable을 구현하는 이벤트 원본을 사용하기 위한 단계는 비슷합니다. 그러나 Observable 출력은 데이터를 Observer에 밀어 넣습니다. 소비자가 데이터를 끌어올 필요가 없는데 이는 열거 가능 원본에 대해 foreach를 호출할 때 이 작업이 수행되기 때문입니다.

1단계 - Observable 또는 열거 가능 이벤트 원본 제공

먼저 Northwind 데이터베이스에 대한 LINQ to Entities 쿼리를 실행하여 쿼리에 대한 원본 데이터를 정의합니다. 기본적으로 결과(databaseQuery)는 IEnumerable 인터페이스를 구현합니다.

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

2단계 - 입력을 스트림으로 변환하고 스트림의 임시 특성 설명

다음으로 쿼리의 결과를 간격 이벤트의 스트림으로 변환합니다.

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

이 코드에서는 AdvanceTimeSettings 클래스에서 도우미 IncreasingStartTime를 사용하여 delay가 0인 각 이벤트 뒤에 CTI 이벤트를 삽입합니다. 또는 StrictlyIncreasingStartTime을 사용하여 -1 틱의 delay를 지정하거나(따라서 이벤트의 시작 시간 뒤에 CTI 1 틱이 놓임) UnorderedTimestamps를 사용하여 delay에 대한 사용자 지정 값을 지정할 수 있습니다.

그런 다음 IntervalEvent 클래스의 CreateInsert 메서드는 OrderDate를 시작 시간으로, Shipped Date를 간격의 종료 시간으로, ShipRegion을 이벤트의 페이로드로 제공하여 원본 데이터를 이벤트 스트림으로 변환합니다.

EdgeEvent 클래스에 CreateStart 및 CreateEnd 메서드가 있지만 PointEvent 클래스는 비슷한 CreateInsert 메서드를 제공합니다. 이러한 세 개의 이벤트 클래스 모두는 AdvanceTimeSettings를 선언적으로 사용하는 대신에 CTI 이벤트를 절차적으로 삽입하기 위한 CreateCti 메서드를 제공합니다.

3단계 - 쿼리 작성

다음으로 들어오는 이벤트 스트림에 적합한 시간 인식 StreamInsight 쿼리를 작성합니다.

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

이 쿼리는 이벤트를 지역별 각 시간 간격으로 그룹화한 다음 4개 이상의 이벤트가 활성 상태였던 간격만 선택합니다. 이 쿼리는 활성 주문 수와 ShipRegion에 대한 식별자를 포함하는 완전히 다른 페이로드가 있는 새 스트림에 결과를 프로젝션합니다.

쿼리 템플릿 작성 방법은 다음을 참조하십시오.

4단계 - 쿼리 출력을 Observable 또는 열거 가능 이벤트 싱크로 변환

다음으로 쿼리의 출력 스트림을 열거 가능 결과로 변환합니다.

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

이 쿼리는 CTI 이벤트를 필터링하고 삽입 이벤트만 간격 이벤트의 열거 가능 스트림에 프로젝션합니다. 4개의 필드가 있는 새 익명 형식에 이벤트의 페이로드가 포함됩니다.

ToIntervalEnumerable 메서드 외에 관련 확장 메서드에는 다음이 포함됩니다.

  • ToPointEnumerable 및 ToEdgeEnumerable

  • ToPointObservable, ToIntervalObservable 및 ToEdgeObservable

이러한 메서드는 관리 및 디버깅 목적을 위해 쿼리를 식별하는 데 사용되는 쿼리 이름과 쿼리 설명을 제공하여 기본 IEnumerable 및 IObservable 인터페이스를 확장하는 ICepEnumerable 또는 ICepObservable 인터페이스를 반환합니다.

또한 ICepEnumerable 또는 ICepObservable 인터페이스는 선택(Where) 또는 프로젝션(Select)을 통해 출력 이벤트를 필터링하는 도우미 메서드를 제공합니다. 예를 들면 다음과 같습니다.

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

5단계 - 출력 사용

마지막으로 쿼리의 결과를 사용합니다. 일반적인 LINQ 공급자의 지연된 평가 모델로 인해 소비자가 결과 열거 또는 관찰을 시작할 때까지 쿼리는 평가되지 않습니다.

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}