using System;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
namespace CepSamples.Adapters
{
/// <summary>
/// This is the design template for an untyped adapter.
/// You can use this template to understand the state transition diagram
/// described above and to quickly generate adapter code.
/// The example is for an input adapter. However, the same
/// state transition diagram applies to the output adapter,
/// with the exceptions of using dequeue instead of enqueue,
/// and status checks from the server.
///
/// The design template also shows the adapter factory template
/// that is used during
/// query binding time to connect the query with the adapter.
///
/// The design template uses an untyped point input adapter as the reference example. The same
/// model can be followed for different devices, and for different event shapes.
/// </summary>
public class SamplePointInputAdapter : PointInputAdapter
{
private PointEvent pendingevent;
private DateTime pendingctitime;
/// <summary>
/// Constructor - Use this to initialize local resources based on configInfo
/// structures. Examples are open files or database connections, set delimiters, and
/// other parameters that enable the adapter to access the input/output device.
/// </summary>
public SamplePointInputAdapter(SampleInputConfig configInfo, CepEventType cepEventType)
{
}
/// <summary>
/// Start is the first method to be called by the CEP server after the input
/// adapter has been instantiated. Therefore, any initializations that cannot be covered
/// in the constructor can be placed here. Start() is called in a separate worker
/// thread initiated by the server. In this example (and in most scenarios), Start
/// begins producing events when it is called.
/// </summary>
public override void Start()
{
ProduceEvents();
}
/// <summary>
/// Resume is called by the server when it returns from being scheduled away
/// from Start, and only after the adapter has called Ready(). Resume continues
/// to produce events.
/// </summary>
public override void Resume()
{
ProduceEvents();
}
/// <summary>
/// Dispose is inherited from the base adapter class and is the placeholder to
/// release adapter resources when this instance is shut down.
/// </summary>
/// <param name="disposing"></param>
protected override void Dispose(bool disposing)
{
}
/// <summary>
/// Main driver to read events from the source and enqueue them.
/// </summary>
private void ProduceEvents()
{
PointEvent currevent = default(PointEvent);
DateTime currctitime = default(DateTime);
EnqueueOperationResult result = EnqueueOperationResult.Full;
while (true)
{
// First, check if the adapter is being stopped by the server.
if (AdapterState.Stopping == AdapterState)
{
// If yes, resolve the event from the last failed Enqueue.
// do housekeeping before calling stopped;
// declare the adapter as stopped;
Stopped();
// exit the worker thread.
return;
}
// NOTE: At this point, the adapter is in the running state.
// At any point during execution of the code below, if the adapter state
// changes to Stopping, the server will resume the adapter (that is, call
// Resume())one more time, and the stopping condition will be trapped
// at the check above.
// If a previous enqueue failed, enqueue the pending point event or
// the Cti event before creating the new event.
currevent = CreateEventFromSource();
pendingevent = null;
// Enqueue point event with payload.
result = Enqueue(ref currevent);
// Handle Enqueue rejection
if (EnqueueOperationResult.Full == result)
{
// Save the pending event in PrepareToResume
PrepareToResume(currevent);
// indicate Ready to resume
Ready();
// Exit worker thread
return;
}
// Enqueue Cti event based on application logic
result = EnqueueCtiEvent(currctitime);
// Handle Cti rejection in a similar manner
}
}
private void PrepareToStop(PointEvent currEvent)
{
// The server will not accept any more events, and you
// cannot do anything about it. Release the event.
// If you miss this step, server memory will leak.
if (null != currEvent)
{
ReleaseEvent(ref currEvent);
}
}
private void PrepareToResume(PointEvent currevent)
{
pendingevent = currevent;
}
private void PrepareToResume(DateTime currctitime)
{
pendingctitime = currctitime;
}
private bool EndofSource()
{
return false;
}
private PointEvent CreateEventFromSource()
{
// Create a point event from the source.
return null;
}
}
// Configuration structure to initialize the adapter.
public struct SampleInputConfig
{
// configuration parameters
}
// Factory class is the entry point for the query binder to initialize
// and create an adapter instance.
public class SampleInputFactory : IInputAdapterFactory<SampleInputConfig>
{
public SampleInputFactory()
{
}
public InputAdapterBase Create(SampleInputConfig configInfo,
EventShape eventshape, CepEventType cepeventtype)
{
InputAdapterBase adapter = default(InputAdapterBase);
if (EventShape.Point == eventshape)
adapter = new SamplePointInputAdapter(configInfo, cepeventtype);
/*
else if (EventShape.Interval == eventshape)
adapter = new SampleIntervalInputAdapter(configInfo, cepeventtype);
else if (EventShape.Edge == eventshape)
adapter = new SampleEdgeInputAdapter(configInfo, cepeventtype);
*/
return adapter;
}
public void Dispose()
{
}
}
}