Group and Apply

The examples in this topic demonstrate how to partition events into event groups by using the LINQ "group by" functionality. Aggregations and other operations can then be performed on the event groups, so that each group is computed separately. The set of operations applied to each group is called the apply branch. The apply branch can be provided implicitly within a single group-and-apply statement or, if it contains a more complex subquery, as a separate LINQ statement. Note that apply branches are closed within the group-and-apply construct; for instance, it is not possible to join the grouped stream with a stream from outside the grouping.

Note

Only default window policies are supported in StreamInsight 2.1 and later, and this is reflected in the examples in this section. In version 2.0 and earlier you can specify an output policy class, such as SnapshotWindowOutputPolicy.Clip, as shown in the following example:

// version 2.0 and earlier:
var avgCount = from v in inputStream
               group v by v.i % 4 into eachGroup
               from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new { avgNumber = window.Avg(e => e.j) };

The following example groups events by the specified modulo function. It then applies a snapshot window to each group and computes the average over a payload column on each group separately. Thus, the apply branch consists of the window and the aggregation.

// Assuming the following input event type for inputStream:
public class MyPayload 
{
    public int i; 
    public float j; 
}

var avgCount = from v in inputStream
               group v by v.i % 4 into eachGroup
               from window in eachGroup.SnapshotWindow()
               select new { avgNumber = window.Avg(e => e.j) };

The previous example produces a stream with a single payload field, which contains the average of the field j, computed per snapshot window and per group.
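For intuition about which events end up in which group, the grouping and averaging step can be imitated with plain LINQ to Objects over an in-memory array. This is only an analogy, not StreamInsight code: there are no windows or event lifetimes, the sample values are invented, and anonymous objects stand in for MyPayload (same field names i and j).

```csharp
using System;
using System.Linq;

// Invented sample payloads with the same field names as MyPayload.
var events = new[]
{
    new { i = 0, j = 2.0f },
    new { i = 4, j = 4.0f },
    new { i = 1, j = 10.0f },
    new { i = 5, j = 20.0f },
    new { i = 2, j = 7.0f }
};

// Plain LINQ to Objects: partition by i % 4, then average j within each partition.
// Each partition is aggregated independently of the others.
var averages = (from v in events
                group v by v.i % 4 into eachGroup
                orderby eachGroup.Key
                select new { avgNumber = eachGroup.Average(e => e.j) }).ToList();

foreach (var a in averages)
{
    Console.WriteLine(a.avgNumber);
}
```

Events with i = 0 and i = 4 share the key 0, so they contribute to the same average. Note that the result rows carry no indication of which group they belong to.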

You can also group a projection of the original type in a "group by" clause, as shown in the following example.

var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
             group new { myVal = e.Value * 10 } by e.SourceId into g
             from win in g.SnapshotWindow()
             select new
             {
                 avg = win.Avg(e => e.myVal)
             };

Usually, the grouping key should be retained so that the aggregation result can be associated with the respective group. The next example shows how to retrieve the grouping key.

var avgCount = from v in inputStream
               group v by v.i % 4 into eachGroup
               from window in eachGroup.SnapshotWindow()
               select new { avgNumber = window.Avg(e => e.j), 
                   groupId = eachGroup.Key };
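What retaining the key buys a consumer can be sketched with plain LINQ to Objects. This is an analogy under stated assumptions, not StreamInsight code: windows are omitted, the sample values are invented, and the dictionary lookup is just one illustrative way of using the key.

```csharp
using System;
using System.Linq;

// Invented sample payloads with the same field names as MyPayload.
var events = new[]
{
    new { i = 3, j = 1.0f },
    new { i = 7, j = 3.0f },
    new { i = 2, j = 8.0f }
};

// Keep the key next to the aggregate so that each result names its group,
// then index the results by that key.
var perGroup = (from v in events
                group v by v.i % 4 into eachGroup
                select new
                {
                    avgNumber = eachGroup.Average(e => e.j),
                    groupId = eachGroup.Key
                })
               .ToDictionary(r => r.groupId, r => r.avgNumber);

Console.WriteLine(perGroup[3]);
Console.WriteLine(perGroup[2]);
```

Because groupId travels with avgNumber, the average for any particular group can be looked up directly.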

It is possible to group on several keys, so that each unique key combination in the input stream results in a separate group. In this case, the grouping keys must be contained in an anonymous type definition so that they can be retrieved explicitly in the final projection. Note that all grouping fields must be referenced. The following example groups events by two event payload fields and assigns a new key name to one of them.

// Assuming the following input event type for inputStream:
public class MyPayload 
{
    public int section; 
    public string category; 
    public float value; 
}

var avgCount = from v in inputStream
               group v by new { sec = v.section, v.category } into eachGroup
               from window in eachGroup.SnapshotWindow()
               select new { avgNumber = window.Avg(e => e.value), 
                   section = eachGroup.Key.sec,
                   category = eachGroup.Key.category };
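Grouping on an anonymous type works because C# anonymous types compare by the values of their properties, so two events with the same section and category produce equal keys. The following LINQ to Objects sketch illustrates this with invented sample data; it is an analogy without windows, not StreamInsight code.

```csharp
using System;
using System.Linq;

// Invented sample payloads with the same field names as MyPayload.
var events = new[]
{
    new { section = 1, category = "A", value = 2.0f },
    new { section = 1, category = "A", value = 4.0f },
    new { section = 1, category = "B", value = 9.0f },
    new { section = 2, category = "A", value = 5.0f }
};

// The composite key is an anonymous type; "sec" renames the section field,
// while "category" keeps its original name.
var results = (from v in events
               group v by new { sec = v.section, v.category } into eachGroup
               select new
               {
                   avgNumber = eachGroup.Average(e => e.value),
                   section = eachGroup.Key.sec,
                   category = eachGroup.Key.category
               }).ToList();

foreach (var r in results)
{
    Console.WriteLine($"{r.section}/{r.category}: {r.avgNumber}");
}
```

The four sample events collapse into three groups, one per distinct (section, category) combination.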

The apply branch can be more complex, containing a series of operations, as shown in the example below.

// Assuming the following input event type for source:
public class MyPayload 
{
    public int section; 
    public string category; 
    public float value; 
}

var result = from s in source
                group s by s.section into sg
                from e in
                    (from e in sg
                    group e by e.category into cg
                    from win in cg.TumblingWindow(TimeSpan.FromMinutes(5))
                    select new { cat = cg.Key, top = win.Max(e => e.value) })
                select new { sec = sg.Key, e.cat, e.top };
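The shape of this nested grouping (an outer group per section, and within it an inner group per category) can be tried out with plain LINQ to Objects. The sketch below is an analogy only: the tumbling window is left out, so the maximum is taken over all sample events of a category, and the sample values and the range variable names (row, x, y) are invented.

```csharp
using System;
using System.Linq;

// Invented sample payloads with the same field names as MyPayload.
var events = new[]
{
    new { section = 1, category = "A", value = 2.0f },
    new { section = 1, category = "A", value = 6.0f },
    new { section = 1, category = "B", value = 3.0f },
    new { section = 2, category = "A", value = 1.0f }
};

// Outer grouping by section; the parenthesized subquery plays the role of
// the apply branch and regroups each section by category.
var result = (from s in events
              group s by s.section into sg
              from row in
                  (from x in sg
                   group x by x.category into cg
                   select new { cat = cg.Key, top = cg.Max(y => y.value) })
              select new { sec = sg.Key, row.cat, row.top }).ToList();

foreach (var item in result)
{
    Console.WriteLine($"{item.sec}/{item.cat}: {item.top}");
}
```

The subquery only ever sees the events of one section (sg), which mirrors how an apply branch is closed within its group.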

In the following example, a stream of power meter readings is assumed, containing the data of several meters. The example annotates each reading with the average over the last 10 minutes for the same meter. The query first groups the incoming data by meter ID. In each such group, the average over 10 minutes is computed and joined to the original meter events.

// Assuming the following input event type for sensorStream:
public class MeterReading
{
    public string meterId; 
    public float usage; 
}

var resultB = from s in sensorStream
              group s by s.meterId into g
              from e in
                  (from left in g
                  from right in
                      (from win in g
                          .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                          .SnapshotWindow()
                      select new { avg = win.Avg(e => e.usage) })
                  select new { right.avg, left.usage })
              select new { slidingAvg = e.avg, e.usage, g.Key };
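The intended result can be approximated offline with plain LINQ to Objects. The sketch below is not StreamInsight code and rests on several assumptions: each reading carries an explicit, invented time field (the MeterReading type above has none, because StreamInsight events carry their own timestamps), and "the last 10 minutes" is taken to mean readings of the same meter with a timestamp later than 10 minutes before the current reading, up to and including the current one. That boundary handling is chosen for illustration only.

```csharp
using System;
using System.Linq;

var t0 = new DateTime(2015, 1, 1, 12, 0, 0);
var horizon = TimeSpan.FromMinutes(10);

// Invented sample readings; "time" is a hypothetical field added for this sketch.
var readings = new[]
{
    new { meterId = "m1", time = t0,                usage = 10.0f },
    new { meterId = "m1", time = t0.AddMinutes(5),  usage = 20.0f },
    new { meterId = "m1", time = t0.AddMinutes(12), usage = 60.0f },
    new { meterId = "m2", time = t0.AddMinutes(5),  usage = 5.0f }
};

// Within each meter's group, pair every reading with the average of the
// same meter's readings from the preceding 10 minutes (including itself).
var annotated = (from s in readings
                 group s by s.meterId into g
                 from left in g
                 let avg = g.Where(r => r.time <= left.time && r.time > left.time - horizon)
                            .Average(r => r.usage)
                 select new { slidingAvg = avg, left.usage, g.Key }).ToList();

foreach (var row in annotated)
{
    Console.WriteLine($"{row.Key}: usage={row.usage}, slidingAvg={row.slidingAvg}");
}
```

Readings of meter m2 never influence the averages of m1, because the average is computed inside the group, just as the join in the query above happens inside the apply branch.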

As mentioned above, the function that represents the apply branch cannot integrate any incoming stream other than its own input, applyIn.

© 2015 Microsoft