Compilando aplicativos resilientes do StreamInsight

Este tópico descreve as etapas para criar um aplicativo resiliente do StreamInsight.

A resiliência está disponível apenas na edição Premium do StreamInsight. Para obter mais informações, consulte Escolhendo uma edição do StreamInsight.

Para obter um exemplo de código completo de um aplicativo resiliente que inclui reprodução e desduplicação, consulte o Exemplo de ponto de verificação na página de exemplos do StreamInsight em Codeplex.

Neste tópico

  1. Etapa 1. Configurando um servidor resiliente

  2. Etapa 2. Definindo uma consulta resiliente

  3. Etapa 3. Capturando pontos de verificação

  4. Etapa 4. Reproduzindo eventos no adaptador de entrada

  5. Etapa 5. Eliminando duplicatas no adaptador de saída

  6. Etapa 6. Recuperando-se de falhas

  7. Desligando sem desabilitar a recuperação

  8. Exemplos

Etapa 1. Configurando um servidor resiliente

Configurações necessárias

Para configurar um servidor resiliente, forneça valores para as seguintes configurações ao criar o servidor:

  • Um repositório de metadados. Você deve usar o SQL Server Compact para armazenar metadados para o servidor; os metadados não podem ser armazenados na memória.

  • Um caminho de log. Essa configuração determina onde os dados do ponto de verificação são armazenados para consultas resilientes. O valor padrão do caminho é o diretório de trabalho do processo do StreamInsight. Uma configuração relacionada, CreateLogPathIfMissing, determina se o diretório especificado será criado caso ele não exista.

A configuração de um servidor para resiliência possibilita a captura dos pontos de verificação, mas não faz com que eles sejam capturados. Para obter informações sobre como invocar pontos de verificação, consulte a Etapa 3. Capturando pontos de verificação.

Gerenciando o caminho de log do ponto de verificação

  • Para evitar a leitura não autorizada ou falsificação com os arquivos de ponto de verificação, garanta que as permissões da pasta recipiente sejam definidas de modo que somente entidades confiáveis tenham acesso.

  • Cada instância do StreamInsight deve ter seu próprio caminho de log.

  • Assegure que o processo que está hospedando o StreamInsight tem acesso de leitura e gravação na pasta especificada.

  • Não edite o conteúdo da pasta. O StreamInsight exclui os arquivos de ponto de verificação quando eles não são mais necessários.

Servidores fora do processo

No caso de um servidor fora de processo, ao qual o cliente se conecta chamando Server.Connect, a configuração de resiliência é fornecida pela pessoa que provisiona o servidor. Se o servidor fora de processo tiver uma configuração de resiliência, o cliente poderá usá-lo conforme configurado; se o servidor não tiver uma configuração de resiliência, o cliente não poderá usar recursos de resiliência.

Métodos para especificar opções de resiliência

Você pode especificar configurações de resiliência através de um destes métodos:

  • Especifique as configurações de modo programático fornecendo a configuração de resiliência ao chamar Server.Create.

  • Especifique as configurações declarativamente no arquivo de configuração do aplicativo.

    • Para um servidor em processo, esse é o arquivo app.config.

    • Para um servidor fora de processo, esse é o arquivo StreamInsightHost.exe.config, que pode ser encontrado na pasta Host abaixo da pasta de instalação do StreamInsight.

Se você usar os dois métodos, as configurações especificadas na chamada da API substituirão as configurações no arquivo de configuração.

Criando um servidor resiliente de modo programático

O exemplo a seguir mostra como criar um servidor em processo resiliente de modo programático. Para obter exemplos mais detalhados, consulte Exemplos. Tente capturar qualquer exceção que ocasionará a falha da definição de ponto de verificação quando você chamar Server.Create.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

Criando um servidor resiliente declarativamente

O exemplo a seguir mostra como criar um servidor resiliente declarativamente usando um arquivo de configuração.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

TOP

Etapa 2. Definindo uma consulta resiliente

