Export (0) Print
Expand All

StreamInsight Adapter End-to-End Example

Note Note

Input and output adapters were introduced in an earlier version of StreamInsight. Though they have been superseded by the current development model, they are still available for developers who are maintaining legacy code. For more information about the current development model, see Developer's Guide (StreamInsight). For examples that use the current development model, see StreamInsight Examples.

This topic describes the various components and steps involved in creating a StreamInsight application that uses input and output adapters, and includes an end-to-end example of an application. A StreamInsight application combines event sources, event sinks, and queries in order to implement a complex event processing scenario. The StreamInsight API offers a variety of interfaces to support various levels of control and complexity in creating and maintaining event processing applications. 

The smallest unit of an application deployment is a query, which can be started and stopped. The event source is implemented with an input adapter. The adapter feeds a stream of events into the operator tree, which represents the desired query logic, specified by the designer in the form of a query template. The processed event stream then leads into an event sink, typically an output adapter.

Developers who are not familiar with complex event processing terminology should read StreamInsight Server Concepts and StreamInsight Server Architecture.

This section steps through the typical experience of creating an end-to-end application.

Instantiate a server instance and application

The process starts with the instantiation of a StreamInsight server instance and an application.

server = Server.Create("MyInstance");
Application myApp = server.CreateApplication("MyApp");

A server must be created with an instance name that has been registered on the machine through the StreamInsight setup process (In the previous example, MyInstance). For more information, see Installation (StreamInsight).

An Application represents a scoping unit in the server that contains other metadata entities.

The previous example creates a server instance in the same process. However, another common deployment is to connect to a remote server and work on an existing application there. The following example shows how to connect to a remote server and access an existing application.

server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];

For more information about local and remote servers, see Publishing and Connecting to the StreamInsight Server.

Create an input stream

Next, an input stream is created on top of an existing adapter implementation. To be precise, the adapter factory must be specified as shown in the following example.

var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

This creates a CepStream object, representing an event stream, that is produced (once the query is started) by an adapter instantiated through the given factory class. The stream is given a name that can be used later to retrieve stream-specific diagnostics. Moreover, an instance of the configuration structure for the adapter factory is provided. The configuration structure passes runtime specific information to the factory as well as the desired event shape (event model). For more information about how the factory uses these parameters, see Creating Input and Output Adapters.

Define the query

The CepStream object is used as the basis for the definition of the actual query logic. The query uses LINQ as the query specification language:

var filtered = from e in inputstream
               where e.Value > 95
               select e;

In this example, we assume that the class or struct named MyDataType defined in the previous example to create the input stream object contains a field named Value. This definition translates to a filter operator that drops all events from the stream that do not fulfill the filter predicate where e.Value > 95. For more information about LINQ query operators, see Using StreamInsight LINQ.

Create an output adapter

At this point, the type of the variable filtered is still CepStream. This allows the stream to be turned into a query that can be started. In order to produce a query instance that can be started, an output adapter must be specified, as shown in the following example.

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

Equivalent to the input stream, the output adapter requires the specification of an output adapter factory, a configuration object, the desired output stream shape, and temporal ordering.

The event shape specification ensures the respective event shape at the query output:

  1. EventShape.Point: Any result event lifetime is reduced to a point event.

  2. EventShape.Interval: Any result event is interpreted as interval event. That is, it is only output if its full lifetime is committed by a Current Time Increment (CTI) event.

  3. EventShape.Edge: Any result event will be interpreted as edge event. That is, its start time is output as a start edge, and its end time as the corresponding end edge.

The stream event order parameter affects liveliness of interval event output streams. FullyOrdered means that interval events are always output in the order of their start times, while ChainOrdered produces an output sequence that is ordered by the interval end times.

In addition, an application object must be provided as the first parameter, which now contains the query, and a query name and description, which further identifies this query in the metadata store.

Start the query

The last step is to start the query. In this example, the query is stopped by a user-provided keystroke.

query.Start();

Console.ReadLine();

query.Stop();

This end-to-end example shows how to use an implicit binding of an event source with a query template through the CepStream.Create() and ToQuery() overloads to quickly create a working query. For more explicit control over the binding of CEP objects, see Using the Query Binder.

The following example combines the components described earlier to create a complete application.

Server server = null;

using (Server server = Server.Create("MyInstance"))
{
    try
    {
        Application myApp = server.CreateApplication("MyApp");

        var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                       typeof(MyInputAdapterFactory),
                                                       new InputAdapterConfig { someFlag = true },
                                                       EventShape.Point);

        var filtered = from e in inputstream
                       where e.Value > 95
                       select e;

        var query = filtered.ToQuery(myApp,
                                     "filterQuery",
                                     "Filter out Values over 95",
                                     typeof(MyOutputAdapterFactory),
                                     new OutputAdapterConfig { someString = "foo" },
                                     EventShape.Point,
                                     StreamEventOrder.FullyOrdered);

        query.Start();
        Console.ReadLine();
        query.Stop();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Community Additions

ADD
Show:
© 2014 Microsoft