Compilar aplicaciones StreamInsight resistentes

En este tema se describen los pasos para crear una aplicación StreamInsight resistente.

La resistencia solo está disponible en la edición Premium de StreamInsight. Para obtener más información, vea Elegir una edición de StreamInsight.

Para obtener una muestra de código completa de una aplicación resistente que incluya reproducción y anulación de duplicación, vea la muestra de comprobación de puntos en la página de muestras de StreamInsight en Codeplex.

Configuración obligatoria

Para configurar un servidor resistente, proporcione valores para las siguientes opciones de configuración al crear el servidor:

  • Almacén de metadatos. Debe usar SQL Server Compact para almacenar los metadatos del servidor; los metadatos no se pueden almacenar en memoria.

  • Ruta de acceso del registro. Esta configuración determina dónde se almacenan los datos de punto de comprobación para las consultas resistentes. El valor predeterminado de la ruta de acceso es el directorio de trabajo del proceso StreamInsight. Una configuración relacionada, CreateLogPathIfMissing, determina si se creará el directorio especificado si no existe.

La configuración de un servidor para la resistencia permite capturar puntos de comprobación, pero no provoca que se capturen los puntos de comprobación. Para obtener más información acerca de cómo invocar puntos de comprobación, vea Paso 3. Capturar puntos de comprobación.

Administración de la ruta de acceso del registro de puntos de comprobación

  • Para evitar la lectura o modificación no autorizadas de los archivos de punto de comprobación, asegúrese de que los permisos de la carpeta contenedora están configurados de forma que solo las entidades de confianza tengan acceso.

  • Cada instancia de StreamInsight debe tener su propia ruta de acceso del registro.

  • Asegúrese de que el proceso que hospeda StreamInsight tiene acceso de lectura y escritura en la carpeta especificada.

  • No edite el contenido de la carpeta. StreamInsight elimina los archivos de punto de comprobación cuando ya no se necesitan.

Servidores fuera de proceso

En el caso de un servidor fuera de proceso, al que el cliente se conecta mediante una llamada a Server.Connect, la persona que aprovisiona el servidor proporciona la configuración de resistencia. Si el servidor fuera de proceso tiene una configuración de resistencia, el cliente puede usarlo según lo configurado; si el servidor no tiene una configuración de resistencia, el cliente no puede usar las características de resistencia.

Métodos para especificar las opciones de resistencia

Puede especificar la configuración de resistencia mediante uno de los métodos siguientes:

  • Especifique las opciones mediante programa al proporcionar la configuración de resistencia al llamar a Server.Create.

  • Especifique los valores mediante declaración en el archivo de configuración de la aplicación.

    • Para un servidor en proceso, se trata del archivo app.config.

    • Para un servidor fuera de proceso, se trata del archivo StreamInsightHost.exe.config, que se puede encontrar en la carpeta Host de la carpeta de instalación StreamInsight.

Si usa ambos métodos, la configuración que especifique en la llamada de API reemplazará los valores del archivo de configuración.

Crear un servidor resistente mediante programación

En el siguiente ejemplo se muestra cómo crear un servidor en proceso resistente mediante programación. Para obtener ejemplos más detallados, vea Ejemplos. Intente capturar las excepciones que podrían impedir la comprobación de puntos al llamar a Server.Create.

SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";

CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";

using (EmbeddedServer server = 
    Server.Create("Default", metadataConfig, recoveryConfig))

Crear un servidor resistente mediante declaración

El ejemplo siguiente muestra cómo crear un servidor resistente mediante declaración con un archivo de configuración.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
    <appSettings>
            <add key="InstanceName" value="Default"/>
            <add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
            <add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
            <add key="CheckpointEnabled" value="true"/>
            <add key="CheckpointLogPath" value="CepLogPath"/>
            <add key="CreateCheckpointLogPathIfMissing" value="true"/>
    </appSettings>
    <runtime>
        <gcServer enabled="true"/>
    </runtime>
</configuration>

ARRIBA

Para crear una consulta resistente, incluya los siguientes pasos en el código.

  1. Antes de crear una nueva consulta, compruebe si la consulta ya existe en los metadatos. Si la consulta ya existe, significa que la aplicación se ha recuperado de un error. El código debe reiniciar la consulta en vez de volver a crearla.

  2. Si la consulta no existe en los metadatos, créela y defínala como resistente mediante la especificación de true para el parámetro IsResilient del método ToQuery. También puede llamar al método Application.CreateQuery con el parámetro IsResilient.

La configuración de una consulta para la resistencia permite capturar puntos de comprobación, pero no provoca que se capturen los puntos de comprobación. Para obtener más información acerca de cómo invocar puntos de comprobación, vea Paso 3. Capturar puntos de comprobación.

Ejemplo de definición de una consulta resistente

Para obtener ejemplos más detallados, vea Ejemplos.

Query query = application.CreateQuery(
                     "TrafficSensorQuery",
                     "Minute average count, filtered by location threshold",
                     queryBinder,
                     true);