Para criar uma consulta resiliente, inclua as etapas a seguir no código.

  1. Antes de criar uma nova consulta, verifique se a consulta já existe nos metadados. Se a consulta já existir, isso indicará que o aplicativo se recuperou de uma falha. O código deverá reiniciar a consulta, em vez de recriá-la.

  2. Se a consulta não existir nos metadados, crie-a e defina-a como resiliente especificando true para o parâmetro IsResilient do método ToQuery. Você também pode chamar o método Application.CreateQuery com o parâmetro IsResilient.

A configuração de uma consulta para resiliência possibilita a captura dos pontos de verificação, mas não faz com que eles sejam capturados. Para obter informações sobre como invocar pontos de verificação, consulte a Etapa 3. Capturando pontos de verificação.

Exemplo de definição de uma consulta resiliente

Para obter exemplos mais detalhados, consulte Exemplos.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

TOP

Etapa 3. Capturando pontos de verificação

Depois que a(s) consulta(s) estiver(em) em execução, capture os pontos de verificação periodicamente para registrar o estado das consultas.

Os métodos API que oferecem suporte a pontos de verificação seguem o padrão normal de uma operação assíncrona.

  1. Para invocar um ponto de verificação, chame o método BeginCheckpoint. Se você fornecer um AsyncCallback opcional, ele será chamado quando o ponto de verificação for concluído. O IAsyncResult retornado pela chamada para BeginCheckpoint identifica essa solicitação de ponto de verificação e pode ser usado posteriormente em chamadas para EndCheckpoint ou CancelCheckpoint.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. O método EndCheckpoint fica bloqueado até que a operação de ponto de verificação seja concluída. Se a operação de ponto de verificação for bem-sucedida, a chamada retornará true; se os erros ocorrerem, a chamada gerará uma exceção.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. Também é possível chamar CancelCheckpoint para cancelar o processo definição de ponto de verificação. Quando a chamada para CancelCheckpoint for bem-sucedida, a chamada subsequente para EndCheckpoint retornará false.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

Esse padrão assíncrono pode ser usado de três maneiras diferentes:

  • Uma chamada para BeginCheckpoint pode ser seguida por uma chamada para EndCheckpoint. EndCheckpoint, então, ficará bloqueado até que a operação de definição de ponto de verificação seja concluída e retornará o resultado (ou exceção). Nesse padrão, asyncCallback e asyncState geralmente não são usados.

  • BeginCheckpoint pode ser chamado, e o usuário poderá sondar a propriedade IsCompleted do IAsyncResult retornado. Quando IsCompleted for true, EndCheckpoint poderá ser chamado para recuperar o resultado. Nesse padrão, asyncCallback e asyncState geralmente não são usados.

  • BeginCheckpoint pode ser chamado com um método de retorno de chamada. Nesse caso, asyncState pode ser usado para identificar a chamada retornar quaisquer informações necessárias ao método de retorno de chamada. Quando o retorno de chamada é executado, ele chama EndCheckpoint para recuperar o resultado.

O método EndCheckpoint deve ser chamado, independentemente de qual padrão seja usado e até mesmo quando o ponto de verificação é cancelado. Esse método é a única maneira para o usuário obter um valor de retorno da chamada e a única maneira do StreamInsight saber que a chamada foi concluída. Você só pode começar outro ponto de verificação após ter chamado EndCheckpoint.

Os erros que ocorrem no processo de definição de ponto de verificação não interrompem nem afetam as consultas associadas. Se você interromper uma consulta enquanto uma operação de ponto de verificação estiver em andamento, o ponto de verificação será cancelado.

TOP

Etapa 4. Reproduzindo eventos no adaptador de entrada

Para oferecer suporte à reprodução de eventos como parte da recuperação, a fábrica de adaptador de entrada deve implementar a interface IHighWaterMarkInputAdapterFactory ou IHighWaterMarkTypedInputAdapterFactory. Em seguida, a chamada para o método Create da fábrica de adaptador fornece a marca d'água alta que ajuda o adaptador a identificar os eventos a serem reproduzidos.

Para assegurar que a saída seja completa, todos os adaptadores de entrada devem reproduzir todos os eventos no fluxo físico que ocorrem na ou após a posição indicada pela marca d'água alta.

