StreamInsight

控制事件流:快速近似计数

Michael Meijer

下载代码示例

 

你们得浩繁和潜力无穷的事件,如点击流、 传感器数据、 信用卡交易数据或互联网交通流。它是不可行的要存储的所有事件或分析他们在多个传递。为什么不求助于最近发生的事件,以简化分析的窗口?

假设您想要的涵盖最新 N 事件流的一个大窗口中的有趣事件进行计数。盘点幼稚的做法要求要在内存和充分的迭代中对他们的所有 N 事件。窗口幻灯片所到达的一个新的事件,其最早的事件到期,插入新的事件。在新的窗口,从零开始计数废物共享 N 2 事件的处理时间。恶 !本文介绍了一种数据结构来减少内存空间的使用和处理时间需要使用该方法时,应同时支持事件速度超过每秒对商品化的硬件事件有成千上万的一小部分。这篇文章还演示了如何在 Microsoft 流式数据处理器,StreamInsight 2.1 用户定义流运算符在 C# 中嵌入的数据结构。中间的编程技能均须遵守沿,和一些经验 StreamInsight 会非常有用。

一个故事的计数

潜入 StreamInsight 之前, 我会调查看似琐碎问题的计数。为简单起见,假设流具有与有效载荷的 0 或 1 的事件 — — 枯燥无味和有趣的事件,分别 (无论什么构成"有趣的"在您的特定情况下)。包含最新的 N 事件 (固定大小) 基于计数的窗口上方数是 1 秒。天真计数需要 o (n) 时间和空间。

作为一个精明的阅读器,您可能出了保持连续的窗口之间的计数和新 1s 和递减递增它的想法它过期 1s,为分享 N 2 事件已处理。良好的思维 !O (1) 时间: 现在维持计数。但是,应该你递减为过期的事件或不?除非您知道实际的事件,不能保持计数。不幸的是,要知道直到他们已过期的事件需要在内存中的整个窗口 — — 也就是说,它占用 o (n) 的空间。另一种策略可能是筛选出不感兴趣的事件和计数只剩下有趣的事件。但这并不减少计算的复杂性,让你可变大小的窗口。

可以被驯服的内存野兽吗?是的它可以 !但是,它需要的处理时间和内存空间牺牲准确性之间的妥协。和领、 阿里斯蒂德 Gionis、 彼得 · 迪克 Rajeev Motwani 的论文题为"维护流统计超过滑动窗口"(stanford.io/SRjWT0) 描述了一个称为指数直方图的数据结构。它对与有界的相对误差的最后 N 事件保持近似计数 ε。这意味着,在所有时间:

|exact count – approximate count|  ≤ ε, where 0 < ε < 1 
       exact count

从概念上讲,直方图将事件存储在存储桶。 每桶最初包括一个事件,所以它有一个计数 1、 它涵盖了该事件的时间戳。 当事件到达时,过期水桶 (包括过期的事件) 被删除。 一桶只创建一个有趣的事件。 随着存储桶的创建随着时间的推移,他们正在合并,以节省内存。 水桶合并所以他们有呈指数级增长计数从最新到最后一桶,也就是说,1,1,......,2,2,......,4、 4、......、 8、 8 等。 这种方式,存储桶的数目是对数的窗口大小 N. 更确切地说,它需要 O (1⁄ε 日志 N) 的时间和空间进行维护。 但最后的斗都涵盖只非过期的事件。 最后的斗涵盖至少一个非过期的事件。 必须估计其计数,其中在逼近总体的计数将导致错误。 因此,最后的斗必须保持足够小,能够尊重相对误差上限。

在下一节中,与最低的数学讨论是指数直方图在 C# 中执行。 读取上述文件的错综复杂的详细信息。 我将解释代码,并跟进用笔和纸的例子。 直方图是为用户定义的流运算符在本文稍后部分发达国家 StreamInsight 构建基块。

斗或不斗