ARRIBA

Después de que la consulta o las consultas estén en ejecución, capture los puntos de comprobación periódicamente para registrar el estado de las consultas.

Los métodos de API que admiten la comprobación de puntos siguen el patrón típico para una operación asincrónica.

  1. Para invocar un punto de comprobación llame al método BeginCheckpoint. Si proporciona el método AsyncCallback opcional, se le llamará cuando se complete el punto de comprobación. El valor de IAsyncResult devuelto de la llamada a BeginCheckpoint identifica esta solicitud de punto de comprobación y se pueden usar más tarde en llamadas a EndCheckpoint o CancelCheckpoint.

    /// <summary>
    /// Take an asynchronous checkpoint for the query.
    /// </summary>
    /// <param name="query">The query to checkpoint.</param>
    /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param>
    /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param>
    /// <returns></returns>
    IAsyncResult BeginCheckpoint(
         Query query, 
         AsyncCallback asyncCallback, 
         Object asyncState);
    
  2. El método EndCheckpoint se bloquea hasta que se completa la operación de punto de comprobación. Si la operación de punto de comprobación se realiza correctamente, la llamada devuelve true; si se producen errores, la llamada inicia una excepción.

    /// <summary>
    /// Waits for the pending asynchronous checkpoint request to complete.
    /// </summary>
    /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
    /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns>
    bool EndCheckpoint(
         IAsyncResult asyncResult);
    
  3. También puede llamar a CancelCheckpoint para cancelar el proceso de comprobación de puntos. Cuando la llamada a CancelCheckpoint se realiza correctamente, la llamada post a EndCheckpoint devuelve false.

    /// <summary>
    /// Cancels the pending asynchronous checkpoint request.
    /// </summary>
    /// <param name="asyncResult">The asyncResult handle identifying the call.</param>
    void CancelCheckpoint(
         IAsyncResult asyncResult);
    

Este patrón asincrónico se puede usar de tres formas distintas:

  • Después de una llamada a BeginCheckpoint puede realizarse una llamada a EndCheckpoint. EndCheckpoint se bloquea hasta que se completa la operación de punto de comprobación y, a continuación, devuelve el resultado (o la excepción). En este patrón, normalmente no se usan asyncCallback y asyncState.

  • Se puede llamar a BeginCheckpoint y el usuario puede sondear la propiedad IsCompleted del IAsyncResult devuelto. Cuando IsCompleted es true, se puede llamar a EndCheckpoint para recuperar el resultado. En este patrón, normalmente no se usan asyncCallback y asyncState.

  • Se puede llamar a BeginCheckpoint con un método de devolución de llamada. En este caso, se puede usar asyncState para identificar la llamada y devolver la información necesaria al método de devolución de llamada. Cuando se ejecuta la devolución de llamada, llama a EndCheckpoint para recuperar el resultado.

Se debe llamar al método EndCheckpoint, independientemente del patrón que se use e incluso si se cancela el punto de comprobación. Este método es la única forma que tiene el usuario para obtener un valor de devolución de la llamada y la única forma de que StreamInsight sepa que la llamada se ha completado. No puede iniciar otro punto de comprobación hasta que haya llamado a EndCheckpoint.

Los errores que se producen en el proceso de comprobación no detienen o afectan a las consultas asociadas. Si detiene una consulta mientras está en curso una operación de punto de comprobación, se cancela el punto de comprobación.

ARRIBA

Para admitir la reproducción de eventos como parte de la recuperación, el generador de adaptadores de entrada debe implementar la interfaz IHighWaterMarkInputAdapterFactory o IHighWaterMarkTypedInputAdapterFactory. La llamada al método Create del generador de adaptadores suministra el límite máximo que ayuda al adaptador a identificar los eventos que se reproducirán.

Para garantizar que la salida está completa, todos los adaptadores de entrada deben reproducir todos los eventos del flujo físico que se producen en la posición indicada por el límite máximo o después de ella.

ARRIBA

Para admitir la eliminación de duplicados como parte de la recuperación, el generador de adaptadores de salida debe implementar la interfaz IHighWaterMarkOutputAdapterFactory o IHighWaterMarkTypedOutputAdapterFactory. La llamada al método Create del generador de adaptadores suministra el límite máximo y valor de desplazamiento que ayudan al adaptador a identificar los valores duplicados. Este desplazamiento es necesario porque la ubicación en el flujo de salida correspondiente al punto de comprobación puede estar en cualquier punto del flujo.

Cuando la consulta se inicia por primera vez, se llama al método Create del generador de adaptadores sin el límite máximo ni el desplazamiento. Si el servidor todavía no ha capturado ningún punto de comprobación para la consulta, se llama al método Create del generador de adaptadores con un límite máximo de DateTime.MinValue y un desplazamiento de 0 (cero).

