Compondo consultas em tempo de execução
A composição de consultas do StreamInsight em tempo de execução possibilita flexibilidade, capacidade de reutilização, uso eficiente de recursos e a facilidade de manutenção das consultas. Ela o habilita a:
Fornecer o resultado de uma consulta a outras consultas no mesmo servidor.
Consumir a saída de outras consultas em execução, da mesma forma que consome eventos de um adaptador de entrada.
Por exemplo, duas consultas redigidas, em que a consulta 1 alimenta a consulta 2, executadas isoladamente. Em caso de falha da consulta 1, o estado da consulta 2 não será afetado e vice-versa. As consultas 1 e 2 podem ser iniciadas e paradas independentemente uma da outra. Por exemplo, você pode parar a consulta 1, substituí-la por outra consulta e reiniciá-la.
Este tópico descreve vários casos de uso e exemplos de composição de consultas de maneira dinâmica em tempo de execução.
Reutilizando a saída de uma consulta existente
Um caso de uso comum para várias consultas é a necessidade de criar e implantar uma consulta primária que processe previamente os dados e envie-os a um adaptador de saída, enquanto outras consultas consomem o resultado dessa consulta e enviam seus próprios resultados a outros adaptadores de saída. Esse cenário é mostrado na ilustração a seguir.
O exemplo a seguir representa uma consulta, criada em um aplicativo myApp existente em um servidor StreamInsight.
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Para transmitir os resultados dessa consulta em uma segunda consulta, usa-se o método Query.ToStream(). O tipo que corresponde à carga de saída da consulta primária é especificado como um parâmetro genérico, conforme mostrado no exemplo a seguir.
var filteredStream = query.ToStream<MyDataType>();
var validated = from e in filteredStream
select new
{
SourceId = e.SourceId,
Value = e.Value,
Status = e.Value > 75 ? false : true
};
var validationQuery = validated.ToQuery("validationQuery",
"Validating Query",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
validationQuery.Start();
Neste exemplo, o fluxo de saída da consulta primária é acessado e um operador de projeção é aplicado para apresentar um novo campo denominado Status. A segunda chamada de ToQuery() não exige mais um objeto de aplicativo pois ela pode inferi-lo a partir da consulta primária.
O método ToStream() utiliza um objeto AdvanceTimeSettings opcional se CTIs (incrementos de tempo atual) precisarem ser injetados naquele ponto. A inserção de CTIs pode ajudar a aumentar o dinamismo de determinadas configurações de consulta.
Observe que não importa como o objeto de consulta primário foi criado. O modelo anterior mostra um exemplo de uso da API CepStream.ToQuery(). Há outras possibilidades para criar a consulta:
Através de um binder de consulta. Por exemplo, myApp.CreateQuery("filterQuery", queryBinder, "description");.
Recuperá-la através da API de modelo do objeto a partir do servidor. Por exemplo, myApp.Queries["filterQuery"].
Saída da consulta autônoma
O exemplo anterior mostra como reutilizar o resultado de uma consulta existente na qual sua saída já está associada a um adaptador de saída. Como alternativa, as consultas também podem ter um fluxo de saída autônomo de forma que nenhuma saída seja gerada, a menos que uma ou mais outras consultas consumam seu resultado. Esse cenário é mostrado na ilustração a seguir.
Isso é realizado usando uma sobrecarga de CepStream.ToQuery() que não requer um adaptador:
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
EventShape.Point,
StreamEventOrder.FullyOrdered);
Essa consulta pode ser iniciada. Uma segunda consulta pode consumir seu fluxo de resultado mais tarde especificando-o conforme mostrado no exemplo anterior da consulta validationQuery. Sem um consumidor, os resultados da consulta primária são removidos.
Esse padrão também permite transmitir os resultados de uma consulta a vários adaptadores de saída. No caso mais simples, isso pode ser realizado usando consultas de passagem sobre uma consulta autônoma, uma para cada adaptador de saída (as consultas 2 e 3, na ilustração anterior).
Fluxos publicados
Até então, os exemplos usam o objeto de consulta real para criar um novo fluxo de entrada para outra consulta. Você pode usar a URI do fluxo publicado como uma entrada de uma ou mais consultas para resumir os objetos do cliente, conforme mostrado na ilustração a seguir.
Cada consulta tem uma URI de fluxo publicada padrão que é o próprio nome da consulta. Além disso, você pode atribuir explicitamente um nome de fluxo publicado personalizado à consulta, por meio do membro apropriado da classe CepStream.
var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
myApp,
"filterQuery",
"Filter out Values over 95",
EventShape.Point,
StreamEventOrder.FullyOrdered);
Isso cria uma consulta com uma saída autônoma, mas com nome explícito. Observe que os nomes de fluxos publicados devem seguir a convenção "<application_name>/PublishedStream/<stream_name>".
Outra consulta pode referenciar essa URI como seu fluxo de entrada, conforme mostrado no exemplo a seguir.
var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
EventShape.Point);
var validated = from e in filterStream
...
Observe que o consumidor do fluxo publicado deve especificar a forma de evento de entrada, que deve corresponder à forma de saída da consulta referenciada.
A conexão a uma consulta primária por meio de um nome de fluxo publicado é menos rigorosa do que a conexão por meio do objeto de consulta. Portanto, ao definir a consulta secundária, um aplicativo deve receber:
var validationQuery = validated.ToQuery(myApp,
"validationQuery",
"Validating Query",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Adaptadores de fluxo publicados
Ao recuperar adaptadores de uma consulta redigida (por exemplo, por Query.InputStreamBindings), você notará que são usados adaptadores internos especiais para conectá-los. A funcionalidade de consultas redigidas através de CepStream.ToQuery, Query.ToStream(), e assim por diante, conforme mostrado antes, são superfícies convenientes sobre esses adaptadores internos. Eles também podem ser usados explicitamente como adaptadores ordinários, com sua própria estrutura de configuração que contém o nome de fluxo publicado, conforme mostrado no seguinte exemplo:
// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
typeof(PublishedStreamAdapterFactory),
new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
EventShape.Point);
var validated = from e in filterStream
...
var validationQuery = validated.ToQuery(myApp,
"validationQuery",
"Validating Query",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Do mesmo modo, uma consulta pode usar o adaptador de saída de fluxo publicado, que tem a mesma funcionalidade que CepStream.toPublishedStreamQuery():
var filterQuery = filtered.ToQuery(myApp,
"filterQuery",
"desc",
typeof(PublishedStreamAdapterFactory),
new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Usando o binder de consulta
O modelo de desenvolvimento de binder de consulta permite o total controle dos vários objetos de metadados do StreamInsight e separa claramente a associação e o uso da consulta da fase de criação do modelo de consulta. Esse modelo também permite redigir uma consulta dinâmica, na associação de entrada e na associação de saída. Para obter mais informações, consulte Usando o binder de consulta.
Associando a outra consulta como entrada
Da mesma maneira que o binder de consulta pode associar um modelo de consulta a um adaptador de entrada como um produtor de evento, ele pode associar a uma consulta existente. Digamos que exista uma consulta primária (com saída associada ou autônoma) como no primeiro exemplo.
var query = filtered.ToQuery(myApp, ...);
Então, o binder de consulta pode ser usado conforme descrito a seguir, referenciando a consulta anterior na sobrecarga apropriada de BindProducer.
var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
select new
{
SourceId = e.SourceId,
Value = e.Value,
Status = e.Value > 75 ? false : true
};
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);
Como alternativa, o binder de consulta pode referenciar um fluxo publicado como um produtor de evento.
queryBinder.BindProducer("validationInput",
new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
EventShape.Point);
Assim como na assinatura Query.ToStream(), um objeto AdvanceTimeSettings opcional pode ser especificado em BindProducer().
Associando a um fluxo publicado como saída
Na saída, o binder de consulta permite o fluxo contínuo em um fluxo publicado explicitamente definido.
queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
EventShape.Point,
StreamEventOrder.FullyOrdered);
Assim que a consulta que se baseia nesse binder de consulta for iniciada, outras consultas poderão ser associadas a um fluxo publicado, conforme descrito nos exemplos anteriores, e poderão consumir seus eventos de resultado.
Associando a adaptadores de fluxo publicado
Adaptadores de fluxo publicado também podem ser usados no modelo de binder de consulta. Eles podem ser recuperados a partir do objeto de aplicativo e usados em BindProducer e AddConsumer como adaptadores ordinários:
queryBinder.BindProducer("validationInput",
myApp.GetPublishedStreamInputAdapter(),
new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
EventShape.Point);
queryBinder.AddConsumer("validated",
myApp.GetPublishedStreamOutputAdapter(),
new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
EventShape.Point,
StreamEventOrder.FullyOrdered);