Exemplo ponta a ponta do StreamInsight

Este tópico descreve os vários componentes e etapas envolvidos na criação de um aplicativo do StreamInsight e inclui um exemplo de ponta a ponta de um aplicativo. Um aplicativo StreamInsight combina fontes de evento, coletores de eventos e consultas para implementar um cenário complexo de processamento de eventos. A API do StreamInsight oferece uma variedade de interfaces por dar suporte a vários níveis de controle e complexidade na criação e manutenção de aplicativos de processamento de eventos. 

A menor unidade de uma implantação de aplicativo é uma consulta, a qual pode ser iniciada e parada. A ilustração a seguir mostra uma forma de compilar uma consulta. A fonte do evento é representada por um adaptador de entrada. O adaptador alimenta um fluxo de eventos na árvore de operador, que representa a lógica de consulta desejada, especificada pelo designer no formulário de um modelo de consulta. O fluxo de eventos processados leva a um coletor de eventos, que costuma ser um adaptador de saída.

Consulta com adaptadores de entrada e saída

Os desenvolvedores não familiarizados com a terminologia de processamento de eventos complexos devem ler Conceitos do servidor StreamInsight e Arquitetura do servidor StreamInsight.

Processo de aplicativo

Esta seção percorre a experiência típica de criar um aplicativo ponta a ponta.

Crie uma instância de servidor e um aplicativo

O processo inicia com a instanciação de uma instância de servidor e de um aplicativo do StreamInsight.

server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Um servidor deve ser criado com um nome de instância que foi registrado na máquina através do processo de instalação do StreamInsight (No exemplo anterior, MyInstance). Para obter mais informações, consulte Instalação (StreamInsight).

Um Aplicativo representa uma unidade de escopo no servidor que contém outras entidades de metadados.

O exemplo anterior cria uma instância de servidor no mesmo processo. Porém, outra implantação comum é conectar-se a um servidor remoto e trabalhar em um aplicativo existente nesse local. O exemplo a seguir mostra como conectar-se a um servidor remoto e acessar um aplicativo existente.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

Para obter mais informações sobre os servidores local e remoto, consulte Publicando e conectando ao servidor StreamInsight.

Crie um fluxo de entrada

Em seguida, um fluxo de entrada é criado sobre uma implementação de adaptador existente. Mais precisamente, a fábrica do adaptador precisa ser especificada conforme mostrado no exemplo a seguir.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

Isso cria um objeto CepStream, representando um fluxo de eventos, que é gerado (quando a consulta é iniciada) por um adaptador instanciado através da classe de fábrica especificada. O fluxo recebe um nome que pode ser usado mais adiante para recuperar o diagnóstico específico de fluxo. Além disso, é fornecida uma instância da estrutura de configuração para a fábrica do adaptador. A estrutura de configuração passa informações específicas do tempo de execução para a fábrica e também a forma de evento desejada (modelo de evento). Para obter mais informações sobre como a fábrica usa esses parâmetros, consulte Criando adaptadores de entrada e saída.

Defina a consulta

O objeto CepStream é usado como a base para a definição da lógica de consulta real. A consulta usa LINQ como a linguagem de especificação da consulta:

var filtered = from e in inputstream
               where e.Value > 95
               select e;

Neste exemplo, supomos que a classe ou a estrutura chamada MyDataType definida no exemplo anterior para criar o objeto de fluxo de entrada contém um campo chamado Value. Essa definição é convertida em um operador de filtro que descarta todos os eventos do fluxo que não atendem ao predicado de filtro where e.Value > 95. Para obter mais informações sobre operadores da consulta LINQ, consulte Gravando modelos de consulta no LINQ.

Crie um adaptador de saída

Neste momento, o tipo da variável filtered permanece CepStream. Isso permite ao fluxo se tornar uma consulta que pode ser iniciada. Para gerar uma instância de consulta que possa ser iniciada, um adaptador de saída deve ser especificado, conforme mostrado no exemplo a seguir.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

Equivalente ao fluxo de entrada, o adaptador de saída requer a especificação de: uma fábrica do adaptador de saída, um objeto de configuração, a forma de fluxo de saída desejada e a ordenação temporal.

A especificação da forma de evento garante a forma do respectivo evento na saída da consulta:

  1. EventShape.Point: qualquer tempo de vida do evento de resultado é reduzido a um evento de ponto.

  2. EventShape.Interval: qualquer evento de resultado é interpretado como um evento de intervalo. Ou seja, ele só será apresentado se seu tempo de vida total for confirmado por um evento CTI (incremento de hora atual).

  3. EventShape.Edge: qualquer evento de resultado será interpretado como um evento de borda. Ou seja, a hora de início é apresentada como uma borda de início e, a hora de término, como a borda de término correspondente.

O parâmetro da ordem de evento de fluxo afeta o dinamismo de fluxos de saída de evento de intervalo. FullyOrdered significa que eventos de intervalo são sempre apresentados na ordem das suas horas de início, enquanto ChainOrdered gera uma sequência de saída que é ordenada pelas horas de término do intervalo.

Além disso, um objeto de aplicativo deve ser fornecido como o primeiro parâmetro, que agora contém a consulta, e um nome de consulta e descrição, que identificam melhor essa consulta no repositório de metadados.

Inicie a consulta

A última etapa é iniciar a consulta. Neste exemplo, a consulta é parada por um pressionamento de tecla fornecido pelo usuário.

query.Start();

Console.ReadLine();

query.Stop();

Este exemplo ponta a ponta mostra como usar uma associação implícita de uma fonte de evento com um modelo de consulta através das sobrecargas CepStream.Create() e ToQuery() para criar rapidamente uma consulta de trabalho. Para obter o controle mais explícito da associação de objetos CEP, consulte Usando o binder de consulta.

Exemplo completo

O exemplo a seguir combina os componentes descritos anteriormente para criar um aplicativo completo.

Server server = null;

using (Server server = Server.Create(”MyInstance”))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Consulte também

Conceitos

Usando o binder de consulta