Si una consulta se reproduce correctamente, los eventos que se han producido después de que se capturara el último punto de comprobación antes de la interrupción se volverán a producir tras el reinicio. Se trata de los duplicados que el adaptador de salida debe quitar. El modo de quitarlos depende del adaptador de salida: las copias originales se pueden abandonar o las copias duplicadas se pueden omitir.

Para garantizar que la salida es equivalente, todos los adaptadores de entrada deben reproducir correctamente los eventos de entrada y todos los adaptadores de salida deben quitar todos los eventos duplicados del flujo físico que se produjeron antes de la interrupción y que se producen en la posición indicada por el desplazamiento de límite máximo o después de ella.

ARRIBA

El servidor realiza automáticamente la recuperación al iniciarse y pone todas las consultas en un estado coherente. Se trata de una operación asincrónica; como resultado, la llamada a Server.Create vuelve antes de que haya finalizado la recuperación.

  • Los consultas no resistentes se ponen en estado Detenido. Este comportamiento no ha cambiado.

  • Las consultas resistentes se ponen en estado Inicializándose. A continuación, el servidor carga la información de puntos de comprobación guardada.

Puede llamara a Start en este momento para reiniciar las consultas. Las consultas resistentes se reiniciarán tan pronto como se complete la inicialización.

El código de inicio debe realizar los siguientes pasos para recuperarse del error:

  1. Recuperar la lista de consultas de la aplicación a partir de los metadatos.

  2. Por cada consulta, comprobar si la consulta ya existe en los metadatos.

    1. Si la consulta ya existe, reiniciarla.

    2. Si la consulta no existe en los metadatos, crearla y definirla como resistente, tal como se describe en Paso 2. Definir una consulta resistente.

Si se produce un problema durante la recuperación, puede reiniciar el servidor sin resistencia.

ARRIBA

Puede apagar el servidor sin deshabilitar la recuperación si llama al método Dispose de Server.

  • Los consultas no resistentes se detienen.

  • Los consultas resistentes se suspenden. Cuando se reinicia el servidor, este intenta recuperar el estado de las consultas suspendidas. Para evitar este comportamiento, detenga las consultas antes del apagado.

Los metadatos de los consultas no resistentes y resistentes se mantienen al apagar el servidor de esta forma.

ARRIBA

Para obtener una muestra de código completa de una aplicación resistente que incluya reproducción y anulación de duplicación, vea la muestra de comprobación de puntos en la página de muestras de StreamInsight en Codeplex.

ARRIBA

Definir una consulta resistente con el modelo de desarrollo explícito

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
            // SQL CE was available as an optional metadata provider in v1.1
            // For the server to support recovery, this becomes mandatory
            // A log path is also a mandatory requirement.
            SqlCeMetadataProviderConfiguration metadataConfig = new
               SqlCeMetadataProviderConfiguration();
            metadataConfig.CreateDataSourceIfMissing = true;
            metadataConfig.DataSource = "C:\\CepMetadata.sdf";

            ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
            recoveryConfig.CreateLogPathIfMissing = true;
            recoveryConfig.LogPath = "C:\\CepLogPath";


            using (EmbeddedServer server = Server.Create(
                                            "Default", metadataConfig, recoveryConfig))
            {
                try
                {
                    Application application = server.CreateApplication("TrafficJoinSample");

                    QueryTemplate queryTemplate = CreateQueryTemplate(application);

                    InputAdapter csvInputAdapter =     
                                           application.CreateInputAdapter<TextFileReaderFactory>(
                                           "CSV Input", "Reading tuples from a CSV file");
                    OutputAdapter csvOutputAdapter =
                                          application.CreateOutputAdapter<TextFileWriterFactory>(
                                          "CSV Output", "Writing result events to a CSV file");

                    // bind query to event producers and consumers
                    QueryBinder queryBinder = BindQuery(
                                              csvInputAdapter, csvOutputAdapter, queryTemplate);

                    // Create bound query that can be run
                    Console.WriteLine("Registering bound query");
                    Query query = application.CreateQuery(
                                    "TrafficSensorQuery",
                                    "Minute average count, filtered by location threshold",
                                    queryBinder,
                                    true);   // v1.2 addition - Specify the query as resilient

                    // Start the query
                    // v1.2 has additional semantics during recovery

                    query.Start();

                    // submit a checkpoint request

                    // query.Stop();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }

Comprobar puntos: modelo de encuentro de devolución de llamada

namespace StreamInsight.Samples.TrafficJoinQuery
{
    using...

    internal class EmbeddedCepServer
    {
        internal static void Main()
        {
                        // Same code through query start …
            {
                try
                {
                    // Start the query
                    query.Start();

                    // submit a checkpoint request
                    IAsyncResult result = server.BeginCheckpoint(query,
                        r => {
                            if (server.EndCheckpoint(r))
                            {
                                // the checkpoint succeeded
                            }
                            else
                            {
                                // the checkpoint was canceled
                            }
                        },
                        null);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    Console.ReadLine();
                }
            }

            Console.WriteLine("\npress enter to exit application");
            Console.ReadLine();
        }
Mostrar: