복구 가능한 StreamInsight 응용 프로그램 빌드

이 항목에서는 복구 가능한 StreamInsight 응용 프로그램을 만드는 단계에 대해 설명합니다.

복구는 StreamInsight Premium 버전에서만 사용할 수 있습니다. 자세한 내용은 StreamInsight 버전 선택을 참조하십시오.

재생 및 중복 제거를 비롯하여 복구 가능한 응용 프로그램의 전체 코드 샘플은 Codeplex의 StreamInsight 샘플 페이지에서 검사점 설정 샘플을 참조하십시오.

항목 내용

  1. 1단계. 복구 가능한 서버 구성

  2. 2단계. 복구 가능한 쿼리 정의

  3. 3단계. 검사점 캡처

  4. 4단계. 입력 어댑터에서 이벤트 재생

  5. 5단계. 출력 어댑터에서 중복 항목 제거

  6. 6단계. 오류 복구

  7. 복구를 비활성화하지 않고 종료

1단계. 복구 가능한 서버 구성

필요한 설정

복구 가능한 서버를 구성하려면 서버를 만들 때 다음 구성 설정의 값을 제공합니다.

  • 메타데이터 저장소. 서버의 메타데이터를 저장하려면 SQL Server Compact를 사용해야 합니다. 메타데이터는 메모리에 저장할 수 없습니다.

  • 로그 경로. 이 설정은 복구 가능한 쿼리에 대해 검사점 데이터가 저장되는 위치를 결정합니다. 기본 경로 값은 StreamInsight 프로세스의 작업 디렉터리입니다. 관련된 CreateLogPathIfMissing 설정은 지정한 디렉터리가 없는 경우 디렉터리를 새로 만들지 여부를 결정합니다.

서버에 복구를 구성하면 검사점을 캡처할 수는 있지만 검사점이 자동으로 캡처되지는 않습니다. 검사점을 호출하는 방법은 3단계. 검사점 캡처를 참조하십시오.

검사점 로그 경로 관리

  • 검사점 파일을 무단으로 읽거나 훼손하지 못하도록 방지하려면 신뢰할 수 있는 엔터티만 액세스할 수 있도록 검사점 파일이 포함된 폴더에 대한 사용 권한을 설정해야 합니다.

  • 각 StreamInsight 인스턴스에 자체의 로그 경로가 있어야 합니다.

  • StreamInsight를 호스팅하는 프로세스에 지정된 폴더에 대한 읽기 및 쓰기 권한이 있어야 합니다.

  • 폴더의 내용을 편집하지 마십시오. StreamInsight는 검사점 파일이 더 이상 필요 없으면 자동으로 삭제합니다.

Out-of-process 서버

클라이언트가 Server.Connect를 호출하여 연결하는 Out-of-process 서버의 경우 서버를 제공하는 사람이 복구 구성을 제공합니다. Out-of-process 서버에 복구 구성이 있으면 클라이언트가 구성된 대로 복구 기능을 사용할 수 있고 서버에 복구 구성이 없으면 클라이언트가 복구 기능을 사용할 수 없습니다.

복구 옵션 지정 방법

다음 방법 중 하나로 복구 설정을 지정할 수 있습니다.

  • Server.Create를 호출할 때 복구 구성을 제공하여 프로그래밍 방식으로 설정을 지정합니다.

  • 응용 프로그램 구성 파일에서 선언적으로 설정을 지정합니다.

    • In-process 서버의 경우 app.config 파일이 응용 프로그램 구성 파일입니다.

    • Out-of-process 서버의 경우에는 StreamInsightHost.exe.config 파일이며 StreamInsight 설치 폴더의 Host 폴더에 있습니다.

두 방법을 모두 사용할 경우 API 호출에서 지정한 설정이 구성 파일의 설정을 무시합니다.

프로그래밍 방식으로 복구 가능한 서버 만들기

다음 예에서는 복구 가능한 In-process 서버를 프로그래밍 방식으로 만드는 방법을 보여 줍니다. 자세한 예는 예를 참조하십시오. Server.Create를 호출할 때 검사점을 설정할 수 없도록 하는 예외를 catch합니다.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

선언적으로 복구 가능한 서버 만들기

다음 예에서는 복구 가능한 서버를 구성 파일을 사용하여 선언적으로 만드는 방법을 보여 줍니다.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

맨 위로

2단계. 복구 가능한 쿼리 정의

