Compondo consultas em tempo de execução

A composição de consultas do StreamInsight em tempo de execução possibilita flexibilidade, capacidade de reutilização, uso eficiente de recursos e a facilidade de manutenção das consultas. Ela o habilita a:

  • Fornecer o resultado de uma consulta a outras consultas no mesmo servidor.

  • Consumir a saída de outras consultas em execução, da mesma forma que consome eventos de um adaptador de entrada.

Por exemplo, duas consultas redigidas, em que a consulta 1 alimenta a consulta 2, executadas isoladamente. Em caso de falha da consulta 1, o estado da consulta 2 não será afetado e vice-versa. As consultas 1 e 2 podem ser iniciadas e paradas independentemente uma da outra. Por exemplo, você pode parar a consulta 1, substituí-la por outra consulta e reiniciá-la.

Este tópico descreve vários casos de uso e exemplos de composição de consultas de maneira dinâmica em tempo de execução.

Reutilizando a saída de uma consulta existente

Um caso de uso comum para várias consultas é a necessidade de criar e implantar uma consulta primária que processe previamente os dados e envie-os a um adaptador de saída, enquanto outras consultas consomem o resultado dessa consulta e enviam seus próprios resultados a outros adaptadores de saída. Esse cenário é mostrado na ilustração a seguir.

A consulta 2 consome os dados da consulta 1.

O exemplo a seguir representa uma consulta, criada em um aplicativo myApp existente em um servidor StreamInsight.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Para transmitir os resultados dessa consulta em uma segunda consulta, usa-se o método Query.ToStream(). O tipo que corresponde à carga de saída da consulta primária é especificado como um parâmetro genérico, conforme mostrado no exemplo a seguir.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

Neste exemplo, o fluxo de saída da consulta primária é acessado e um operador de projeção é aplicado para apresentar um novo campo denominado Status. A segunda chamada de ToQuery() não exige mais um objeto de aplicativo pois ela pode inferi-lo a partir da consulta primária.

O método ToStream() utiliza um objeto AdvanceTimeSettings opcional se CTIs (incrementos de tempo atual) precisarem ser injetados naquele ponto. A inserção de CTIs pode ajudar a aumentar o dinamismo de determinadas configurações de consulta.

Observe que não importa como o objeto de consulta primário foi criado. O modelo anterior mostra um exemplo de uso da API CepStream.ToQuery(). Há outras possibilidades para criar a consulta:

  • Através de um binder de consulta. Por exemplo, myApp.CreateQuery("filterQuery", queryBinder, "description");.

  • Recuperá-la através da API de modelo do objeto a partir do servidor. Por exemplo, myApp.Queries["filterQuery"].

Saída da consulta autônoma

O exemplo anterior mostra como reutilizar o resultado de uma consulta existente na qual sua saída já está associada a um adaptador de saída. Como alternativa, as consultas também podem ter um fluxo de saída autônomo de forma que nenhuma saída seja gerada, a menos que uma ou mais outras consultas consumam seu resultado. Esse cenário é mostrado na ilustração a seguir.

A consulta 1 possui um fluxo de consultas não associado.

Isso é realizado usando uma sobrecarga de CepStream.ToQuery() que não requer um adaptador:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Essa consulta pode ser iniciada. Uma segunda consulta pode consumir seu fluxo de resultado mais tarde especificando-o conforme mostrado no exemplo anterior da consulta validationQuery. Sem um consumidor, os resultados da consulta primária são removidos.

Esse padrão também permite transmitir os resultados de uma consulta a vários adaptadores de saída. No caso mais simples, isso pode ser realizado usando consultas de passagem sobre uma consulta autônoma, uma para cada adaptador de saída (as consultas 2 e 3, na ilustração anterior).

Fluxos publicados

Até então, os exemplos usam o objeto de consulta real para criar um novo fluxo de entrada para outra consulta. Você pode usar a URI do fluxo publicado como uma entrada de uma ou mais consultas para resumir os objetos do cliente, conforme mostrado na ilustração a seguir.

Consultas usando um fluxo publicado como entrada.

Cada consulta tem uma URI de fluxo publicada padrão que é o próprio nome da consulta. Além disso, você pode atribuir explicitamente um nome de fluxo publicado personalizado à consulta, por meio do membro apropriado da classe CepStream.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

Isso cria uma consulta com uma saída autônoma, mas com nome explícito. Observe que os nomes de fluxos publicados devem seguir a convenção "<application_name>/PublishedStream/<stream_name>".

Outra consulta pode referenciar essa URI como seu fluxo de entrada, conforme mostrado no exemplo a seguir.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Observe que o consumidor do fluxo publicado deve especificar a forma de evento de entrada, que deve corresponder à forma de saída da consulta referenciada.

A conexão a uma consulta primária por meio de um nome de fluxo publicado é menos rigorosa do que a conexão por meio do objeto de consulta. Portanto, ao definir a consulta secundária, um aplicativo deve receber:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Adaptadores de fluxo publicados

Ao recuperar adaptadores de uma consulta redigida (por exemplo, por Query.InputStreamBindings), você notará que são usados adaptadores internos especiais para conectá-los. A funcionalidade de consultas redigidas através de CepStream.ToQuery, Query.ToStream(), e assim por diante, conforme mostrado antes, são superfícies convenientes sobre esses adaptadores internos. Eles também podem ser usados explicitamente como adaptadores ordinários, com sua própria estrutura de configuração que contém o nome de fluxo publicado, conforme mostrado no seguinte exemplo:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Do mesmo modo, uma consulta pode usar o adaptador de saída de fluxo publicado, que tem a mesma funcionalidade que CepStream.toPublishedStreamQuery():

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Usando o binder de consulta

O modelo de desenvolvimento de binder de consulta permite o total controle dos vários objetos de metadados do StreamInsight e separa claramente a associação e o uso da consulta da fase de criação do modelo de consulta. Esse modelo também permite redigir uma consulta dinâmica, na associação de entrada e na associação de saída. Para obter mais informações, consulte Usando o binder de consulta.

Associando a outra consulta como entrada

Da mesma maneira que o binder de consulta pode associar um modelo de consulta a um adaptador de entrada como um produtor de evento, ele pode associar a uma consulta existente. Digamos que exista uma consulta primária (com saída associada ou autônoma) como no primeiro exemplo.

var query = filtered.ToQuery(myApp, ...);

Então, o binder de consulta pode ser usado conforme descrito a seguir, referenciando a consulta anterior na sobrecarga apropriada de BindProducer.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

Como alternativa, o binder de consulta pode referenciar um fluxo publicado como um produtor de evento.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Assim como na assinatura Query.ToStream(), um objeto AdvanceTimeSettings opcional pode ser especificado em BindProducer().

Associando a um fluxo publicado como saída

Na saída, o binder de consulta permite o fluxo contínuo em um fluxo publicado explicitamente definido.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Assim que a consulta que se baseia nesse binder de consulta for iniciada, outras consultas poderão ser associadas a um fluxo publicado, conforme descrito nos exemplos anteriores, e poderão consumir seus eventos de resultado.

Associando a adaptadores de fluxo publicado

Adaptadores de fluxo publicado também podem ser usados no modelo de binder de consulta. Eles podem ser recuperados a partir do objeto de aplicativo e usados em BindProducer e AddConsumer como adaptadores ordinários:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Consulte também

Conceitos

Exemplo ponta a ponta do StreamInsight

Tempo avançado do aplicativo