IObservable/IObserver Development Model

[This topic is pre-release documentation and is subject to change in future releases. Blank topics are included as placeholders.]

The IObservable/IObserver development model provides an alternative to using input and output adapters as event sources and sinks. This model is based on the IObservable/IObserver design pattern, in which an observer is any object that wishes to be notified when the state of another object changes, and an observable is any object whose state may be of interest and with which another object may register an interest. For example, in a publish-subscribe application, the observable is the publisher, and the observer is the subscriber object. For more information, see Exploring the Observer Design Pattern on MSDN.
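The publisher and subscriber roles described above can be sketched with the standard System.IObservable<T> and System.IObserver<T> interfaces. This is a minimal illustration only, not part of the CEP API: the names ReadingPublisher, ReadingSubscriber, and PublisherDemo are hypothetical, and the sketch assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher (the observable): pushes readings to registered observers.
public sealed class ReadingPublisher : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // An observer registers its interest here; disposing the result unregisters it.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Notify every currently registered observer of a new value.
    public void Publish(int reading)
    {
        // Iterate over a copy so that an observer may unsubscribe during notification.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(reading);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical subscriber (the observer): records every value it is notified about.
public sealed class ReadingSubscriber : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PublisherDemo
{
    // Publishes three readings; the third is sent after the subscriber has unsubscribed.
    public static List<int> Run()
    {
        var publisher = new ReadingPublisher();
        var subscriber = new ReadingSubscriber();

        IDisposable subscription = publisher.Subscribe(subscriber);
        publisher.Publish(5);
        publisher.Publish(12);

        subscription.Dispose();
        publisher.Publish(20); // Not delivered: the subscription was disposed.

        return subscriber.Received;
    }
}
```

Calling PublisherDemo.Run() returns the readings 5 and 12. The third reading is dropped because the subscriber's interest was withdrawn before it was published.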

In a complex event processing (CEP) application, the observable is the event source. The CEP query acts as an observer of this source and presents its result as an observable to the event sink, which is also an observer.
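The dual role of the query, observer of the source and observable for the sink, can be illustrated without the CEP server. In the following sketch, the hypothetical FilterStage<T> class stands in for a query, CollectingSink<T> stands in for an event sink, and the event source is simulated by calling OnNext directly. None of these names belong to the CEP API, and the sketch assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a CEP query: it observes a source, applies a
// filter, and presents the result as an observable to downstream sinks.
public sealed class FilterStage<T> : IObserver<T>, IObservable<T>
{
    private readonly Func<T, bool> predicate;
    private readonly List<IObserver<T>> sinks = new List<IObserver<T>>();

    public FilterStage(Func<T, bool> predicate)
    {
        this.predicate = predicate;
    }

    // Observable side: the event sink subscribes here.
    public IDisposable Subscribe(IObserver<T> sink)
    {
        sinks.Add(sink);
        return new Subscription(() => sinks.Remove(sink));
    }

    // Observer side: the event source pushes events here.
    public void OnNext(T value)
    {
        if (!predicate(value)) return;
        foreach (var sink in sinks.ToArray()) sink.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var sink in sinks.ToArray()) sink.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var sink in sinks.ToArray()) sink.OnCompleted();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action onDispose;
        public Subscription(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical event sink: collects the events that pass through the stage.
public sealed class CollectingSink<T> : IObserver<T>
{
    public List<T> Events { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) { Events.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class PipelineDemo
{
    public static CollectingSink<int> Run()
    {
        var query = new FilterStage<int>(v => v > 10);
        var sink = new CollectingSink<int>();

        using (query.Subscribe(sink))
        {
            // Simulate the event source by pushing values into the stage.
            foreach (var v in new[] { 3, 15, 10, 42 }) query.OnNext(v);
            query.OnCompleted();
        }

        return sink;
    }
}
```

Calling PipelineDemo.Run() returns a sink that holds the events 15 and 42 and has been marked as completed. Because the stage implements both interfaces, it can be chained between any source and any sink, which mirrors the position of the query in the paragraph above.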

Typical use cases for the IObservable/IObserver development model include:

  • You want to integrate CEP event production and consumption more tightly with the CLR model of querying over enumerations of events by using LINQ.
  • You want your .NET applications to continue working on data elements without a major learning or programming investment to define the temporal properties of the events. At the same time, if the application is actually event-driven, a CEP application developer can view these data elements as events and process them using the CEP server for real-time analysis and insights.

Key Characteristics of the IObservable/IObserver Development Model

The IObservable/IObserver development model has the following key characteristics:

  • The tasks to enable this model are minimal. The application developer implements an IObservable class for the data/event producer, and an IObserver class for the data/event consumer.
  • The CEP server administrator or query developer configures the server to accept connections from the Observable and Observer objects, and binds these objects to queries.

Example

In the following example, event sources and sinks interact with the CEP server through Observable and Observer interface implementations. The example assumes the existence of an Observable as well as an Observer object. The CEP server is implicit and is not created by the developer. For more details about creating Observer/Observable data sources and sinks as well as how to integrate them with CEP query logic, see Using Observables as Event Sources.

void ObserverUseCase(IObservable<PayloadType> myObservableSource, 
                     IObserver<ResultType> myObserver)
{
    // Create an input stream from the observable data source.
    CepStream<PayloadType> inputStream = myObservableSource.ToCepStream<PayloadType>();

    // Define the query logic.
    CepStream<PayloadType> filter = from e in inputStream
                                    where e.value > 10
                                    select e;

    // Create a query object as an IObservable. The CEP server is implicit.
    var myObservableQuery = filter.ToObservable();

    // Add the observer as the event consumer and start the query.
    // The returned IDisposable represents the subscription; dispose it to unsubscribe the observer.
    IDisposable subscription = myObservableQuery.Subscribe(myObserver);
}