복구 가능한 쿼리를 만들려면 코드에 다음 단계를 포함합니다.

  1. 새 쿼리를 만들기 전에 쿼리가 메타데이터에 이미 있는지 확인합니다. 쿼리가 이미 있으면 응용 프로그램이 오류에서 복구되었음을 나타냅니다. 이 경우 쿼리를 다시 만드는 대신 다시 시작하는 코드를 작성해야 합니다.

  2. 쿼리가 메타데이터에 없으면 쿼리를 만들고 ToQuery 메서드의 IsResilient 매개 변수에 대해 true를 지정하여 복구 가능한 쿼리로 정의합니다. IsResilient 매개 변수를 사용하여 Application.CreateQuery 메서드를 호출해도 됩니다.

쿼리에 복구를 구성하면 검사점을 캡처할 수는 있지만 검사점이 자동으로 캡처되지는 않습니다. 검사점을 호출하는 방법은 3단계. 검사점 캡처를 참조하십시오.

복구 가능한 쿼리 정의 예

자세한 예는 예를 참조하십시오.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

맨 위로

3단계. 검사점 캡처

쿼리를 실행한 후 검사점을 정기적으로 캡처하여 쿼리 상태를 기록할 수 있습니다.

검사점 설정을 지원하는 API 메서드는 비동기 작업의 일반적인 패턴을 따릅니다.

  1. 검사점을 호출하려면 BeginCheckpoint 메서드를 호출합니다. 선택적 AsyncCallback을 제공할 경우 검사점이 완료될 때 호출됩니다. BeginCheckpoint 호출에서 반환된 IAsyncResult는 이 검사점 요청을 식별하며 나중에 EndCheckpoint 또는 CancelCheckpoint 호출에서 사용될 수 있습니다.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. EndCheckpoint 메서드는 검사점 작업이 완료될 때까지 차단됩니다. 검사점 작업이 성공한 경우에는 true가 반환되고, 오류가 발생한 경우에는 예외가 발생합니다.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. CancelCheckpoint를 호출하여 검사점 설정 프로세스를 취소할 수도 있습니다. CancelCheckpoint 호출에 성공한 경우 후속 EndCheckpoint 호출은 false를 반환합니다.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

이 비동기 패턴은 세 가지 방법으로 사용할 수 있습니다.

  • BeginCheckpoint를 호출한 다음에 EndCheckpoint를 호출합니다. 그러면 검사점 작업이 완료될 때까지 EndCheckpoint가 차단된 다음 결과 또는 예외가 반환됩니다. 이 패턴에서는 일반적으로 asyncCallback과 asyncState가 사용되지 않습니다.

  • BeginCheckpoint를 호출할 수 있습니다. 그러면 사용자는 반환된 IAsyncResult의 IsCompleted 속성을 폴링할 수 있습니다. IsCompleted가 true이면 EndCheckpoint를 호출하여 결과를 검색할 수 있습니다. 이 패턴에서는 일반적으로 asyncCallback과 asyncState가 사용되지 않습니다.

  • 콜백 메서드와 함께 BeginCheckpoint를 호출할 수 있습니다. 이 경우 asyncState를 사용하여 호출을 식별하고 필요한 정보를 콜백 메서드에 반환할 수 있습니다. 콜백은 실행될 때 EndCheckpoint를 호출하여 결과를 검색합니다.

EndCheckpoint 메서드는 사용하는 패턴에 관계없이 검사점이 취소된 경우에도 호출해야 합니다. 이 방법은 사용자가 호출에서 반환 값을 가져오는 유일한 방법이며 StreamInsight에서 호출이 완료되었음을 알 수 있는 유일한 방법이기도 합니다. EndCheckpoint를 호출할 때까지 다른 검사점을 시작할 수 없습니다.

검사점 설정 프로세스 중 발생한 오류는 관련 쿼리를 중지하거나 쿼리에 영향을 주지 않습니다. 검사점 작업이 진행되는 동안 쿼리를 중지하면 검사점이 취소됩니다.

맨 위로

4단계. 입력 어댑터에서 이벤트 재생

복구의 일부로 이벤트 재생을 지원하려면 입력 어댑터 팩터리가 IHighWaterMarkInputAdapterFactory 또는 IHighWaterMarkTypedInputAdapterFactory 인터페이스를 구현해야 합니다. 그런 다음 어댑터 팩터리의 Create 메서드를 호출하면 어댑터가 재생할 이벤트를 손쉽게 식별할 수 있도록 상위 워터마크가 제공됩니다.

출력이 완료될 수 있도록 모든 입력 어댑터는 물리적 스트림에서 상위 워터마크가 나타내는 지점이나 그 뒤에서 발생한 모든 이벤트를 재생해야 합니다.

맨 위로

5단계. 출력 어댑터에서 중복 항목 제거

