Creating Input and Output Adapters

[This topic is pre-release documentation and is subject to change in future releases. Blank topics are included as placeholders.]

This topic discusses the interaction between input and output adapters and the complex event processing (CEP) server, and provides the design pattern for these adapters. This topic should be read together with Creating Adapters.

State Machine of the Adapter

The interaction between the adapter and the CEP server is the same for both input and output adapters. The following illustration is a state machine that represents this interaction.

[Illustration: Adapter enqueue and dequeue state diagram]

Because the input and output adapters have the same state machine representing their interaction with the server, an adapter developer has a consistent development model to work with. The main highlights of this state machine are as follows:

  • The methods implemented by the adapter developer and called by the CEP server are Start() and Resume(). These are in addition to the constructor method and Dispose(), which is inherited from the base class.
  • The methods implemented by the CEP server and called by the adapter implementation are as follows:
    • Enqueue() for the input adapter. This returns the values EnqueueOperationResult.Success or EnqueueOperationResult.Full.
    • Dequeue() for the output adapter. This returns the values DequeueOperationResult.Success or DequeueOperationResult.Empty.
    • Ready(). This returns a Boolean value TRUE or FALSE.
    • Stopped(). This returns a Boolean value TRUE or FALSE.
  • StopQuery() reaches the adapter asynchronously. It is a CEP server API call that an administrator or query developer makes to stop query execution, and it is external to the adapter/server interaction that is described in this topic.
  • Calls to Enqueue() or Dequeue() return the status Full or Empty in the following adapter states:
    • Suspended
    • Stopping
  • Calls to Enqueue() or Dequeue() cause an exception to be raised when the adapter is in one of the following states:
    • Created
    • Stopped
  • Calls to Ready() cause an exception to be raised when the adapter is in one of the following states:
    • Created
    • Running
    • Stopped
  • An adapter transitions through some or all of the five states (Created, Running, Suspended, Stopping, and Stopped) during its course of operation. A state transition occurs before the CEP server calls Start() or Resume(), and after the adapter calls Enqueue(), Dequeue(), Ready(), or Stopped().
  • The CEP server and the adapter never share the same thread. The server always calls Start() or Resume() on a separate worker thread, which it obtains from an operating system thread pool on behalf of the adapter. An adapter developer can use this thread as appropriate, but is expected to be a responsible citizen of the server. The best way to ensure this is to follow the adapter design pattern and the recommendations provided for adapter development.
  • The API eliminates the need for inherent synchronization between the Start() and Resume() operations (threads). The server calls Resume() after, and only after, the adapter calls Ready(). However, synchronization might still be required for the device-facing tasks of reading, writing, or buffering events, especially in asynchronous I/O scenarios. We recommend using nonblocking I/O as a best practice.
  • If the adapter can be idle, it should periodically check its state to determine whether it has been asked to stop.
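
The call rules in the list above can be summarized as a small lookup. The following is a minimal sketch, not server code: the SketchState and CallOutcome enumerations and the AdapterCallRules class are hypothetical names introduced only for illustration, and they model nothing beyond the rules stated in this topic.

```csharp
using System;

namespace CepSamples.Sketches.CallRules
{
    // Hypothetical stand-ins: these types only model the rules listed above
    // and are not the types that ship with the CEP server.
    public enum SketchState { Created, Running, Suspended, Stopping, Stopped }

    public enum CallOutcome { Allowed, ReturnsFullOrEmpty, Throws }

    public static class AdapterCallRules
    {
        // Outcome of calling Enqueue() or Dequeue() in the given state.
        public static CallOutcome EnqueueOrDequeue(SketchState state)
        {
            switch (state)
            {
                case SketchState.Running:
                    // The call is processed; it can still report Full or Empty.
                    return CallOutcome.Allowed;
                case SketchState.Suspended:
                case SketchState.Stopping:
                    return CallOutcome.ReturnsFullOrEmpty;
                default:
                    // Created and Stopped.
                    return CallOutcome.Throws;
            }
        }

        // Outcome of calling Ready() in the given state.
        public static CallOutcome Ready(SketchState state)
        {
            switch (state)
            {
                case SketchState.Suspended:
                case SketchState.Stopping:
                    return CallOutcome.Allowed;
                default:
                    // Created, Running, and Stopped.
                    return CallOutcome.Throws;
            }
        }
    }
}
```

For example, AdapterCallRules.Ready(SketchState.Running) evaluates to CallOutcome.Throws, which matches the rule that Ready() raises an exception in the Running state.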

Life Cycle of Adapter Interaction with the Server

The life cycle of the adapter interaction with the CEP server consists of the following operations:

  • An adapter instance begins interacting with the CEP server when the server administrator or query developer issues a StartQuery() call. In the state machine illustration earlier in this topic, this point is represented by the Created state. The server calls Start() on the adapter asynchronously, and guarantees to make this call only one time. This call puts the adapter in the Running state, which means that the adapter is ready to enqueue events into, or dequeue events from, the server.

  • The adapter developer implements a ProduceEvents() or ConsumeEvents() routine and invokes it from both the Start() and Resume() routines. The routine loops, reading events from the source or writing events to the sink, and calls Enqueue() or Dequeue() to push events into, or pull events from, the stream. If a StopQuery() call arrives asynchronously to stop operations, the adapter is put into the Stopping state.

  • The condition Enqueue == FULL or Dequeue == EMPTY puts the adapter in the Suspended state. FULL indicates that the server cannot accept more events; EMPTY indicates that the server has no more events to deliver. In the Suspended state, the adapter can perform housekeeping operations, such as saving the event that could not be enqueued. When it is ready to resume, the adapter calls Ready() and then returns from the routine, which releases its use of the worker thread.

  • The server always calls Resume() asynchronously on a different worker thread in response to a Ready() call. Typically, Resume() will also call ProduceEvents() or ConsumeEvents(), the main enqueue or dequeue driver. This pattern will continue until the adapter receives an instruction to stop operations by way of a StopQuery() call.

  • A StopQuery() call causes the adapter to go into the Stopping state. In this state, Enqueue() and Dequeue() return only the status FULL or EMPTY. The Stopping state allows the adapter to prepare itself to stop correctly. This means relinquishing all the resources that it has obtained (threads, memory) and calling Stopped(), which puts the adapter in the Stopped state. Depending on the region of ProduceEvents() or ConsumeEvents() code in which this transition occurs, there might be one more call to Ready(). In that case, the server calls Resume() one more time with the adapter still in the Stopping state.

  • The adapter code can call Stopped() at any point. This puts the adapter in the Stopped state. As a good design practice, we recommend that you clean up the resources that the adapter obtained before you call Stopped().

    Important

    Failure to call the Stopped() method causes the last page of memory that is associated with the query to remain allocated. This causes small memory leaks that can accumulate over time if there are many query start-and-stop cycles in a process.

  • In the Stopped state, the adapter cannot refer to any CEP server–specific constructs or event memory, or perform enqueue or dequeue operations. These will raise an exception. However, operating-system and device-facing cleanup activities can continue.
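
The life cycle described above can be traced with a small single-threaded simulation of an output adapter. This is only a sketch under stated assumptions: SketchOutputAdapter, its Server* methods, and ListOutputAdapter are hypothetical stand-ins that play the role of the CEP server and the adapter base class, events are plain strings, and the asynchronous Resume() call is modeled by an explicit ServerPump() call. None of these names are part of the CEP server API.

```csharp
using System;
using System.Collections.Generic;

namespace CepSamples.Sketches.LifeCycle
{
    public enum SketchState { Created, Running, Suspended, Stopping, Stopped }
    public enum SketchDequeueResult { Success, Empty }

    // Hypothetical stand-in for the server and the output adapter base class.
    public abstract class SketchOutputAdapter
    {
        private readonly Queue<string> serverQueue = new Queue<string>();
        private bool resumePending;

        public SketchState State { get; private set; }   // starts as Created

        protected abstract void Start();
        protected abstract void Resume();

        // Adapter-facing calls.
        protected SketchDequeueResult Dequeue(out string evt)
        {
            evt = null;
            if (State == SketchState.Created || State == SketchState.Stopped)
                throw new InvalidOperationException("Dequeue is not allowed in " + State);
            if (State == SketchState.Running && serverQueue.Count > 0)
            {
                evt = serverQueue.Dequeue();
                return SketchDequeueResult.Success;
            }
            if (State == SketchState.Running) State = SketchState.Suspended;
            return SketchDequeueResult.Empty;             // Suspended or Stopping
        }

        protected void Ready()
        {
            if (State != SketchState.Suspended && State != SketchState.Stopping)
                throw new InvalidOperationException("Ready is not allowed in " + State);
            if (State == SketchState.Suspended) State = SketchState.Running;
            resumePending = true;                         // the server will call Resume()
        }

        protected void Stopped() { State = SketchState.Stopped; }

        // Server-side actions, driven by the caller in this simulation.
        public void ServerAddEvent(string evt) { serverQueue.Enqueue(evt); }
        public void ServerStartQuery() { State = SketchState.Running; Start(); }
        public void ServerStopQuery() { if (State != SketchState.Stopped) State = SketchState.Stopping; }
        public void ServerPump() { if (resumePending) { resumePending = false; Resume(); } }
    }

    // Adapter that follows the pattern: check for Stopping, dequeue, and
    // call Ready() and return when the server reports Empty.
    public sealed class ListOutputAdapter : SketchOutputAdapter
    {
        public readonly List<string> Sink = new List<string>();

        protected override void Start() { ConsumeEvents(); }
        protected override void Resume() { ConsumeEvents(); }

        private void ConsumeEvents()
        {
            while (true)
            {
                if (State == SketchState.Stopping)
                {
                    Stopped();                            // clean up first in real code
                    return;
                }
                string evt;
                if (Dequeue(out evt) == SketchDequeueResult.Empty)
                {
                    Ready();                              // ask to be resumed
                    return;                               // release the worker thread
                }
                Sink.Add(evt);                            // write to the sink
            }
        }
    }
}
```

In this simulation, calling ServerStartQuery() drains the queue and leaves the adapter waiting for Resume(). A later ServerStopQuery() followed by ServerPump() produces the one extra Resume() call, in which the adapter sees the Stopping state and calls Stopped().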

Adapter Design Pattern

The following template provides the common design pattern for an adapter.

using System;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

namespace CepSamples.Adapters
{
    /// <summary>
    /// This is the design template for an untyped adapter.
    /// You can use this template to understand the state transition diagram
    /// described above and to quickly generate adapter code. 
    /// The example is for an input adapter. However, the same 
    /// state transition diagram applies to the output adapter,
    /// with the exceptions of using dequeue instead of enqueue, 
    /// and status checks from the server.
    /// 
    /// The design template also shows the adapter factory template
    /// that is used during
    /// query binding time to connect the query with the adapter.
    /// 
    /// The design template uses an untyped point input adapter as the reference example. The same
    /// model can be followed for different devices, and for different event shapes.
    /// </summary>
    public class SamplePointInputAdapter : PointInputAdapter
    {
        private PointEvent pendingevent;
        private DateTime pendingctitime;


        /// <summary>
        /// Constructor - Use this to initialize local resources based on configInfo
        /// structures. Examples are open files or database connections, set delimiters, and
        /// other parameters that enable the adapter to access the input/output device.
        /// </summary>
        public SamplePointInputAdapter(SampleInputConfig configInfo, CepEventType cepEventType)
        {

        }

        /// <summary>
        /// Start is the first method to be called by the CEP server after the input
        /// adapter has been instantiated. Therefore, any initializations that cannot be covered
        /// in the constructor can be placed here. Start() is called in a separate worker
        /// thread initiated by the server. In this example (and in most scenarios), Start
        /// begins producing events when it is called.
        /// </summary>
        public override void Start()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Resume is called by the server when it returns from being scheduled away
        /// from Start, and only after the adapter has called Ready(). Resume continues
        /// to produce events.
        /// </summary>
        public override void Resume()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Dispose is inherited from the base adapter class and is the placeholder to
        /// release adapter resources when this instance is shut down.
        /// </summary>
        /// <param name="disposing"></param>
        protected override void Dispose(bool disposing)
        {
        }

        /// <summary>
        /// Main driver to read events from the source and enqueue them.
        /// </summary>
        private void ProduceEvents()
        {
            PointEvent currevent = default(PointEvent);
            DateTime currctitime = default(DateTime);
            EnqueueOperationResult result = EnqueueOperationResult.Full;

            while (true)
            {
                // First, check if the adapter is being stopped by the server.
                if (AdapterState.Stopping == AdapterState)
                {
                    // If yes, release the event left over from the last failed Enqueue.
                    PrepareToStop(pendingevent);
                    pendingevent = null;

                    // Do any other housekeeping before calling Stopped().

                    // declare the adapter as stopped;
                    Stopped();

                    // exit the worker thread.
                    return;
                }

                // NOTE: At this point, the adapter is in the running state.
                // At any point during execution of the code below, if the adapter state
                // changes to Stopping, the server will resume the adapter (that is, call
                // Resume()) one more time, and the stopping condition will be trapped
                // at the check above.

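                // If a previous enqueue failed, retry the pending point event
                // before creating a new one. (A pending Cti event would be
                // retried in the same way; that step is omitted in this template.)
                if (null != pendingevent)
                {
                    currevent = pendingevent;
                    pendingevent = null;
                }
                else
                {
                    currevent = CreateEventFromSource();
                }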

                // Enqueue point event with payload.
                result = Enqueue(ref currevent);

                // Handle Enqueue rejection
                if (EnqueueOperationResult.Full == result)
                {
                    // Save the pending event in PrepareToResume
                    PrepareToResume(currevent);
                    // indicate Ready to resume
                    Ready();
                    // Exit worker thread
                    return;
                }

                // Enqueue Cti event based on application logic
                result = EnqueueCtiEvent(currctitime);

                // Handle Cti rejection in a similar manner
            }
        }

        private void PrepareToStop(PointEvent currEvent)
        {
            // The server will not accept any more events, and you
            // cannot do anything about it. Release the event.
            // If you miss this step, server memory will leak.
            if (null != currEvent)
            {
                ReleaseEvent(ref currEvent);
            }
        }

        private void PrepareToResume(PointEvent currevent)
        {
            pendingevent = currevent;
        }

        private void PrepareToResume(DateTime currctitime)
        {
            pendingctitime = currctitime;
        }
        private bool EndofSource()
        {
            return false;
        }
        private PointEvent CreateEventFromSource()
        {
            // Create a point event from the source.

            return null;
        }
    }

    // Configuration structure to initialize the adapter.
    public struct SampleInputConfig
    {
        // configuration parameters
    }

    // Factory class is the entry point for the query binder to initialize
    // and create an adapter instance.
    public class SampleInputFactory : IInputAdapterFactory<SampleInputConfig>
    {
        public SampleInputFactory()
        {
        }

        public InputAdapterBase Create(SampleInputConfig configInfo,
            EventShape eventshape, CepEventType cepeventtype)
        {
            InputAdapterBase adapter = default(InputAdapterBase);

            if (EventShape.Point == eventshape)
                adapter = new SamplePointInputAdapter(configInfo, cepeventtype);
            /*
            else if (EventShape.Interval == eventshape)
                adapter = new SampleIntervalInputAdapter(configInfo, cepeventtype);
            else if (EventShape.Edge == eventshape)
                adapter = new SampleEdgeInputAdapter(configInfo, cepeventtype);
             */
            return adapter;
        }
        public void Dispose()
        {
        }
    }
}
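
The template comments mention retrying a rejected point event or Cti event before new events are created. One possible way to keep track of both is sketched below. PendingWork is a hypothetical helper that is not part of the CEP server API; it assumes that the adapter wraps its enqueue calls in delegates that return true when the server accepts the item.

```csharp
using System;

namespace CepSamples.Sketches.Pending
{
    // Hypothetical helper: remembers at most one rejected event and one
    // rejected Cti timestamp between calls to Resume().
    public sealed class PendingWork<TEvent> where TEvent : class
    {
        private TEvent pendingEvent;
        private DateTime? pendingCti;

        public void Save(TEvent evt) { pendingEvent = evt; }
        public void Save(DateTime ctiTime) { pendingCti = ctiTime; }

        public bool IsEmpty
        {
            get { return pendingEvent == null && !pendingCti.HasValue; }
        }

        // Retries the saved event first, then the saved Cti. Each delegate
        // returns true when the item was accepted. Returns false as soon as
        // an item is rejected again, and keeps that item for the next retry.
        public bool TryFlush(Func<TEvent, bool> enqueueEvent,
                             Func<DateTime, bool> enqueueCti)
        {
            if (pendingEvent != null)
            {
                if (!enqueueEvent(pendingEvent)) return false;
                pendingEvent = null;
            }
            if (pendingCti.HasValue)
            {
                if (!enqueueCti(pendingCti.Value)) return false;
                pendingCti = null;
            }
            return true;
        }
    }
}
```

An adapter that used a helper like this would call TryFlush at the top of its enqueue loop and, if it returned false, call Ready() and return, in the same way that the template handles a rejected point event.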

See Also

Concepts

Creating Adapters
Samples (StreamInsight)