这里是斗类:

[DataContract]
public class Bucket
{
  [DataMember]
  private long timestamp;
  [DataMember]
  private long count;
  public long Timestamp {
    get { return timestamp; }
    set { timestamp = value; } }
  public long Count { get { return count; } set { count = value; } }
}

它有一个计数 (有趣) 的事件,它涵盖了、 它涵盖的最新事件的时间戳。 如前所述,只有最后的斗可以涵盖过期的事件,但它必须包括至少一个非过期的事件。 因此,所有但最后斗计数是确切。 最后斗计数必须由直方图估计。 存储桶含只过期的事件自己过期,可以从直方图中删除。

指数直方图使用只是两个操作,确保相对误差上限 ε N 最近发生的事件感兴趣的事件计数。 一次操作是更新新的和过期的事件,保持桶一桶的直方图。 另一个是用于查询来自桶一桶的近似数。 直方图类大纲所示图 1。 除了存储桶的链接列表,其关键的变量是窗口大小 (n)、 相对误差上限 (epsilon) 和缓存所有斗计数 (总数) 的总和。 在构造函数中设置给定的窗口大小、 给定相对误差上限和存储桶的初始为空列表。

图 1 指数直方图类大纲

[DataContract]
public class ExponentialHistogram
{
  [DataMember]
  private long n;
  [DataMember]
  private double epsilon;
  [DataMember]
  private long total;
  [DataMember]
  private LinkedList<Bucket> buckets;
  public ExponentialHistogram(long n, double epsilon)
  {
    this.
n = n;
    this.epsilon = epsilon;
    this.buckets = new LinkedList<Bucket>();
  }
  public void Update(long timestamp, bool e) { ...
}
  protected void ExpireBuckets(long timestamp) { ...
}
  protected void PrependNewBucket(long timestamp) { ...
}
  protected void MergeBuckets() { ...
}
  public long Query() { ...
}
}

此更新方法进行维修的直方图:

public void Update(long timestamp, bool eventPayload)
{
  RemoveExpiredBuckets(timestamp);
  // No new bucket required; done processing
  if (!eventPayload)
    return;
  PrependNewBucket(timestamp);
  MergeBuckets();
}

它接受一个离散的时间戳,而不是墙上时钟时间,以确定有哪些最新的 N 事件。 这用来查找并删除已过期的存储桶。 如果新的事件已为 0 (false) 的有效载荷,将停止处理。 当新的事件具有有效载荷的 1 (true) 时,新桶是创建和预置到存储桶的列表。 在合并桶一桶的真正的烟花。 在序列中讨论由 update 方法调用的方法。

这里是去除水桶的代码:

protected void RemoveExpiredBuckets(long timestamp)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires if its timestamp
  // is before or at the current timestamp - n
  while (node != null && node.Value.Timestamp <= timestamp - n)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}

遍历从最古老的 (最后一个) 斗开始,并在第一次非过期斗结束。 事件的最新时间戳过期的每个存储桶 — — 也就是说,其时间戳是不大于当前时间戳减去窗口大小 — — 从列表中删除。 这是离散的时间戳是哪里来的。 所有斗计数 (总数) 的总和是递减计数每个过期的存储桶。

后过期事件和水桶被占,处理新的事件:

protected void PrependNewBucket(long timestamp)
{
  Bucket newBucket = new Bucket()
  {
    Timestamp = timestamp,
    Count = 1
  };
  buckets.AddFirst(newBucket);
  total++;
}

新斗为 1 (true) 有效载荷与事件创建带有 1 和等于当前时间戳时间戳的计数。 新的斗预置到存储桶的列表并递增的所有斗计数 (总数) 的总和。

内存节省空间和错误边界魔术是并购的水桶中。 中列出的代码图 2。 水桶合并这样连续水桶有呈指数级增长......,计数,也就是说,1,1,2,2,......,4、 4、......、 8、 8 等。 具有相同数量的存储桶数,由选择的相对误差上限 ε。 总的存储桶数对数随着窗口 n,其中解释了节省内存空间的大小。 合并后尽可能多桶,但最后斗数保持在最小不足 (相比其他斗计数的总和),确保相对误差为界。

图 2 合并在直方图中的存储桶

protected void MergeBuckets()
{
  LinkedListNode<Bucket> current = buckets.First;
  LinkedListNode<Bucket> previous = null;
  int k = (int)Math.Ceiling(1 / epsilon);
  int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);
  int numberOfSameCount = 0;
  // Traverse buckets from first to last, hence in order of
  // descending timestamp and ascending count
  while (current != null)
  {
    if (previous != null && previous.Value.Count == current.Value.Count)
      numberOfSameCount++;
    else
      numberOfSameCount = 1;
    // Found k/2+2 buckets of the same count?
if (numberOfSameCount == kDiv2Add2)
    {
      // Merge oldest (current and previous) into current
      current.Value.Timestamp = previous.Value.Timestamp;
      current.Value.Count = previous.Value.Count + current.Value.Count;
      buckets.Remove(previous);
      // A merged bucket can cause a cascade of merges due to
      // its new count, continue iteration from merged bucket
      // otherwise the cascade might go unnoticed
      previous = current.Previous;
    }
    else
    {
      // No merge, continue iteration with next bucket 
      previous = current;
      current = current.Next;
    }
  }
}

更正式的水桶有是非递减计数从第一个 (最新) 到列表中最后的 (最早) 斗。 斗计数被约束为 2 的幂。 让 k = 1⁄ε和 k⁄2 是一个整数,否则,取代后者的。 除了最后斗计数,让至少有 k⁄2 和最 k⁄2 + 1 桶的相同的计数。 每当有 k⁄2 + 2 桶的相同的计数、 最古老的两个被合并到一桶两倍的最古老的斗计数与最近的其时间戳。 合并两个水桶,每当从合并后的斗继续遍历。 合并会导致一连串的合并。 否则会遍历继续从下一个桶里。

若要获取计数逼近的一种感觉,看看直方图的查询方法:

public long Query()
{
  long last = buckets.Last != null ?
buckets.Last.Value.Count : 0;
  return (long)Math.Ceiling(total - last / 2.0);
}

到最后的斗斗数的总和是确切的。 最后的斗必须包含至少一个非过期的事件,否则为斗是过期并删除。 因为它可以包括过期的事件,必须估计其计数。 通过估计的最后一桶半最后斗的计数作为实际计数,绝对错误的估计是不大于该斗的半计数。 总体的计数被估计所有斗计数 (总) 减去一半最后斗的计数的总和。 要确保绝对的错误是相对误差,最后斗影响范围内必须足够小相比其他斗数的总和。 所幸的是,这被确保合并过程。

做的代码清单和这一点的解释离开你疑惑的直方图运作吗? 下面的示例通过读取。

假设您有新初始化的直方图与窗口大小 n = 7 和相对误差上限 ε = 0.5,所以 k = 2。 直方图开发中所示图 3,和中描述了此直方图示意性概述 图 4。 在图 3,合并是在 5、 7 和 9 的时间戳。 级联的合并是 9 的时间戳。 过期的斗是 13 的时间戳。 我将更详细地介绍这去。

图 3 的指数直方图示例

 

A Schematic Overview of the Histogram Depicted in Figure 3
图 4 图 3 所示的直方图图表概述

第一个事件没有任何影响。 在第五个活动,合并最古老斗的发生是因为有文本框中:k⁄2 + 2 桶与 1 相同的计数。 再次,合并发生在第七届的事件。 在第九届的事件中,合并到另一个合并级联。 注意在第七届的事件之后, 的第一个事件到期。 没有斗携带 13 事件直到过期时间戳。 13 事件,在时间戳 6 不再为斗涵盖至少一个非过期事件并因此过期。 请注意观察到相对误差是明显小于相对误差上限。

图 4,虚线的框是窗口大小该点 ; 它包含水桶,意味着所涉及的事件的范围。 实心框是一桶上顶部的时间戳与上底部的计数。 情况 A 在带有箭头的计数的事件的时间戳 7 显示直方图。 情况 B 在 9 的时间戳显示直方图。 最后的斗涵盖过期的事件。 情况 C 在 13 时间戳显示直方图。 在时间戳为 6 桶过期。

后放在一起,我写了指数直方图 (签出这篇文章的来源代码下载) 小示范工程。 结果如图 5 所示。 它模拟 1 亿 100 万的事件计数基于窗口大小 N 的事件流。 每个事件都有一个有效载荷的 0 或 1 与 50%的机会。 它估计的近似数与任意选择的相对误差上限 1s ε 的 1%或 99%的准确性。 内存储蓄的直方图是巨大的相比 windows ; 存储桶的数目是远远低于在窗口中的事件数。 每秒几个 10 万事件事件率是与英特尔 2.4 g h z 双核心处理器和 3 GB 的 RAM 运行 Windows 7 的计算机上实现的。

Empirical Results for the Exponential Histogram
图 5 实证结果指数直方图

称为 StreamInsight 美

也许你想知道微软 StreamInsight 是什么以及它。 本节提供了一些基本的东西。 StreamInsight 是一个强健、 高性能、 低开销、 接近零延迟和极其灵活的引擎,溪流上的处理。 这是目前在 2.1 版。 完整版需要 SQL Server 许可证,但试用版是可用。 它具有运行或者作为一个独立的服务或嵌入过程中。

在流式数据处理的核心是与时空流的事件模型。 从概念上讲,它是潜在的无限和浩繁集合的数据随着时间的推移抵港。 认为股票交易所价格、 天气遥测、 电力监控、 Web 点击,互联网流量,收费亭等。 流中的每个事件都具有元数据和数据的有效负载的标头。 在事件的标题中,保留时间戳,在最低限度。 事件可以到达稳步、 间歇性或也许在每秒数千扫射。 事件有三种类型:一个事件可以限于点时间 ; 其有效期为一定的间隔 ; 或有效期,不限成员名额的时间间隔 (边缘)。 从流的事件,除了特殊标点事件是由引擎调用公共时间增量 (CTI) 发出的。 事件不能插入流中时间戳小于 CTI 的时间戳。 有效,CTI 事件确定事件可以到达顺序的程度。 所幸的是,StreamInsight 照顾这一点。

异类源的输入和输出流的汇必须以某种方式加以修改以适应到此模型。 (可查询) 时空流中的事件是在 IQStreamable <TPayload> 中捕获的。 事件有效载荷从概念上讲枚举由拉或推入流的观察。 因此,基本数据可以通过公开 IEnumerable <T> / IQueryable <T> (无功扩展名) 或 IObservable <T> / IQbservable <T> (无功扩展名),分别与公开的数据类型参数化。 他们留给处理引擎的时态方面的维护。 从和到的各种接口的转换是可能的。

源和汇刚刚讨论过生活的边界,而实际处理发生在查询中。 基本单元组成编写 LINQ 中的查询。 它不断处理事件从一个或多个数据流,并输出另一个流。 查询是用于项目、 筛选器、 组适用、 多播,操作/聚合,联接、 联盟和窗口的事件。 营办商可由用户定义。 当他们到达时,他们工作事件 (增量) 或者基于 windows (非增量)。

关于窗口说明的顺序。 窗口化分区分成有限子集的连续的窗口之间可能重叠的事件流。 窗口化有效地产生的 windows 中,反映在 IQWindowedStreamable <TPayload> 的流 在 StreamInsight。 目前,支持三种不同的窗口化构造:基于计数的、 基于时间和快照窗口。 基于计数的窗口覆盖了最新的 N 事件和幻灯片所到达的一个新的事件,过期最古老和最新插入。 基于时间的 windows 盖除最近发生的事件中最新­val 的时间以及一些间隔状态 (也称为跳跃或翻滚) 幻灯片。 快照窗口由事件的开始时间和结束时间 ; 这,对于每对最近事件的开始和结束时间,创建窗口。 与基于时间的 windows 驱动的间隔沿时间线,无论事件,快照窗口沿时间线不被固定。

这仅仅是表面。 从几个来源,包括联机开发人员指南 》 的详细信息 (bit.ly/T7Trrx),"银河系 StreamInsight 2.1 查询指南"(bit.ly/NbybvY),CodePlex 例子,StreamInsight 团队博客 (blogs.msdn.com/b/streaminsight) 和其他人。

总结

是奠定基础。 在这一点上,你可能不知道如何近似计数提请 StreamInsight 中生活。 总之,需要在时间点事件,携带有效载荷的 0 或 1,一些 (时空) 源流。 它被送入 N 最最近发生的事件使用的指数直方图计算近似的 1s 计数的查询。 查询将产生一些 (时间) 时间点在事件流 — — 携带的近似计数 — — 这送入一个接收器。

让我们从一个用户定义的运算符近似计数。 你可能会忍不住要捕获使用基于计数的窗口化构造 N 最新的事件。 再想一想 ! 这会违抗指数直方图的节省内存的好处。 为什么呢? 构建部队整个 windows 要保存在内存中的事件。 它不需要由指数直方图,因为它具有等效的隐式概念,通过存储桶的维护窗口化。 此外,有一个运算符对 windows 是非增量的就是,与没有处理的事件到达时,但只有当 (下) 窗口时可用。 解决方案是一个用户定义的流运算符没有显式窗口化构造查询。 中列出的代码图 6

图 6 用户定义流运算符执行

[DataContract]
public class ApproximateCountUDSO : CepPointStreamOperator<bool, long>
{
  [DataMember]
  private ExponentialHistogram histogram;
  [DataMember]
  private long currentTimestamp;  // Current (discrete) timestamp
  public ApproximateCountUDSO(long n, double epsilon)
  {
    histogram = new ExponentialHistogram(n, epsilon);
  }
  public override IEnumerable<long> ProcessEvent(
    PointEvent<bool> inputEvent)
  {
    currentTimestamp++;
    histogram.Update(currentTimestamp, inputEvent.Payload);
    yield return histogram.Query();
  }
  public override bool IsEmpty
  {
    get { return false; }
  }
}

操作员从抽象的 CepPointStreamOperator < TInputPayload、 TOutputPayload > 派生。 它具有指数直方图的实例变量。 请注意 DataContract 和 DataMember 属性与装饰。 这会通知 StreamInsight 如何序列化该运算符 — — 例如,对于恢复能力的目的。 运算符重写空空如也运算符,以指示它为非空,该运算符是有状态的即。 这可以防止 StreamInsight 玩弄运算符时尽量减少内存使用率。 ProcessEvent 方法是运营商的核心。 它递增 (离散) 当前时间戳,并将其与事件负载中传递给直方图的 update 方法。 直方图处理了瓢泼大雨和近似计数查询。 请确保使用产量返回语法,这使得该运算符可枚举。 营办商一般包装一些隐藏在一个实用程序类中的扩展方法中。 此代码演示如何做:

public static partial class Utility
{
  public static IQStreamable<long> ApproximateCount(
    this IQStreamable<bool> source, long n, double epsilon)
  {
    return source.Scan(() => new ApproximateCountUDSO(n, epsilon));
  }
}

就是这样了! 该运算符插入查询通过扩展方法。 需要一点额外的代码实际上证明其使用。 这里是一个微不足道的源流:

public static partial class Utility
{
  private static Random random = new Random((int)DateTime.Now.Ticks);
  public static IEnumerable<bool> EnumeratePayloads()
  {
    while (true)  // ad infinitum
    {
      bool payload = random.NextDouble() >= 0.5;
      yield return payload;
    }
  }
}

这将生成随机的 0 和 1 的有效载荷。 产量返回语法把它变成一个可枚举的来源。 把它放在实用程序类中,如果你愿意的话。

臭名昭著的程序类所示图 7。 它创建的过程中嵌入的 StreamInsight 服务器实例。 创建名为 ApproximateCountDemo 的所谓应用程序实例的如流式处理 (元数据) 的容器,例如,对于命名的流,查询等。 使用前面所述的有效负载生成实用程序方法,定义可枚举的时间点在事件源。 它变成一个时空的时间点在事件流。 查询使用 LINQ 定义并选择通过源流计算运算符近似计数。 这是迟早要用到扩展方法的用户定义的运算符的位置。 它被引导与窗口的大小和相对误差上限。 下一步,查询输出转换成一个可枚举的接收器,剥离的时空属性。 最后,接收器迭代结束,从而积极地拉动通过管道事件。 执行该程序,并享有其在屏幕上的数字计算输出。

图 7 嵌入和在 StreamInsight 中执行

class Program
{
  public const long N = 10000;
  public const double Epsilon = 0.05;
  static void Main(string[] args)
  {
    using (Server server = Server.Create("StreamInsight21"))
    {
      var app = server.CreateApplication("ApproximateCountDemo");
      // Define an enumerable source
      var source = app.DefineEnumerable(() =>
        Utility.EnumeratePayloads());
      // Wrap the source in a (temporal) point-in-time event stream
      // The time settings determine when CTI events
      // are generated by StreamInsight
      var sourceStream = source.ToPointStreamable(e =>
        PointEvent.CreateInsert(DateTime.Now, e),
        AdvanceTimeSettings.IncreasingStartTime);
      // Produces a stream of approximate counts
      // over the latest N events with relative error bound Epsilon
      var query =
        from e in sourceStream.ApproximateCount(N, Epsilon) select e;
      // Unwrap the query's (temporal) point-in-time
      // stream to an enumerable sink
      var sink = query.ToEnumerable<long>();
      foreach (long estimatedCount in sink)
      {
        Console.WriteLine(string.Format(
          "Enumerated Approximate count: {0}", estimatedCount));
      }
    }
  }
}

简单回顾一下,这篇文章解释了如何高层楼宇的错误使用指数直方图数据结构与计数的事件窗口上方近似对数时间和空间。直方图被嵌入在 StreamInsight 用户定义的运算符。

直方图和运算符可以进行扩展以支持可变大小的窗口,例如基于时间的 windows。这就需要知道窗口间隔/时间跨度,而不是窗口大小的直方图。存储桶被过期之前新事件时间戳减去窗口 timespan 其时间戳时。扩展来计算方差在文件中,提出"维护方差与 k–Medians 在流数据窗口"的 Liadan O'Callaghan Rajeev 瓦尼和领、 布赖恩 · 巴布科克 (stanford.io/UEUG0i)。直方图,除了文学中介绍其他所谓简介结构。你能想到的随机样本、 重量级、 分位数,等等。

本文附带的源代码编写 C# 4.0 与 Visual Studio 2010 中,需要 StreamInsight 2.1。该代码是免费的微软公共许可证 (Ms PL) 下使用。请注意它发展为教育目的是既不优化也生产环境的测试。

Michael Meijer 是作为 CIMSOLUTIONS BV,其中他提供 IT 咨询服务和向整个荷兰公司软件开发解决方案的软件工程师。期间他温特大学,荷兰恩斯赫德获科学硕士学位计算机 Science–Information 工程系统的研究开始他 StreamInsight 和流数据处理中的利益。

衷心感谢以下技术专家对本文的审阅:埃里克 · 赫格曼、 罗马 Schindlauer 和 Bas 斯泰默丁克