February 2013

Volume 28 Number 02

StreamInsight - Taming the Event Stream: Fast Approximate Counting

By Michael Meijer | February 2013

So you have a voluminous and potentially infinite stream of events such as a clickstream, sensor data, credit-card transaction data or Internet traffic. It’s infeasible to store all events or analyze them in multiple passes. Why not resort to a window of recent events to simplify analysis?

Suppose you want to count the number of interesting events in a large window covering the latest N events of the stream. A naïve approach to counting requires all N events to be in memory and a full iteration over them. As the window slides upon the arrival of a new event, its oldest event expires and the new event is inserted. Counting over the new window from scratch wastes the processing time already spent on the N-1 events the two windows share. Yuck! This article explains a data structure that reduces memory space usage and processing time to a small fraction of what the naïve method requires, while supporting an event rate exceeding many thousands of events per second on commodity hardware. This article also shows how to embed the data structure in a user-defined stream operator in C# for the Microsoft streaming data processor, StreamInsight 2.1. Intermediate programming skills are required to follow along, and some experience with StreamInsight can come in handy.

A Tale of Counting

Before diving into StreamInsight, I’ll investigate the seemingly trivial problem of counting. For simplicity, assume the stream has events with payloads of 0 or 1—uninteresting and interesting events, respectively (regardless of what constitutes “interesting” in your specific scenario). The number of 1s is counted over a (fixed-size) count-based window containing the most recent N events. Naïve counting takes O(N) time and space. 

As an astute reader, you probably came up with the idea of maintaining the count between consecutive windows, incrementing it for new 1s and decrementing it for expired 1s, thereby sharing the N-1 events already processed. Good thinking! Maintaining the count now takes O(1) time. But should you decrement for an expired event or not? Unless you know the expired event’s payload, the count can’t be maintained. Unfortunately, knowing the events until they expire requires keeping the entire window in memory—that is, it takes O(N) space. Another strategy might be to filter out the uninteresting events and count only the remaining interesting events. But that doesn’t reduce computational complexity and leaves you with a variable-size window.
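
To make the trade-off concrete, here’s a minimal sketch (my illustration, not code from the article’s download) of the incremental counter just described; the O(1)-time update is visible, and so is the Queue<bool> that forces O(N) space:

using System.Collections.Generic;

// Exact sliding-window counter: O(1) time per event, but O(N) space,
// because every payload must be remembered until it expires
public class ExactSlidingCounter
{
  private readonly int n;                   // Window size
  private readonly Queue<bool> window = new Queue<bool>();
  private long count;                       // Number of 1s in the window
  public ExactSlidingCounter(int n) { this.n = n; }
  public long Update(bool payload)
  {
    if (window.Count == n && window.Dequeue())
      count--;                              // Decrement for an expired 1
    window.Enqueue(payload);
    if (payload)
      count++;                              // Increment for a new 1
    return count;
  }
}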

Can the memory beast be tamed? Yes, it can! However, it requires a compromise between processing time and memory space at the expense of accuracy. The seminal paper by Mayur Datar, Aristides Gionis, Piotr Indyk and Rajeev Motwani titled “Maintaining Stream Statistics over Sliding Windows” (stanford.io/SRjWT0) describes a data structure called the exponential histogram. It maintains an approximate count over the last N events with a bounded relative error ε. This means that at all times:

|approximate count - exact count| ⁄ exact count ≤ ε

Conceptually, the histogram stores events in buckets. Every bucket initially covers one event, so it has a count of 1 and a timestamp of the event it covers. When an event arrives, expired buckets (covering expired events) are removed. A bucket is created only for an interesting event. As buckets are created over time, they’re merged to save memory. Buckets are merged so they have exponentially growing counts from the most recent to the last bucket, that is, 1, 1, ..., 2, 2, ..., 4, 4, ..., 8, 8 and so on. This way, the number of buckets is logarithmic in the window size N. More precisely, it requires O(1⁄ε  log N) time and space for maintenance. All but the last bucket cover only non-expired events. The last bucket covers at least one non-expired event. Its count must be estimated, which causes the error in approximating the overall count. Hence, the last bucket must be kept small enough to respect the relative error upper bound.

In the next section, the implementation of the exponential histogram in C# is discussed with a bare minimum of math. Read the aforementioned paper for the intricate details. I’ll explain the code and follow up with a pen-and-paper example. The histogram is a building block for the StreamInsight user-defined stream operator developed later in this article.

To Bucket or Not to Bucket

Here’s the bucket class:

[DataContract]
public class Bucket
{
  [DataMember]
  private long timestamp;
  [DataMember]
  private long count;
  public long Timestamp
  {
    get { return timestamp; }
    set { timestamp = value; }
  }
  public long Count
  {
    get { return count; }
    set { count = value; }
  }
}

It has a count of the (interesting) events it covers and a timestamp of the most recent event it covers. Only the last bucket can cover expired events, as mentioned, but it must cover at least one non-expired event. Hence, all bucket counts except the last are exact; the last bucket’s count must be estimated by the histogram. Buckets covering only expired events are themselves expired and can be removed from the histogram.

Using just two operations, the exponential histogram ensures a relative error upper bound ε on the count of interesting events over the N most recent events. One operation updates the histogram with new and expired events, maintaining the buckets. The other queries the approximate count from the buckets. The histogram class outline is shown in Figure 1. Next to the linked list of buckets, its key variables are the window size (n), the relative error upper bound (epsilon) and the cached sum of all bucket counts (total). The constructor stores the given window size and relative error upper bound, and initializes an empty list of buckets.

Figure 1 The Exponential Histogram Class Outline

[DataContract]
public class ExponentialHistogram
{
  [DataMember]
  private long n;
  [DataMember]
  private double epsilon;
  [DataMember]
  private long total;
  [DataMember]
  private LinkedList<Bucket> buckets;
  public ExponentialHistogram(long n, double epsilon)
  {
    this.n = n;
    this.epsilon = epsilon;
    this.buckets = new LinkedList<Bucket>();
  }
  public void Update(long timestamp, bool eventPayload) { ... }
  protected void RemoveExpiredBuckets(long timestamp) { ... }
  protected void PrependNewBucket(long timestamp) { ... }
  protected void MergeBuckets() { ... }
  public long Query() { ... }
}

The maintenance of the histogram is performed by this update method:

public void Update(long timestamp, bool eventPayload)
{
  RemoveExpiredBuckets(timestamp);
  // No new bucket required; done processing
  if (!eventPayload)
    return;
  PrependNewBucket(timestamp);
  MergeBuckets();
}

It accepts a discrete timestamp, as opposed to wall-clock time, to determine what the latest N events are. This is used to find and remove expired buckets. If the new event has a payload of 0 (false), processing stops. When the new event has a payload of 1 (true), a new bucket is created and prepended to the list of buckets. The real fireworks are in merging the buckets. The methods called by the update method are discussed in sequence.

Here’s the code for the removal of buckets:

protected void RemoveExpiredBuckets(long timestamp)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires if its timestamp
  // is before or at the current timestamp - n
  while (node != null && node.Value.Timestamp <= timestamp - n)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}

The traversal starts from the oldest (last) bucket and ends at the first non-expired bucket. Each bucket whose most recent event’s timestamp is expired—that is, whose timestamp is no greater than the current timestamp minus the window size—is removed from the list. This is where the discrete timestamp comes in. The sum of all bucket counts (total) is decremented by the count of each expired bucket.

After expired events and buckets are accounted for, the new event is processed:

protected void PrependNewBucket(long timestamp)
{
  Bucket newBucket = new Bucket()
  {
    Timestamp = timestamp,
    Count = 1
  };
  buckets.AddFirst(newBucket);
  total++;
}

A new bucket for the event with a payload of 1 (true) is created with a count of 1 and a timestamp equal to the current timestamp. The new bucket is prepended to the list of buckets and the sum of all bucket counts (total) is incremented.

The memory space-saving and error-bounding magic is in the merging of buckets. The code is listed in Figure 2. Buckets are merged so that consecutive buckets have exponentially growing counts, that is, 1, 1, ..., 2, 2, ..., 4, 4, ..., 8, 8 and so on. The number of buckets with the same count is determined by the choice of the relative error upper bound ε. The total number of buckets grows logarithmically with the size of the window n, which explains the memory space savings. As many buckets as possible are merged, but the last bucket’s count is kept small enough (compared to the sum of the other bucket counts) to ensure the relative error is bounded.

Figure 2 Merging Buckets in the Histogram

protected void MergeBuckets()
{
  LinkedListNode<Bucket> current = buckets.First;
  LinkedListNode<Bucket> previous = null;
  int k = (int)Math.Ceiling(1 / epsilon);
  int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);
  int numberOfSameCount = 0;
  // Traverse buckets from first to last, hence in order of
  // descending timestamp and ascending count
  while (current != null)
  {
    if (previous != null && previous.Value.Count == current.Value.Count)
      numberOfSameCount++;
    else
      numberOfSameCount = 1;
    // Found k/2+2 buckets of the same count?
    if (numberOfSameCount == kDiv2Add2)
    {
      // Merge oldest (current and previous) into current
      current.Value.Timestamp = previous.Value.Timestamp;
      current.Value.Count = previous.Value.Count + current.Value.Count;
      buckets.Remove(previous);
      // A merged bucket can cause a cascade of merges due to
      // its new count, continue iteration from merged bucket
      // otherwise the cascade might go unnoticed
      previous = current.Previous;
    }
    else
    {
      // No merge, continue iteration with next bucket 
      previous = current;
      current = current.Next;
    }
  }
}

More formally, buckets have non-decreasing counts from the first (most recent) to the last (oldest) bucket in the list. The bucket counts are constrained to powers of two. Let k = 1⁄ε and let k⁄2 be an integer; otherwise, replace k⁄2 by ⌈k⁄2⌉. Except for the last bucket’s count, let there be at least k⁄2 and at most k⁄2 + 1 buckets of the same count. Whenever there are k⁄2 + 2 buckets of the same count, the oldest two are merged into one bucket with twice the count of the oldest bucket and the most recent of their timestamps. Whenever two buckets are merged, traversal continues from the merged bucket, because the merge can cause a cascade of merges; otherwise traversal continues from the next bucket.
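
If you want to convince yourself the implementation respects these invariants, a debugging helper along the following lines could be added to the histogram class (my sketch, not part of the article’s code; it assumes access to the buckets list and the epsilon field, and requires System.Diagnostics and System.Linq):

protected void AssertInvariants()
{
  int k = (int)Math.Ceiling(1 / epsilon);
  long[] counts = buckets.Select(b => b.Count).ToArray();
  // Counts must be non-decreasing from the newest to the oldest bucket
  for (int i = 1; i < counts.Length; i++)
    Debug.Assert(counts[i - 1] <= counts[i]);
  foreach (var group in counts.GroupBy(c => c))
  {
    // Every bucket count must be a power of two
    Debug.Assert((group.Key & (group.Key - 1)) == 0);
    // After merging, at most k/2 + 1 buckets may share the same count
    Debug.Assert(group.Count() <= Math.Ceiling(0.5 * k) + 1);
  }
}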

To get a feeling for the count approximation, look at the histogram’s query method:

public long Query()
{
  long last = buckets.Last != null ? buckets.Last.Value.Count : 0;
  return (long)Math.Ceiling(total - last / 2.0);
}

The sum of the bucket counts up to the last bucket is exact. The last bucket must cover at least one non-expired event, otherwise the bucket is expired and removed. Its count must be estimated because it can cover expired events. By estimating the actual count of the last bucket as half its count, the absolute error of that estimate is no larger than half that bucket’s count. The overall count is estimated by the sum of all bucket counts (total) minus half the last bucket’s count. For example, if total is 7 and the last bucket’s count is 4, the estimate is ⌈7 - 4⁄2⌉ = 5. To keep the relative error within the upper bound ε, the last bucket’s influence must be small enough compared to the sum of the other bucket counts. Thankfully, the merge procedure ensures this: it keeps at least k⁄2 buckets of every smaller count, so the exact part of the sum grows in proportion to the last bucket’s count.

Do the code listings and explanations up to this point leave you puzzled about the workings of the histogram? Read through the following example.

Suppose you have a newly initialized histogram with window size n = 7 and relative error upper bound ε = 0.5, so k = 2. The histogram develops as shown in Figure 3, and a schematic overview of this histogram is depicted in Figure 4. In Figure 3, merges occur at timestamps 5, 7 and 9, with a cascaded merge at timestamp 9, and a bucket expires at timestamp 13. I’ll go into more detail about this.

Figure 3 Example of the Exponential Histogram

Timestamp    Event  Buckets (Timestamp, Count)         Total  Query  Exact  Relative
                    Newest ... Oldest                                       Error
1            0      (none)                              0      0      0      0
2            1      (2,1)                               1      1      1      0
3            1      (3,1) (2,1)                         2      2      2      0
4            0      (3,1) (2,1)                         2      2      2      0
5 (merge)    1      (5,1) (3,1) (2,1)                   3      2      3      0.333...
                    (5,1) (3,2)
6            1      (6,1) (5,1) (3,2)                   4      3      4      0.25
7 (merge)    1      (7,1) (6,1) (5,1) (3,2)             5      4      5      0.2
                    (7,1) (6,2) (3,2)
8            1      (8,1) (7,1) (6,2) (3,2)             6      5      6      0.166...
9 (merge,    1      (9,1) (8,1) (7,1) (6,2) (3,2)       7      5      6      0.166...
  cascaded          (9,1) (8,2) (6,2) (3,2)
  merge)            (9,1) (8,2) (6,4)
10           0      (9,1) (8,2) (6,4)                   7      5      5      0
11           0      (9,1) (8,2) (6,4)                   7      5      5      0
12           0      (9,1) (8,2) (6,4)                   7      5      4      0.25
13           0      (9,1) (8,2)                         3      2      3      0.333...

Figure 4 A Schematic Overview of the Histogram Depicted in Figure 3

The first event has no effect. At the fifth event, a merge of the oldest buckets occurs because there are k⁄2 + 2 buckets with the same count of 1. Again, a merge happens at the seventh event. At the ninth event, a merge cascades into another merge. Note that upon the arrival of the eighth event, the first event expires. No bucket carries an expired timestamp until the 13th event. At the 13th event, the bucket with timestamp 6 no longer covers at least one non-expired event and thus expires. Note that the observed relative error is clearly less than the relative error upper bound.

In Figure 4, a dotted box is the window size at that point; it contains the buckets and implies the span of events covered. A solid box is a bucket with timestamp on top and count on bottom. Situation A shows the histogram at timestamp 7 with arrows to the counted events. Situation B shows the histogram at timestamp 9. The last bucket covers expired events. Situation C shows the histogram at timestamp 13. The bucket with timestamp 6 expired.

After putting it all together, I wrote a small demonstration program for the exponential histogram (check out the source code download for this article). The results are shown in Figure 5. It simulates a stream of 100 million events with a count-based window size N of 1 million events. Each event has a payload of 0 or 1 with 50 percent chance. It estimates the approximate count of 1s with an arbitrarily chosen relative error upper bound ε of 1 percent, or 99 percent accuracy. The memory savings of the histogram are huge compared to keeping the full window in memory; the number of buckets is far less than the number of events in the window. An event rate of a few hundred thousand events per second is achieved on a machine with an Intel 2.4 GHz dual-core processor and 3GB of RAM running Windows 7.

Figure 5 Empirical Results for the Exponential Histogram
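
The full program ships with the source code download; a minimal sketch of such a driver, pairing the histogram with the ExactSlidingCounter sketched earlier to measure the observed error, might look like this (my reconstruction, not the download’s exact code):

using System;

public class HistogramDemo
{
  public static void Main()
  {
    const int N = 1000000;           // Count-based window size
    const double Epsilon = 0.01;     // Relative error upper bound (1 percent)
    var histogram = new ExponentialHistogram(N, Epsilon);
    var exactCounter = new ExactSlidingCounter(N);  // Kept only to measure the error
    var random = new Random();
    for (long t = 1; t <= 100000000; t++)
    {
      bool payload = random.NextDouble() >= 0.5;    // 1 with 50 percent chance
      histogram.Update(t, payload);
      long exact = exactCounter.Update(payload);
      if (t % 10000000 == 0)
        Console.WriteLine("t={0}: approximate={1}, exact={2}",
          t, histogram.Query(), exact);
    }
  }
}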

A Beauty Called StreamInsight

Perhaps you’re wondering what Microsoft StreamInsight is and where it fits in. This section provides some basics. StreamInsight is a robust, high-performance, low-overhead, near-zero-latency and extremely flexible engine for processing event streams. It’s currently at version 2.1. The full version requires a SQL Server license, though a trial version is available. It runs either as a stand-alone service or embedded in-process.

At the heart of streaming data processing is a model with temporal streams of events. Conceptually, it’s a potentially infinite and voluminous collection of data arriving over time. Think of stock exchange prices, weather telemetry, power monitoring, Web clicks, Internet traffic, toll booths and so on. Each event in the stream has a header with metadata and a payload of data. In the header of the event, a timestamp is kept, at a minimum. Events can arrive steadily, intermittently or perhaps in bursts of up to many thousands per second. Events come in three flavors: An event can be confined to a point in time; be valid for a certain interval; or be valid for an open-ended interval (edge). Besides events from the stream, a special punctuation event is issued by the engine called the Common Time Increment (CTI). Events can’t be inserted into the stream with a timestamp less than the CTI’s timestamp. Effectively, CTI events determine the extent to which events can arrive out of order. Thankfully, StreamInsight takes care of this.
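
To give a feeling for the API, the following fragment sketches how the three flavors are created through the factory methods of the 2.1 API (to the best of my knowledge; the payloads and timestamps are arbitrary placeholders):

// Requires a reference to Microsoft.ComplexEventProcessing
var now = DateTimeOffset.Now;
var point = PointEvent.CreateInsert(now, 42);                           // Point in time
var interval = IntervalEvent.CreateInsert(now, now.AddMinutes(1), 42);  // Fixed interval
var edge = EdgeEvent.CreateStart(now, 42);                              // Open-ended (edge)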

Heterogeneous sources of input and sinks of output streams must somehow be adapted to fit into this model. The events in the (queryable) temporal streams are captured in an IQStreamable<TPayload>. Event payloads are conceptually pulled by enumeration or pushed by observation into the stream. Hence, underlying data can be exposed through an IEnumerable<T>/IQueryable<T> or, using the Reactive Extensions, an IObservable<T>/IQbservable<T>, parameterized with the type of data exposed. These leave the maintenance of temporal aspects to the processing engine. Conversion from and to the various interfaces is possible.

The sources and sinks just discussed live on the boundaries, whereas the actual processing happens within queries. A query is a basic unit of composition written in LINQ. It continuously processes events from one or more streams and outputs another stream. Queries are used to project, filter, group-apply, multicast, operate/aggregate, join, union and window events. Operators can be user-defined. They work on events (incremental) or on windows (non-incremental) as they arrive.

A note on windowing is in order. Windowing partitions a stream into finite subsets of events that might overlap between consecutive windows. Windowing effectively produces a stream of windows, reflected by an IQWindowedStreamable<TPayload> in StreamInsight. Currently, three different kinds of windowing constructs are supported: count-based, time-based and snapshot windows. Count-based windows cover the most recent N events and slide upon the arrival of a new event, expiring the oldest and inserting the newest. Time-based windows cover the most recent events in the most recent interval of time and slide by some interval (also called hopping or tumbling). Snapshot windows are driven by event start and end times; that is, for every pair of closest event start and end times, a window is created. In contrast to time-based windows driven by intervals along the timeline, regardless of events, snapshot windows aren’t fixed along the timeline.
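
Here’s a brief sketch of the time-based and snapshot constructs in LINQ, assuming sourceStream is an IQStreamable<int> of point events (the window sizes are arbitrary):

// 1-minute windows, sliding every 10 seconds (hopping)
var hopping = from win in sourceStream.HoppingWindow(
                TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(10))
              select win.Count();
// Non-overlapping 1-minute windows (tumbling)
var tumbling = from win in sourceStream.TumblingWindow(TimeSpan.FromMinutes(1))
               select win.Count();
// Windows between consecutive event start/end times (snapshot)
var snapshot = from win in sourceStream.SnapshotWindow()
               select win.Count();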

That just scratches the surface. More information is available from several sources, including the online Developer’s Guide (bit.ly/T7Trrx), “A Hitchhiker’s Guide to StreamInsight 2.1 Queries” (bit.ly/NbybvY), CodePlex examples, the StreamInsight team blog (blogs.msdn.com/b/streaminsight) and others.

Putting It All Together

The foundations are laid. At this point, you’re probably wondering how approximate counting is brought to life in StreamInsight. In short, some (temporal) source stream of point-in-time events, carrying a payload of 0 or 1, is required. It’s fed into a query that computes the approximate count of 1s over the N most recent events using the exponential histogram. The query produces some (temporal) stream of point-in-time events—carrying the approximate count—that’s fed into a sink.

Let’s start with a user-defined operator for approximate counting. You might be tempted to capture the N most recent events using the count-based windowing construct. Think again! That would defy the memory-saving benefits of the exponential histogram. Why? The construct forces entire windows of events to be kept in memory. The exponential histogram doesn’t require this, because it has an equivalent implicit notion of windowing through the maintenance of buckets. Moreover, an operator over windows is non-incremental; it processes events not as they arrive, but only when the next window is available. The solution is a user-defined stream operator without explicit windowing constructs on the query. The code is listed in Figure 6.

Figure 6 User-Defined Stream Operator Implementation

[DataContract]
public class ApproximateCountUDSO : CepPointStreamOperator<bool, long>
{
  [DataMember]
  private ExponentialHistogram histogram;
  [DataMember]
  private long currentTimestamp;  // Current (discrete) timestamp
  public ApproximateCountUDSO(long n, double epsilon)
  {
    histogram = new ExponentialHistogram(n, epsilon);
  }
  public override IEnumerable<long> ProcessEvent(
    PointEvent<bool> inputEvent)
  {
    currentTimestamp++;
    histogram.Update(currentTimestamp, inputEvent.Payload);
    yield return histogram.Query();
  }
  public override bool IsEmpty
  {
    get { return false; }
  }
}

The operator derives from the abstract CepPointStreamOperator<TInputPayload, TOutputPayload>. It has an exponential histogram instance variable. Note the decoration with DataContract and DataMember attributes. This informs StreamInsight how to serialize the operator—for example, for resiliency purposes. The operator overrides the IsEmpty property to indicate it’s non-empty, that is, the operator is stateful. This prevents StreamInsight from discarding the operator’s state when minimizing memory utilization. The ProcessEvent method is the operator’s core. It increments the current (discrete) timestamp and passes it along with the event payload to the histogram’s update method. The histogram handles the bucketing and is queried for the approximate count. Make sure to use the yield-return syntax, which makes the operator enumerable. Operators are generally wrapped in some extension method hidden in a utility class. This code shows how it’s done:

public static partial class Utility
{
  public static IQStreamable<long> ApproximateCount(
    this IQStreamable<bool> source, long n, double epsilon)
  {
    return source.Scan(() => new ApproximateCountUDSO(n, epsilon));
  }
}

That’s it! Plug the operator into a query via the extension method. A bit of extra code is required to actually demonstrate its use. Here’s a trivial source stream:

public static partial class Utility
{
  private static Random random = new Random((int)DateTime.Now.Ticks);
  public static IEnumerable<bool> EnumeratePayloads()
  {
    while (true)  // ad infinitum
    {
      bool payload = random.NextDouble() >= 0.5;
      yield return payload;
    }
  }
}

This generates random payloads of 0s and 1s. The yield-return syntax turns it into an enumerable source. Put it in a utility class, if you will.

The infamous Program class is shown in Figure 7. It creates the in-process embedded StreamInsight server instance. A so-called application instance named ApproximateCountDemo is created as a container for streaming (metadata) objects such as named streams and queries. An enumerable source of point-in-time events is defined, using the payload-generating utility method described earlier. It’s transformed into a temporal stream of point-in-time events. The query is defined in LINQ and selects the approximate counts computed by the operator over the source stream. This is where the extension method for the user-defined operator comes in handy. It’s bootstrapped with a window size and relative error upper bound. Next, the query output is transformed into an enumerable sink, stripping the temporal properties. Finally, the sink is iterated over, thereby actively pulling the events through the pipeline. Execute the program and enjoy its number-crunching output on the screen.

Figure 7 Embedding and Executing in StreamInsight

class Program
{
  public const long N = 10000;
  public const double Epsilon = 0.05;
  static void Main(string[] args)
  {
    using (Server server = Server.Create("StreamInsight21"))
    {
      var app = server.CreateApplication("ApproximateCountDemo");
      // Define an enumerable source
      var source = app.DefineEnumerable(() =>
        Utility.EnumeratePayloads());
      // Wrap the source in a (temporal) point-in-time event stream
      // The time settings determine when CTI events
      // are generated by StreamInsight
      var sourceStream = source.ToPointStreamable(e =>
        PointEvent.CreateInsert(DateTime.Now, e),
        AdvanceTimeSettings.IncreasingStartTime);
      // Produces a stream of approximate counts
      // over the latest N events with relative error bound Epsilon
      var query =
        from e in sourceStream.ApproximateCount(N, Epsilon) select e;
      // Unwrap the query's (temporal) point-in-time
      // stream to an enumerable sink
      var sink = query.ToEnumerable<long>();
      foreach (long estimatedCount in sink)
      {
        Console.WriteLine(string.Format(
          "Enumerated Approximate count: {0}", estimatedCount));
      }
    }
  }
}

To briefly recap, this article explains how to approximate the count over a window of events in logarithmic time and space with upper-bounded error using an exponential histogram data structure. The histogram is embedded in a StreamInsight user-defined operator.

The histogram and operator can be extended to support variable-size windows, such as time-based windows. This requires the histogram to know the window interval/timespan rather than the window size. Buckets expire when their timestamp is at or before the new event’s timestamp minus the window timespan, as sketched below. An extension to compute variance is proposed in the paper, “Maintaining Variance and k–Medians over Data Stream Windows,” by Brian Babcock, Mayur Datar, Rajeev Motwani and Liadan O’Callaghan (stanford.io/UEUG0i). Apart from histograms, other so-called synopsis structures are described in the literature; think of random samples, heavy hitters, quantiles and so on.
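
A hypothetical time-based variant of the bucket-expiration method might look like this (my sketch, not code from the article; it assumes Bucket.Timestamp is changed to a DateTimeOffset and the histogram stores a windowTimespan field of type TimeSpan instead of n):

protected void RemoveExpiredBuckets(DateTimeOffset currentTime)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires when its newest event is at or before
  // the trailing edge of the time-based window
  while (node != null && node.Value.Timestamp <= currentTime - windowTimespan)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}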

The source code accompanying this article is written in C# 4.0 with Visual Studio 2010 and requires StreamInsight 2.1. The code is free for use under the Microsoft Public License (Ms-PL). Note that it was developed for educational purposes and was neither optimized nor tested for production environments.


Michael Meijer is a software engineer at CIMSOLUTIONS BV, where he provides IT consulting services and software development solutions to companies throughout the Netherlands. His interests in StreamInsight and streaming data processing started during his research at the University of Twente, Enschede, Netherlands, where he received a Master of Science degree in Computer Science–Information Systems Engineering.

Thanks to the following technical experts for reviewing this article: Erik Hegeman, Roman Schindlauer and Bas Stemerdink