
StreamInsight Example: Client A - Using a Remote Server

This example demonstrates the process of creating a StreamInsight client that uses a remote server and entities defined in the server. For more information about StreamInsight entities, see StreamInsight Concepts.

This example uses the remote server and entities created in the server example in this section. To use the examples in this section together, do the following:

  1. Run the server example, StreamInsight Example: Server - Exposing an Embedded Server.

  2. Run one or both of the client examples.

In general, a typical StreamInsight client follows these basic steps:

  • Create a StreamInsight server instance

  • Create or get a StreamInsight application

  • Define or get a source

  • Compose a query over the source

  • Define or get a sink

  • Bind and run the query and sink

In this example, the client gets an existing application and entities from the server.

Connect to the Server

A StreamInsight client program begins by obtaining a StreamInsight server instance. In this example, the client connects to a remote server called “MyStreamInsightServer”.

var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/MyStreamInsightServer"));

For more information on the options available for connecting to a StreamInsight server, see Publishing and Connecting to the StreamInsight Server.

Get the Server Application

In this example, the client uses the StreamInsight application that has been created in the remote server. All the server entities this client will use have been defined within this application.

var myApp = server.Applications["serverApp"];

Get the Source

Next, get the source that has been defined on the server.

var mySource = myApp.GetStreamable<long>("serverSource");

Compose a Query Over the Source

Next, compose a query over the input source. The query uses LINQ as the query specification language. In this example, the query selects every event whose value is even and returns that value plus 1000.

var myQuery = from e in mySource
              where e % 2 == 0
              select e + 1000;
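
To see what the filter and projection produce without a running server, the same LINQ expression can be applied to an ordinary in-memory sequence. The following is only a sketch: it uses LINQ to Objects rather than StreamInsight, and the class name QuerySketch, the method name EvenPlusThousand, and the sample values 0 through 5 are assumptions made for illustration. The real events come from the server-side source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QuerySketch
{
    // Applies the same where/select logic as the StreamInsight query,
    // but to an in-memory sequence instead of a server-side event stream.
    public static List<long> EvenPlusThousand(IEnumerable<long> payloads)
    {
        var query = from e in payloads
                    where e % 2 == 0      // keep only even values
                    select e + 1000;      // project each kept value to value + 1000
        return query.ToList();
    }

    public static void Main()
    {
        // Sample payloads are hypothetical stand-ins for the source events.
        var results = EvenPlusThousand(new long[] { 0, 1, 2, 3, 4, 5 });
        Console.WriteLine(string.Join(", ", results)); // prints "1000, 1002, 1004"
    }
}
```

Odd values are dropped entirely instead of being passed through unchanged, so the sink receives fewer events than the source produces.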

Get the Sink

Next, get the sink that has been defined on the server.

var mySink = myApp.GetObserver<long>("serverSink");

Bind and Run the Query and Sink

At this point, the query can be bound to the sink and then run in a process in the server.

var proc = myQuery.Bind(mySink).Run("serverProcess_Client_A");

This process is the only entity that this client has created in the server.

The following example combines the components described earlier to create a complete application. For simplicity, this example does not check for possible error conditions, but assumes that the server from the example, StreamInsight Example: Server - Exposing an Embedded Server, is running and that the expected entities have been created.

using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;

/* This example:
 * connects to a remote server
 * gets the app, source, and sink defined in the server
 * creates a simple query over the source, binds it to the sink, and runs it
 * waits for the user to stop the program
 */
namespace StreamInsight21_example_ClientA
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the StreamInsight server
            using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/MyStreamInsightServer")))
            {
                /* The following entities are expected to be defined in the server:
                 * serverApp
                 * serverSource
                 * serverSink
                 */
                /* The following entity will be defined in the server by this client:
                 * serverProcess_Client_A
                 */

                // Get the existing StreamInsight APPLICATION
                var myApp = server.Applications["serverApp"];
                
                // GET the SOURCE from the server
                var mySource = myApp.GetStreamable<long>("serverSource");

                // Compose a QUERY on the source (return every even-numbered item + 1000)
                var myQuery = from e in mySource
                              where e % 2 == 0
                              select e + 1000;

                // GET the SINK from the server
                var mySink = myApp.GetObserver<long>("serverSink");

                // BIND the QUERY to the SINK and RUN it
                using (var proc = myQuery.Bind(mySink).Run("serverProcess_Client_A"))
                {
                    // Wait for the user to stop the program
                    Console.WriteLine("----------------------------------------------------------------");
                    Console.WriteLine("Client A is running, press Enter to exit the client");
                    Console.WriteLine("----------------------------------------------------------------");
                    Console.WriteLine(" ");
                    Console.ReadLine();
                }
            }
        }
    }
}
