Export (0) Print
Expand All

Time Stamp Modifications

The examples in this topic demonstrate using operators to modify the timestamp of an event. By changing the event timestamp, you can change the effect of events on subsequent operations such as joins, aggregations over windows, and so forth. The following LINQ extension methods surface this functionality.

The AlterEventStartTime() operator changes the start time of each event in the stream according to the specified expression.

The following example changes the start time of each event in the stream by 15 minutes into the future.

// change events by 15 minutes into the future
var altered = inputStream.AlterEventStartTime(e => TimeSpan.FromMinutes(15)); 

The following example changes the start time of each event in the stream by 1 hour into the past.

// change events by 1 hour into the past.
var altered = inputStream.AlterEventStartTime(e => TimeSpan.FromHours(-1));

The expression for specifying the time alteration can refer to the current event's start time, but not to its end time or payload. The change does not impact the lifetime or payload of the event.

The value DateTime.MinValue is considered to encode a time value of minus infinity. If the event start time has this value and is referred to in the specified expression (as opposed to a constant), the expression is not evaluated and the start time will remain DateTime.MinValue. If this is not the case, then the expression will be evaluated at runtime, which could still result in an overflow exception.

Note that the specified time change is also applied to CTI events passing through this operator because AlterEventStartTime affects the start times of all events in the stream.

NoteNote

In StreamInsight 2.0 and earlier, use ShiftEventTime() for CepStream streams. For example, the following statement shifts the time of each event in the stream by 15 minutes into the future.

// for CepStream: shift events by 15 minutes into the future
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromMinutes(15));

The AlterEventDuration() operator changes the lifetime of the event. The event lifetime specifies the time interval during which the event is valid. The duration is defined as a function on the event so that it can be computed from the event’s start time, end time, or payload.

The following example sets the event duration to 1 hour.

// set event duration to 1 hour.
var onehour = inputStream.AlterEventDuration(e => TimeSpan.FromHours(1));

The following example sets the event duration to twice its current lifetime.

// double event duration. 
var doubled = inputStream.AlterEventDuration(e => (e.EndTime - e.StartTime) * 2);

The value DateTime.MaxValue is considered to encode a time value of plus infinity. If the event end time has this value and is referred to in the specified expression, the expression is not evaluated and the end time will remain DateTime.MaxValue.

The AlterEventLifetime() operator combines both the AlterEventDuration and AlterEventStartTime functions for maximum expressivity.

The first parameter to the AlterEventLifeTime() method specifies the new start timestamp and can refer to the start time of the current event. This parameter must be specified as a UTC time. The second parameter specifies the new lifetime duration and can refer to the start time, end time, and payload fields of the current event.

The following example shifts the event time 1 minute into the past, but leaves the end time of the event unchanged (by adding an additional minute to the original lifetime) when specifying the new lifetime as the second parameter.

// shift event 1 minute into the past, but leave the end time (event duration) unchanged.
var newStream = inputStream.AlterEventLifetime(e => e.StartTime - TimeSpan.FromMinutes(1),
                                               e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1));]

Note that the specified start time change is also applied to CTI events passing through this operator.

See also the remarks regarding DateTime.MinValue and DateTime.MaxValue earlier in this topic.

The ToPointEventStream operator is a convenience function for converting edge and interval events into point events (by altering the events' lifetimes to one single tick past the event's start time) as shown in the following example.

var pointStream = inputStream.ToPointEventStream();

Only the start time of the events is retained when interval events are converted to point events.

The ClipEventDuration operator takes two streams as parameters and changes the lifetime of each event in the first stream according to the start time of the next matching event in second stream.

So far, we have seen operators that allow you to change the lifetime of an event by a fixed timespan. The ClipEventDuration operator provides a very flexible method of adjusting the lifetime of events with respect to other events. In general, this operator is specified on one stream and takes another stream as a parameter, along with a matching condition. The operator will clip the lifetime of each event in the first stream to the start time of the "next" event (in terms of application time) in the other stream that fulfills the matching condition.

As an example, assume two streams, stream1 and stream2, carrying events with a payload field "Id". The following statement clips all events in stream1 to the next event in stream2 that has the same value for "Id":

