Aggregations

 

The examples in this topic show how to perform aggregation operations over event windows. The supported aggregation functions are as follows:

  • Average over a numeric payload field (avg).

  • Summation over a payload field (sum).

  • Minimum over a payload field (min).

  • Maximum over a payload field (max).

  • Count of events (count).

Aggregations are set-based operations. That is, they perform some computation over subsets of data. These subsets are specified as event windows, grouping events along the timeline. By this definition, aggregations can be applied only to windows and not to ordinary event streams. They are provided as extension methods on IQWindowedStreamable<T>. For more information about windows, see Using Event Windows.

Note

For CEPStream<T> streams, aggregations are extensions to CepWindowStream<T>. For more information on developing with input and output adapters, see Input and Output Adapters (Legacy Model). The discussion in this topic applies to both types of streams.

Using the same syntax as a projection operation, the results of the aggregation operations become scalar-value payload fields. Aggregates over windows are computed incrementally. That is, previous aggregation results are updated when an event enters or leaves the aggregation window. Be aware that this can affect number precision.

avg, sum, min, and max all take one input parameter, representing the value to aggregate. Usually, this is a reference to an event field. The input parameter is given as a lambda expression, as shown in the examples below. The count aggregate counts entire events in the window and hence does not have a parameter.

The handling of null values is consistent with similar functions in Transact-SQL:

  • In sum, null is treated as 0 (zero).

  • In min, any value is smaller than null.

  • In max, any value is larger than null.

  • In TopK, when events are ranked according to a payload field, null is always smaller than any value.

Minimum and maximum functions can be used on any type that implements a comparer, such as numeric, lexicographic (string), or temporal (datetime) types.

Examples

The following examples show how to apply aggregate functions to events defined in Hopping Windows and Snapshot Windows. Note that in this release, Count Windows cannot be used with built-in aggregates or TopK.

A. Aggregation over a hopping window

A hopping window repeats itself over time with a regular period and frequency. For example, aggregating event data within one hour, recalculating every five minutes corresponds to a hopping window with a window size of 1h and a hop size of 5min as shown in the following example.

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}
var avgHourly = from win in inputStream.HoppingWindow(TimeSpan.FromHours(1), TimeSpan.FromMinutes(5))
                  select new { hourlyavg = win.Avg(e => e.f) }; 

The window operator is applied to the input stream, yielding an IQWindowedStreamable<T>. Each element win in this stream represents a window that contains events.

The following example counts events within each full hour. It uses a tumbling window. This is a shortcut for a hopping window whose hop size is equal to the window size.

var countHourly = from win in hourStream.TumblingWindow(TimeSpan.FromHours(1))
                  select new { count = win.Count() };

B. Aggregation over a snapshot window

In the following example, the from clause applies a snapshot window on the stream inputStream. In the example, the Sum aggregation results are assigned to the payload field e.i and the Avg aggregation results are computed based on the payload field e.f. Additionally, the count aggregate is used. The example also shows how to combine several aggregations in the same statement. They are all computed with respect to the same window.

// Assuming the following input event type for inputStream:
public class MyPayload 
{ 
    public int i; 
    public float f; 
}

// Multiple aggregations over a snapshot window
var snapshotAgg = from w in inputStream.Snapshot()
                  select new { sum = w.Sum(e => e.i),
                               avg = w.Avg(e => e.f),
                               count = w.Count() };

You can embed aggregates in complex expressions, and an expression can contain more than one aggregate, as shown in the following example.

var result = from w in inputStream.Snapshot()
             select new { ratio = w.Sum(e => e.i) / w.Sum(e => e.f) };

See Also

Time Stamp Modifications
Using Event Windows
Hopping Windows
Count Windows
Snapshot Windows