TOP

Etapa 5. Eliminando duplicatas no adaptador de saída

Para oferecer suporte à eliminação de duplicatas como parte da recuperação, o adaptador de saída deve implementar a interface IHighWaterMarkOutputAdapterFactory ou IHighWaterMarkTypedOutputAdapterFactory. Em seguida, a chamada para o método Create da fábrica de adaptador fornece a marca d'água alta e o valor de deslocamento que ajudam o adaptador a identificar valores de duplicata. Esse deslocamento é necessário porque o local do fluxo de saída correspondente ao ponto de verificação pode ficar em qualquer ponto do fluxo.

Quando a consulta for iniciada pela primeira vez, o método Create da fábrica de adaptador será chamado sem a marca d'água alta e o deslocamento. Se o servidor ainda não tiver capturado nenhum ponto de verificação para a consulta, o método Create da fábrica de adaptador será chamado com uma marca d'água alta DateTime.MinValue e um deslocamento 0 (zero).

Se uma consulta for reproduzida corretamente, os eventos que foram produzidos após o último ponto de verificação ter sido capturado, mas antes da falha, serão produzidos novamente após a reinicialização. Essas são as duplicatas que o adaptador de saída deve remover. Como elas são removidas no adaptador de saída: as cópias originais podem ser abandonadas ou as cópias duplicatas podem ser ignoradas.

Para assegurar que a saída seja equivalente, todos os adaptadores de entrada devem reproduzir corretamente os eventos de entrada, e todos os adaptadores de saída devem remover todos os eventos duplicados no fluxo físico que ocorrem antes da falha e na ou após a posição indicada pelo deslocamento da marca d'água alta.

TOP

Etapa 6. Recuperando-se de falhas

O servidor executar automaticamente a recuperação na inicialização e coloca todas as consultas em um estado consistente. Essa é uma operação assíncrona; Consequentemente, a chamada para Server.Create é retornada antes que a recuperação seja concluída.

  • As consultas não resilientes são colocadas no estado Parada. Esse comportamento não foi alterado.

  • As consultas resilientes são colocadas no estado Inicializando. Em seguida, o servidor carrega as informações de ponto de verificação salvas.

Você pode chamar Start neste ponto para reiniciar as consultas. As consultas resilientes serão reiniciadas assim que a inicialização é concluída.

O código de inicialização deve executar as seguinte etapas para recuperar-se de falhas:

  1. Recupere a lista de consultas do aplicativo nos metadados.

  2. Para cada consulta, verifique se a consulta já existe nos metadados.

    1. Em caso afirmativo, reinicie-a.

    2. Se a consulta não existir nos metadados, crie-a e defina-a como resiliente, conforme descrito anteriormente na Etapa 2. Definindo uma consulta resiliente.

Se um problema ocorrer durante a recuperação, você poderá reiniciar o servidor sem resiliência.

TOP

Desligando sem desabilitar a recuperação

Você pode desligar o servidor sem desabilitar a recuperação chamando o método Dispose do Server.

  • As consultas não resilientes são interrompidas.

  • As consultas resilientes são suspensas. Quando você reinicia o servidor, este tenta recuperar o estado das consultas suspensas. Para evitar esse comportamento, interrompa as consultas antes do desligamento.

Os metadados das consultas resilientes e não resilientes são preservados quando você desliga o servidor dessa maneira.

TOP

Exemplos

Para obter um exemplo de código completo de um aplicativo resiliente que inclui reprodução e desduplicação, consulte o Exemplo de ponto de verificação na página de exemplos do StreamInsight em Codeplex.

TOP

Definindo uma consulta resiliente sem o modelo de desenvolvimento explícito

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Definindo ponto de verificação - modelo de reunião de retorno de chamada

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Consulte também

Conceitos

Resiliência do StreamInsight

Compilando aplicativos resilientes do StreamInsight

Monitorando aplicativos resilientes do StreamInsight

Solucionando problemas de aplicativos resilientes do StreamInsight