Usando o binder de consulta
Um Binder de consulta é um modelo de desenvolvimento do cliente que fornece o nível máximo de flexibilidade e capacidade de reutilização ao criar aplicativos do StreamInsight. Nesse modelo, são registrados adaptadores e modelos de consulta como objetos de metadados separados que podem ser associados mais tarde para criar uma instância de uma consulta. Isso permite que o desenvolvedor tenha controle total do seu aplicativo e do ambiente de desenvolvimento usando uma associação de consulta explícita sobre a API de modelo do objeto.
Casos típicos de uso do modelo explícito de desenvolvimento de servidor incluem aplicativos StreamInsight que exigem:
Total controle e acesso ao servidor StreamInsight.
A reutilização de consultas através de composição de consulta estática ou dinâmica ou a reutilização de adaptadores, tipos de evento e modelos de consulta definidos por terceiros.
Principais características do modelo de desenvolvimento de binder de consulta
Principais características do modelo de binder de consulta:
O desenvolvedor deve criar todos os objetos de metadados explicitamente e registrá-los no servidor StreamInsight.
O modelo dá suporte à criação e ao uso de vários objetos (modelos de consulta, consultas, aplicativos e adaptadores). Todos os objetos devem ser registrados em um aplicativo.
O modelo de consulta e a instância de consulta devem ser explicitamente registrados com o servidor antes de a consulta ser executada. Os adaptadores de entrada e saída devem ser explicitamente registrados para que o modelo de consulta ou a consulta referencie esses objetos. Além disso, todos os objetos devem ser registrados em um aplicativo. Tipos de evento usados por adaptadores e modelos de consulta são registrados implicitamente.
Exemplos
O exemplo a seguir cria um objeto de servidor StreamInsight e um objeto de aplicativo chamado myApp no servidor. Em seguida, ele cria e registra todos os objetos necessários do StreamInsight para importar, processar e exportar os fluxos de eventos.
Primeiro, são criados o servidor e o objeto de aplicativo.
server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");
Depois, os adaptadores de entrada e saída são registrados no aplicativo.
InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");
O modelo de consulta é especificado sobre um fluxo desassociado. O único parâmetro necessário para criar um fluxo desassociado é um nome de fluxo, que mais tarde é necessário na associação de adaptador.
var inputstream = CepStream<MyDataType>.Create("filterInput");
var filtered = from e in inputstream
where e.Value > 95
select e;
QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
- A última chamada registra o modelo de consulta no aplicativo. O modelo de consulta registrado pode ser reutilizado em várias associações e, portanto, instanciado em vários consultas, cada uma associada a adaptadores de entrada e saída potencialmente diferentes. Essas associações para modelos de consulta registrados são definidas através do objeto QueryBinder:
QueryBinder queryBinder = new QueryBinder(filterQT);
queryBinder.BindProducer<MyDataType>("filterInput",
inputAdapter,
new InputAdapterConfig { someFlag = true },
EventShape.Point);
queryBinder.AddConsumer("filterOutput",
outputAdapter,
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);
O método BindProducer() associa um objeto de adaptador de entrada (deve ser registrado no aplicativo) a um fluxo com o nome especificado, no caso "filterInput". Isso permite fazer a distinção entre vários pontos de entrada de um modelo de consulta. Além do adaptador de entrada, são necessários os parâmetros específicos de associação (a configuração de adaptador e a forma de evento desejada).
O método AddConsumer() associa um objeto de adaptador de saída (que deve ser registrado no aplicativo) ao único fluxo de saída do modelo de consulta. O nome do fluxo de saída fornecido, no caso "validated", pode ser usado para identificar o fluxo para fins de diagnóstico. Assim como no adaptador de entrada, os parâmetros específicos de associação são fornecidos para o adaptador de saída.
O objeto de consulta é criado com base no binder de consulta, em um identificador de consulta e em uma descrição textual. A última etapa é iniciar a consulta.
query.Start();
Consultas com vários fluxos de entrada
O exemplo a seguir mostra como criar um modelo de consulta que usa vários fluxos de entrada. Um modelo de consulta pode ter vários pontos de entrada, cada um alimentado a partir de uma fonte de dados diferente; por exemplo, quando dois fluxos devem ser unidos. A associação de fluxo adequada ocorre através da especificação do nome do fluxo, conforme mostrado no exemplo a seguir.
CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");
// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...
InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");
qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);
Modificando um aplicativo existente
Note que você trabalha no modelo de binder de consulta com o modelo de consulta e objetos de adaptador sem precisar necessariamente criá-los no mesmo aplicativo. O exemplo a seguir supõe uma conexão com um servidor existente e recupera as entidades de metadados existentes através da API de modelo de objeto do StreamInsight em vez de criá-las.
Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];
Usando um repositório de metadados persistentes
Durante a criação de um servidor StreamInsight, um parâmetro opcional do método Server.Create() é o tipo de repositório de metadados a ser usado. Por padrão, os metadados são armazenados em memória. Outra opção é manter metadados em disco através de um banco de dados SQL Server Compact 3.5. O exemplo a seguir mostra como especificar um banco de dados SQL Server Compact 3.5 como o repositório de metadados.
SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;
server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");
Note que, ao especificar um banco de dados de metadados existente durante a criação do servidor, você ocasionará a leitura de todos os metadados do arquivo especificado. As entidades de metadados podem ser recuperadas através da API de modelo de objeto do StreamInsight.
Exemplo completo
using (Server server = Server.Create("MyInstance"))
{
try
{
Application myApp = server.CreateApplication("MyApp");
InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");
var inputstream = CepStream<MyDataType>.Create("filterInput");
var filtered = from e in inputstream
where e.Value > 95
select e;
QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
QueryBinder queryBinder = new QueryBinder(filterQT);
queryBinder.BindProducer<MyDataType>("filterInput",
inputAdapter,
new InputAdapterConfig { someFlag = true },
EventShape.Point);
queryBinder.AddConsumer("filterOutput",
outputAdapter,
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}