var clipped = stream1.ClipEventDuration(stream2, (e1, e2) => e1.Id == e2.Id);

The matching condition is given as an expression over both input payloads. The semantics of this statement is illustrated in the following diagram:

Semantics of ClipEventDuration

The diagram shows how the first event in stream1 with Id = A is clipped to the next event with Id = A in stream2. The other event in stream1, where Id = B, is not clipped, since the next matching event in stream2 occurs only after the end of the event in stream1.

This clipping behavior opens a wide range of applications. One common requirement that it can fulfill is the conversion of a stream of points into a stream of continuous intervals, also called a "signal".

Point to Signal Conversion

In this case, you first need to extend all the point events, so that they actually reach up to the next event. In other words, you need to apply a timeout that determines how long an event is supposed to last until the next event occurs. This timeout can be a finite or infinite time span. Let us assume a timeout of 60 seconds:

var extended = input.AlterEventDuration(e => TimeSpan.FromSeconds(60));

With this preparation, we can use the ClipEventDuration operator, providing the stream itself as its parameter. This will cause each event to be clipped to the next one in the same stream, creating a continuous series of interval events. Since only the start times of the second stream matter for the clipping operation, we can as well use the original point stream:

var signal = extended.ClipEventDuration(input, (e1, e2) => true);

Here, the matching condition always evaluates to true, assuming that we are looking at a single logical stream, i.e., all events in the stream are associated with a single data source.

The following diagrams illustrate the effect of the point-to-signal conversion through the ClipEventDuration operator:

Point to signal conversion with ClipEventDuration

Both LINQ statements can be combined into a single statement:

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => true);

If the stream contains multiple logical streams - for instance, measurements from multiple devices, or the values of multiple stocks - then the respective key (device Id or stock symbol) would have to be matched in the Boolean expression:

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => e1.Symbol == e2.Symbol);

Creating Sessions

Another use case for ClipEventDuration is the creation of session events in order to annotate events that occurred during such a session. Let us assume the following event schema, describing events of some user interaction:

public class EventType
{
    public int UserId;
    public string Type;
    public DateTime Time;
    public byte[] data;
};

In this example, the payload field Type can be either “start”, “end”, or “other”, describing the start of a user session, the end of a session, or user events during a session, respectively. The field Time contains the timestamp of the interaction, and data contains further information. The task is to annotate each event with the start time of the session during which the event occurred. Moreover, we assume that each session times out after 10 minutes.

The following diagram shows a series of example events in this scenario:

Creating session events with ClipEventDuration

First, the duration expansion according to the timeout is applied to all events of type “start”:

var sessionStarts = from e in input
                    where e.Type == “start”
                    select e;
var sessionStartsExt = sessionStarts.AlterEventDuration(e => TimeSpan.FromMinutes(10));

Next, these session events need to be clipped until their respective end, for each user Id:

var sessionEnds = from e in input
                  where e.Type == “end”
                  select e;
var sessions = sessionStartsExt.ClipEventDuration(sessionEnds, (e1, e2) => e1.UserId == e2.UserId);

The diagram illustrates these statements:

Clipping session events with ClipEventDuration

Now the session events can be joined to the remaining events:

var sessionActivity = from e in input
                      where e.Type == “other”
                      select e;
var annotated = from s1 in sessions
                join s2 in sessionActivity
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

In the join, we are able to refer to the sessionActivity events as well as the fields of the session event, so that we can assemble the annotated sessionActivity event, pulling in the session start time to each sessionActivity event:

Joining session events to other events

Since the join condition is the equality of UserId, the event with UserId=Y in sessionActivity is not taken into account for this specific session where UserId=X.

The LINQ statements can be compressed into a more concise set:

var sessions = input
                 .Where(e => e.Type == “start”)
                 .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                 .ClipEventDuration(input.Where(e => e.Type == “end”), (e1, e2) => e1.UserId == e2.UserId);
var annotated = from s1 in sessions
                join s2 in input.Where(e => e.Type == “other”)
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

Community Additions

ADD
Show:
© 2014 Microsoft