복구의 일부로 중복 항목 제거를 지원하려면 출력 어댑터 팩터리가 IHighWaterMarkOutputAdapterFactory 또는 IHighWaterMarkTypedOutputAdapterFactory 인터페이스를 구현해야 합니다. 그런 다음 어댑터 팩터리의 Create 메서드를 호출하면 어댑터가 중복 값을 손쉽게 식별할 수 있도록 상위 워터마크 및 오프셋 값이 제공됩니다. 오프셋이 필요한 이유는 출력 스트림에서 검사점에 해당하는 위치가 스트림의 어느 위치에든 나올 수 있기 때문입니다.

쿼리를 처음 시작하면 상위 워터마크나 오프셋 없이 어댑터 팩터리의 Create 메서드가 호출됩니다. 서버에서 아직 쿼리의 검사점을 캡처하지 않은 경우에는 DateTime.MinValue의 상위 워터마크와 오프셋 0을 사용하여 어댑터 팩터리의 Create 메서드가 호출됩니다.

쿼리가 제대로 재생된 경우 마지막 검사점이 캡처된 후, 쿼리가 중단되기 전에 생성되었던 모든 이벤트가 쿼리 재시작 시 다시 생성됩니다. 이것이 바로 출력 어댑터가 제거해야 하는 중복 항목입니다. 중복 항목을 제거하는 방법은 출력 어댑터가 결정합니다. 원래 복사본을 삭제하거나 중복되는 복사본을 무시하는 방법 중 한 가지를 선택할 수 있습니다.

출력이 동일하도록 모든 입력 어댑터는 입력 이벤트를 적절히 재생해야 하고 모든 출력 어댑터는 물리적 스트림에서 중단 전에 발생했으며 상위 워터마크 오프셋이 나타내는 지점이나 그 뒤에서 발생한 모든 중복 이벤트를 제거해야 합니다.

맨 위로

6단계. 오류 복구

서버는 시작할 때 자동으로 복구를 수행하여 모든 쿼리를 일관된 상태로 만듭니다. 이 작업은 비동기 작업이므로 Server.Create 호출은 복구가 완료되기 전에 결과를 반환합니다.

  • 복구가 가능하지 않은 쿼리는 중지됨 상태로 전환됩니다. 이 동작은 변경되지 않았습니다.

  • 복구 가능한 쿼리는 초기화하는 중 상태로 전환됩니다. 그런 다음 서버는 저장된 검사점 정보를 로드합니다.

이때 Start를 호출하여 쿼리를 다시 시작할 수 있습니다. 복구 가능한 쿼리는 초기화가 완료되는 즉시 다시 시작됩니다.

시작 코드는 다음 단계를 수행하여 오류를 복구해야 합니다.

  1. 응용 프로그램의 쿼리 목록을 메타데이터에서 검색합니다.

  2. 각 쿼리에 대해 쿼리가 메타데이터에 이미 있는지 확인합니다.

    1. 쿼리가 이미 있으면 다시 시작합니다.

    2. 쿼리가 메타데이터에 없으면 쿼리를 만들고 2단계. 복구 가능한 쿼리 정의에 설명된 대로 쿼리를 복구 가능한 쿼리로 정의합니다.

복구 중 문제가 발생하면 복구 없이 서버를 다시 시작할 수 있습니다.

맨 위로

복구를 비활성화하지 않고 종료

Server의 Dispose 메서드를 호출하면 복구를 비활성화하지 않고 서버를 종료할 수 있습니다.

  • 복구가 가능하지 않은 쿼리는 중지됩니다.

  • 복구 가능한 쿼리는 일지 중지됩니다. 서버를 다시 시작하면 서버는 일시 중지된 쿼리의 상태를 복구하려고 합니다. 이 동작이 수행되지 않도록 하려면 쿼리를 중지한 후 서버를 종료합니다.

이런 식으로 서버를 종료할 경우 복구가 가능하지 않은 쿼리와 복구 가능한 쿼리의 메타데이터가 모두 유지됩니다.

맨 위로

재생 및 중복 제거를 비롯하여 복구 가능한 응용 프로그램의 전체 코드 샘플은 Codeplex의 StreamInsight 샘플 페이지에서 검사점 설정 샘플을 참조하십시오.

맨 위로

명시적 개발 모델을 사용하여 복구 가능한 쿼리 정의

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

검사점 설정 - 콜백 랑데부 모델

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

참고 항목

개념

StreamInsight 복구

복구 가능한 StreamInsight 응용 프로그램 빌드

복구 가능한 StreamInsight 응용 프로그램 모니터링

복구 가능한 StreamInsight 응용 프로그램 문제 해결