Share via


Usando o binder de consulta

Um Binder de consulta é um modelo de desenvolvimento do cliente que fornece o nível máximo de flexibilidade e capacidade de reutilização ao criar aplicativos do StreamInsight. Nesse modelo, são registrados adaptadores e modelos de consulta como objetos de metadados separados que podem ser associados mais tarde para criar uma instância de uma consulta. Isso permite que o desenvolvedor tenha controle total do seu aplicativo e do ambiente de desenvolvimento usando uma associação de consulta explícita sobre a API de modelo do objeto.

Casos típicos de uso do modelo explícito de desenvolvimento de servidor incluem aplicativos StreamInsight que exigem:

  • Total controle e acesso ao servidor StreamInsight.

  • A reutilização de consultas através de composição de consulta estática ou dinâmica ou a reutilização de adaptadores, tipos de evento e modelos de consulta definidos por terceiros.

Principais características do modelo de desenvolvimento de binder de consulta

Principais características do modelo de binder de consulta:

  • O desenvolvedor deve criar todos os objetos de metadados explicitamente e registrá-los no servidor StreamInsight.

  • O modelo dá suporte à criação e ao uso de vários objetos (modelos de consulta, consultas, aplicativos e adaptadores). Todos os objetos devem ser registrados em um aplicativo.

    O modelo de consulta e a instância de consulta devem ser explicitamente registrados com o servidor antes de a consulta ser executada. Os adaptadores de entrada e saída devem ser explicitamente registrados para que o modelo de consulta ou a consulta referencie esses objetos. Além disso, todos os objetos devem ser registrados em um aplicativo. Tipos de evento usados por adaptadores e modelos de consulta são registrados implicitamente.

Exemplos

O exemplo a seguir cria um objeto de servidor StreamInsight e um objeto de aplicativo chamado myApp no servidor. Em seguida, ele cria e registra todos os objetos necessários do StreamInsight para importar, processar e exportar os fluxos de eventos.

Primeiro, são criados o servidor e o objeto de aplicativo.

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Depois, os adaptadores de entrada e saída são registrados no aplicativo.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

O modelo de consulta é especificado sobre um fluxo desassociado. O único parâmetro necessário para criar um fluxo desassociado é um nome de fluxo, que mais tarde é necessário na associação de adaptador.

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • A última chamada registra o modelo de consulta no aplicativo. O modelo de consulta registrado pode ser reutilizado em várias associações e, portanto, instanciado em vários consultas, cada uma associada a adaptadores de entrada e saída potencialmente diferentes. Essas associações para modelos de consulta registrados são definidas através do objeto QueryBinder:
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

O método BindProducer() associa um objeto de adaptador de entrada (deve ser registrado no aplicativo) a um fluxo com o nome especificado, no caso "filterInput". Isso permite fazer a distinção entre vários pontos de entrada de um modelo de consulta. Além do adaptador de entrada, são necessários os parâmetros específicos de associação (a configuração de adaptador e a forma de evento desejada).

O método AddConsumer() associa um objeto de adaptador de saída (que deve ser registrado no aplicativo) ao único fluxo de saída do modelo de consulta. O nome do fluxo de saída fornecido, no caso "validated", pode ser usado para identificar o fluxo para fins de diagnóstico. Assim como no adaptador de entrada, os parâmetros específicos de associação são fornecidos para o adaptador de saída.

O objeto de consulta é criado com base no binder de consulta, em um identificador de consulta e em uma descrição textual. A última etapa é iniciar a consulta.

query.Start();

Consultas com vários fluxos de entrada

O exemplo a seguir mostra como criar um modelo de consulta que usa vários fluxos de entrada. Um modelo de consulta pode ter vários pontos de entrada, cada um alimentado a partir de uma fonte de dados diferente; por exemplo, quando dois fluxos devem ser unidos. A associação de fluxo adequada ocorre através da especificação do nome do fluxo, conforme mostrado no exemplo a seguir.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

Modificando um aplicativo existente

Note que você trabalha no modelo de binder de consulta com o modelo de consulta e objetos de adaptador sem precisar necessariamente criá-los no mesmo aplicativo. O exemplo a seguir supõe uma conexão com um servidor existente e recupera as entidades de metadados existentes através da API de modelo de objeto do StreamInsight em vez de criá-las.

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

Usando um repositório de metadados persistentes

Durante a criação de um servidor StreamInsight, um parâmetro opcional do método Server.Create() é o tipo de repositório de metadados a ser usado. Por padrão, os metadados são armazenados em memória. Outra opção é manter metadados em disco através de um banco de dados SQL Server Compact 3.5. O exemplo a seguir mostra como especificar um banco de dados SQL Server Compact 3.5 como o repositório de metadados.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

Note que, ao especificar um banco de dados de metadados existente durante a criação do servidor, você ocasionará a leitura de todos os metadados do arquivo especificado. As entidades de metadados podem ser recuperadas através da API de modelo de objeto do StreamInsight.

Exemplo completo

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

Consulte também

Conceitos

Conceitos do servidor StreamInsight