Il presente articolo è stato tradotto automaticamente.

StreamInsight

Flusso di eventi sotto controllo: conteggio di approssimazione rapida

Michael Meijer

Scarica il codice di esempio

 

In modo da avere un flusso voluminoso e potenzialmente infinito di eventi quali clickstream, dati dei sensori, dei dati delle transazioni con carta di credito o traffico Internet.È fattibile per memorizzare tutti gli eventi o analizzarli in più passaggi.Perché non ricorrere a una finestra degli eventi recenti per semplificare l'analisi?

Si supponga di che voler contare il numero di eventi interessanti in una grande finestra che copre gli ultimi eventi N del flusso.Un approccio naïve al conteggio richiede tutti gli eventi di N essere in memoria e un'iterazione completa su di loro.Come la finestra scorre con l'arrivo di un nuovo evento, scade il relativo evento più antico e il nuovo evento è inserito.Conteggio sopra la finestra di nuova da zero sprechi il tempo di elaborazione dedicato alla N-2 eventi condivisi.Bleah!Questo articolo spiega una struttura di dati per ridurre l'utilizzo dello spazio di memoria e tempo di elaborazione per una piccola frazione di quello che sarebbe necessario con tale metodo, pur sostenendo un tasso di eventi superiore a molte migliaia di eventi al secondo su hardware commodity.Questo articolo inoltre illustrato come incorporare la struttura dati di un operatore del flusso definito dall'utente in C# per il flusso dati processore Microsoft, StreamInsight 2.1.Competenze di programmazione intermedie sono tenute a seguire, e qualche esperienza con StreamInsight può venire pratico.

Un racconto di conteggio

Prima di tuffarsi nel StreamInsight, potrai esaminare il problema apparentemente banale di conteggio.Per semplicità, si supponga che il flusso sia eventi con payload di 0 o 1 — eventi interessanti e poco interessanti, rispettivamente (indipendentemente da ciò che costituisce "interessante" nel tuo scenario specifico).Sopra una finestra (dimensione fissa) basate sul conteggio contenente gli eventi più recenti di N è contato il numero di 1s.Naïve contando prende spazio e tempo o (n).

Come un lettore astuto, si probabilmente avvicinò con l'idea di mantenere il conteggio tra windows consecutivi e incrementandola per nuova 1s e decremento esso per 1s scaduta, condividendo il N-2 eventi già elaborati.Pensare bene!Mantenere il conteggio ora richiede tempo o (1).Tuttavia, dovrebbe diminuire per un evento scaduto o non?A meno che non si conosce l'evento reale, non può essere mantenuto il conteggio.Purtroppo, per conoscere gli eventi fino a quando essi sono scaduti richiede l'intera finestra in memoria — cioè, prende spazio o (n).Un'altra strategia potrebbe essere per filtrare gli eventi poco interessanti e contano solo i restanti interessanti eventi.Ma che non riduce la complessità computazionale e ti lascia con una finestra di dimensioni variabili.

La bestia di memoria può essere domata?Sì, si può!Tuttavia, richiede un compromesso tra tempo e memoria spaziale a scapito della precisione di lavorazione.La carta seminale da Mayur Datar, Aristides Gionis, Piotr Indyk e Rajeev Motwani intitolato "Mantenendo Stream Statistiche sopra Sliding Windows" (stanford.io/SRjWT0) descrive una struttura di dati chiamata istogramma esponenziale.Mantiene un conteggio approssimativo sopra gli ultimi eventi di N con un errore relativo limitato ε.Questo significa che a tutti i tempi:

|exact count – approximate count|  ≤ ε, where 0 < ε < 1 
       exact count

Concettualmente, l'istogramma memorizza gli eventi nei secchi. Ogni secchio copre inizialmente un evento, quindi ha un numero di 1 e un timestamp dell'evento che copre. Quando arriva un evento, scaduto secchi (che copre gli eventi scaduti) vengono rimossi. Un secchio è creato solo per un evento interessante. Secchi vengono creati nel corso del tempo, essi si fuse per risparmiare memoria. Secchi sono fuse così hanno esponenzialmente crescente conta dal più recente l'ultimo secchio, cioè, 1, 1,..., 2, 2,..., 4, 4,..., 8, 8 e così via. In questo modo, il numero di bucket è logaritmico nella dimensione della finestra N. Più precisamente, essa richiede O (1⁄ε log N) tempo e spazio per la manutenzione. Tutti, ma l'ultimo secchio coprire solo gli eventi non scaduto. L'ultimo secchio copre almeno un evento non scaduto. Il conteggio dovrà essere stimato, che provoca l'errore nell'approssimazione il conteggio complessivo. Quindi, l'ultimo secchio deve essere mantenuto abbastanza piccolo per rispettare il limite superiore di errore relativo.

Nella sezione successiva, l'implementazione dell'istogramma esponenziale in c# è discusso con un minimo di matematica. Leggere il suddetto libro per i dettagli intricati. Farò spiegare il codice e follow-up con un esempio di carta e penna. L'istogramma è un blocco di costruzione per l'operatore del flusso definito dall'utente sviluppato più avanti in questo articolo di StreamInsight.

Benna o non secchio

Qui è la classe di benna:

[DataContract]
public class Bucket
{
  [DataMember]
  private long timestamp;
  [DataMember]
  private long count;
  public long Timestamp {
    get { return timestamp; }
    set { timestamp = value; } }
  public long Count { get { return count; } set { count = value; } }
}

Ha un conteggio degli eventi (interessanti) copre e un timestamp dell'evento più recente che copre. Solo l'ultimo secchio può coprire eventi scaduti, come accennato, ma esso deve coprire almeno un evento non scaduto. Da qui, tutti, ma l'ultimi secchio conteggi sono esatti. L'ultimo conteggio secchio deve essere stimata mediante l'istogramma. Secchi contenenti solo gli eventi scaduti sono essi stessi scaduti e possono essere rimosso dall'istogramma.

Utilizzando solo due operazioni, l'istogramma esponenziale garantisce un limite superiore errore relativo ε sul conteggio di eventi interessanti sopra gli eventi più recenti N. Una sola operazione è per aggiornare l'istogramma con eventi nuovi e scaduti, mantenendo i secchi. L'altro è per l'interrogazione il conteggio approssimativo da secchi. La struttura di classe istogramma è mostrata Figura 1. Accanto alla lista collegata di secchi, le variabili fondamentali sono la dimensione della finestra (n), il limite superiore di errore relativo (epsilon) e la somma nella cache di tutti i conteggi di secchio (totale). Nel costruttore, vengono impostate le dimensioni della finestra specificato, il limite superiore dato errore relativo e un elenco vuoto iniziale dei secchi.

Figura 1 l'esponenziale istogramma classe Outline

[DataContract]
public class ExponentialHistogram
{
  [DataMember]
  private long n;
  [DataMember]
  private double epsilon;
  [DataMember]
  private long total;
  [DataMember]
  private LinkedList<Bucket> buckets;
  public ExponentialHistogram(long n, double epsilon)
  {
    this.
n = n;
    this.epsilon = epsilon;
    this.buckets = new LinkedList<Bucket>();
  }
  public void Update(long timestamp, bool e) { ...
}
  protected void ExpireBuckets(long timestamp) { ...
}
  protected void PrependNewBucket(long timestamp) { ...
}
  protected void MergeBuckets() { ...
}
  public long Query() { ...
}
}

La manutenzione dell'istogramma è eseguita da questo metodo di aggiornamento:

public void Update(long timestamp, bool eventPayload)
{
  RemoveExpiredBuckets(timestamp);
  // No new bucket required; done processing
  if (!eventPayload)
    return;
  PrependNewBucket(timestamp);
  MergeBuckets();
}

Si accetta un timestamp discreto, a differenza di tempo orologio da parete, per determinare quali sono gli ultimi eventi N. Questo è usato per trovare e rimuovere secchi scaduti. Se il nuovo evento ha un carico utile pari a 0 (falso), smette di elaborazione. Quando il nuovo evento ha una portata di 1 (vero), un secchio nuovo è creato e anteposta all'elenco dei secchi. I reali fuochi d'artificio sono in fusione i secchi. I metodi chiamati dal metodo update vengono discussi in sequenza.

Ecco il codice per la rimozione dei secchi:

protected void RemoveExpiredBuckets(long timestamp)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires if its timestamp
  // is before or at the current timestamp - n
  while (node != null && node.Value.Timestamp <= timestamp - n)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}

L'attraversamento inizia dal più vecchio secchio (ultimo) e termina il primo secchio non scaduto. Ogni secchio timestamp di cui più recente evento è scaduto — cui timestamp è non maggiore di timestamp corrente meno la dimensione di finestra — viene rimosso dall'elenco. Questo è dove entra il timestamp discreto. La somma di tutti i conteggi di secchio (totale) è diminuita dal conte di ogni bucket scaduta.

Scaduto dopo eventi e secchi sono rappresentava per il nuovo evento viene elaborato:

protected void PrependNewBucket(long timestamp)
{
  Bucket newBucket = new Bucket()
  {
    Timestamp = timestamp,
    Count = 1
  };
  buckets.AddFirst(newBucket);
  total++;
}

Un nuovo secchio per l'evento con un carico utile di 1 (true) viene creato con un conteggio di 1 e un timestamp uguale al timestamp corrente. La nuova benna viene anteposta alla lista dei secchi e la somma di tutti i conteggi di secchio (totale) viene incrementata.

La magia di risparmiare spazio ed errore di delimitazione di memoria è la fusione di secchi. Il codice è elencato Figura 2. Secchi vengono unite in modo che secchi consecutivi hanno esponenzialmente crescente..., conti, cioè, 1, 1, 2, 2,..., 4, 4,..., 8, 8 e così via. Il numero di bucket con lo stesso numero è determinato dalla scelta dell'errore relativo limite superiore ε. Il numero totale dei secchi cresce logaritmicamente con la dimensione della finestra n, che spiega il risparmio di spazio di memoria. Come molti secchi possibili sono fuse, ma Conte di secchio ultimo sono mantenuta piccola abbastanza (rispetto alla somma di altri conti secchio) per garantire l'errore relativo è limitato.

Figura 2 fusione secchi in istogramma

protected void MergeBuckets()
{
  LinkedListNode<Bucket> current = buckets.First;
  LinkedListNode<Bucket> previous = null;
  int k = (int)Math.Ceiling(1 / epsilon);
  int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);
  int numberOfSameCount = 0;
  // Traverse buckets from first to last, hence in order of
  // descending timestamp and ascending count
  while (current != null)
  {
    if (previous != null && previous.Value.Count == current.Value.Count)
      numberOfSameCount++;
    else
      numberOfSameCount = 1;
    // Found k/2+2 buckets of the same count?
if (numberOfSameCount == kDiv2Add2)
    {
      // Merge oldest (current and previous) into current
      current.Value.Timestamp = previous.Value.Timestamp;
      current.Value.Count = previous.Value.Count + current.Value.Count;
      buckets.Remove(previous);
      // A merged bucket can cause a cascade of merges due to
      // its new count, continue iteration from merged bucket
      // otherwise the cascade might go unnoticed
      previous = current.Previous;
    }
    else
    {
      // No merge, continue iteration with next bucket 
      previous = current;
      current = current.Next;
    }
  }
}

Più formalmente, secchi hanno conti non decrescente dal primo (più recente) nel secchio (più vecchi) ultimo nell'elenco. I conteggi di benna sono vincolati alle potenze di due. Let k = 1⁄εe k⁄2 essere un valore integer, altrimenti sostituire quest'ultimo da. Fatta eccezione per contare l'ultimo secchio, let there be almeno k⁄2 e al massimo k⁄2 + 1 secchi del conteggio stesso. Ogni volta che ci sono k⁄2 + 2 secchi del conte stesso, i due più antichi sono fuse in un secchio con due volte il conteggio del secchio più antico e il più recente dei loro timestamp. Ogni volta che vengono unite due secchi, attraversamento continua dal secchio fuse. L'Unione può causare una cascata di unioni. In caso contrario l'attraversamento continua dal secchio successivo.

Per ottenere una sensazione per l'approssimazione del conte, guardare al metodo query dell'istogramma:

public long Query()
{
  long last = buckets.Last != null ?
buckets.Last.Value.Count : 0;
  return (long)Math.Ceiling(total - last / 2.0);
}

La somma dei conti fino all'ultimo secchio secchio è esatta. L'ultimo secchio deve coprire almeno un evento non scaduta, altrimenti il secchio è scaduto e rimosso. Il conteggio dovrà essere stimato perché può coprire eventi scaduti. Stimando il conteggio effettivo l'ultimo secchio come conte di mezzo l'ultimo secchio, l'errore assoluto di tale stima è non più grande di conteggio di mezzo quel secchio. Il conteggio complessivo è stimato dalla somma di tutti i conteggi secchio (totale) meno conte di mezzo l'ultimo secchio. Per garantire che l'errore assoluto è entro i limiti dell'errore relativo, influenza l'ultimo secchio deve essere abbastanza piccolo rispetto alla somma di altri conti secchio. Per fortuna, questa è assicurata dalla procedura di stampa unione.

I listati di codice e spiegazioni fino a questo punto si lascia perplesso circa il funzionamento dell'istogramma? Leggere attraverso il seguente esempio.

Si supponga di avere un istogramma appena inizializzato con finestra dimensione n = 7 e relativo errore limite superiore ε = 0,5, così k = 2. L'istogramma si sviluppa come mostrato Figura 3, e una descrizione schematica di questo istogramma è raffigurata Figura 4. In Figura 3, unioni sono al timestamp, 5, 7 e 9. È un'Unione a cascata al timestamp 9. Un secchio scaduto è al timestamp 13. Andrò più in dettaglio su questo.

Figura 3 esempio di istogramma esponenziale

 

A Schematic Overview of the Histogram Depicted in Figure 3
Figura 4 descrizione schematica dell'istogramma raffigurato in figura 3

Il primo evento non ha alcun effetto. All'evento di quinto, un'Unione dei secchi più antichi si verifica perché esistono casella di testo: k⁄2 + 2 secchi con lo stesso numero di 1. Ancora una volta, un'Unione succede al settimo evento. All'evento di nono, un merge cascate in un'altra Unione. Si noti che dopo il settimo evento, il primo evento scade. Nessun secchio trasporta un timestamp scaduto fino a quando l'evento 13. All'evento 13, il secchio con timestamp 6 non più copre almeno un evento non scaduta e quindi scade. Si noti che l'errore relativo osservato è chiaramente meno limite superiore errore relativo.

In Figura 4, una casella tratteggiata è la dimensione della finestra in quel punto; esso contiene i secchi e implica la durata degli eventi coperti. Un parallelepipedo solido è un secchio con timestamp in cima e conteggio sul fondo. Situazione A Mostra l'istogramma al timestamp 7 con frecce agli eventi conteggiati. Situazione B Mostra l'istogramma al timestamp 9. L'ultimo secchio copre eventi scaduti. Situazione C mostra l'istogramma al timestamp 13. Il secchio con timestamp 6 scaduto.

Dopo aver messo tutto insieme, ho scritto un programma di piccola dimostrazione per l'istogramma esponenziale (controllare il download del codice sorgente per questo articolo). I risultati sono mostrati nella Figura 5. Simula un flusso di eventi 100 milioni con una dimensione di finestra basata sul conteggio N di 1 milione di eventi. Ogni evento ha una portata di 0 o 1 con 50% di probabilità. Stima che il conteggio approssimativo di 1s con un limite superiore errore relativo arbitrariamente scelto ε dell'1 per cento, o l'accuratezza del 99 per cento. Il risparmio di memoria dell'istogramma è enorme rispetto a windows; il numero di bucket è molto meno rispetto al numero di eventi nella finestra. Un tasso di eventi di poche centinaia di migliaia di eventi al secondo è realizzato su una macchina con un processore dual-core Intel 2.4 GHz e 3 GB di RAM con Windows 7.

Empirical Results for the Exponential Histogram
Figura 5 risultati empirici per l'istogramma esponenziale

Una bellezza chiamata StreamInsight

Forse ti stai chiedendo che cosa è Microsoft StreamInsight e dove si inserisce nella. Questa sezione fornisce alcune nozioni di base. StreamInsight è un motore robusto, ad alte prestazioni, basso overhead, vicino a latenza zero ed estremamente flessibile per l'elaborazione su flussi. È attualmente alla versione 2.1. La versione completa richiede una licenza SQL Server , anche se è disponibile una versione trial. Ha eseguito come un servizio autonomo o integrato nel processo.

Al centro di elaborazione dati streaming è un modello con flussi temporali degli eventi. Concettualmente, è un insieme potenzialmente infinito e voluminoso di dati che arrivano nel tempo. Pensare dei prezzi di borsa, telemetria meteo, monitoraggio, potenza Click Web, traffico Internet, caselli e così via. Ogni evento nel flusso ha un'intestazione con un carico utile di dati e metadati. Nell'intestazione dell'evento, un timestamp è mantenuto al minimo. Gli eventi possono arrivare costantemente, in modo intermittente o forse in scoppi di fino a molte migliaia al secondo. Gli eventi vengono in tre sapori: Un evento può essere confinato a un punto nel tempo; essere valido per un determinato intervallo; o essere valido per un intervallo aperto (bordo). Oltre agli eventi dal flusso, un evento speciale punteggiatura viene emesso dal motore chiamato l'incremento di tempo comune (CTI). Gli eventi non possono essere inseriti nel flusso con un timestamp meno di timestamp del CTI. Eventi CTI determinano effettivamente, nella misura in cui gli eventi possono arrivare di ordine. Per fortuna, StreamInsight si prende cura di questo.

Fonti eterogenee di input ed i dispersori di flussi di output devono in qualche modo essere adattate per adattarsi a questo modello. Gli eventi nei ruscelli temporali (queryable) sono catturati in un IQStreamable <TPayload>. Payload di eventi sono concettualmente tirato dall'enumerazione o spinto da osservazione nel flusso. Quindi, dati sottostanti possono essere esposti attraverso un IEnumerable <T> / IQueryable <T> (Estensione reattiva) o IObservable <T> / IQbservable <T> (Estensione reattiva), rispettivamente, con parametri con tipo di dati esposti. Lasciano il mantenimento degli aspetti temporali al motore di elaborazione. Conversione da e per le varie interfacce è possibile.

Le fonti e pozzi appena discussi dal vivo sui confini, mentre l'effettiva elaborazione avviene all'interno delle query. Una query è un'unità di base della composizione scritta in LINQ. Continuamente elabora eventi da uno o più flussi e uscite un altro flusso. Le query sono usati per progetto, filtrare, gruppo-applicare, multicast, operare/aggregazione, eventi join, Unione e finestra. Gli operatori possono essere definiti dall'utente. Lavorano su eventi (incrementali) o su windows (non-incrementale) come arrivano.

Una nota sulla gestione delle finestre è in ordine. Windowing partiziona un flusso in sottoinsiemi finiti di eventi che potrebbero sovrapporsi tra windows consecutivi. Windowing efficacemente produce un flusso di windows, riflettuto da un IQWindowedStreamable <TPayload> in StreamInsight. Attualmente sono supportati tre tipi diversi di windowing costrutti: windows basate sul conteggio, basati su tempo e snapshot. Windows basate sul conteggio coprire la più recenti N eventi e scivolo con l'arrivo di un nuovo evento, che scade il più vecchio e inserendo il più recente. Copertura windows basati sul tempo gli eventi più recenti nel più recente inter­val del tempo e della diapositiva da qualche intervallo (anche chiamato hopping o tumbling). Windows snapshot sono guidati da inizio evento e di fine; che è, per ogni coppia di eventi più vicino inizio e fine dei tempi, viene creata una finestra. A differenza di windows basate su tempo guidato da intervalli lungo la linea temporale, indipendentemente da eventi, windows snapshot non sono fissati lungo la linea temporale.

Che solo graffi la superficie. Ulteriori informazioni sono disponibili da diverse fonti, tra cui la linea guida per lo sviluppatore (bit.ly/T7Trrx), "Una di guida galattica per autostoppisti StreamInsight 2.1 query" (bit.ly/NbybvY), esempi di CodePlex, il blog del team di StreamInsight (blogs.msdn.com/b/streaminsight) e altri.

Riassumendo

Sono le fondamenta. A questo punto, siete probabilmente chiedendo come approssimativo conteggio è portato a vita in StreamInsight. In breve, alcuni flusso di origine (temporale) di eventi point-in-time, trasportare un carico utile pari a 0 o 1, è richiesto. Esso è alimentato in una query che calcola il conteggio approssimativo di 1s sopra gli eventi più recenti di N usando l'istogramma esponenziale. La query produce alcuni stream (temporale) di point-in-time eventi — portando il conteggio approssimativo — che è alimentato in un lavello.

Cominciamo con un operatore definito dall'utente per il conteggio approssimativo. Si potrebbe essere tentati di catturare gli eventi più recenti N utilizzando il costrutto di windowing basate sul conteggio. Ripensateci! Che vuoi sfidare i benefici del risparmio di memoria dell'istogramma esponenziale. Perché? Il costrutto di forze intero windows di eventi per essere mantenuto in memoria. Non è richiesta dall'istogramma esponenziale, perché ha un equivalente concetto implicito di windowing attraverso il mantenimento dei secchi. Inoltre, avendo un operatore su windows è non-incrementale, che è, con nessuna elaborazione di eventi come arrivano, ma solo quando è disponibile una finestra (successiva). La soluzione è un operatore del flusso definito dall'utente senza esplicita windowing costruisce su query. Il codice è elencato Figura 6.

Figura 6 implementazione dell'operatore definito dall'utente Stream

[DataContract]
public class ApproximateCountUDSO : CepPointStreamOperator<bool, long>
{
  [DataMember]
  private ExponentialHistogram histogram;
  [DataMember]
  private long currentTimestamp;  // Current (discrete) timestamp
  public ApproximateCountUDSO(long n, double epsilon)
  {
    histogram = new ExponentialHistogram(n, epsilon);
  }
  public override IEnumerable<long> ProcessEvent(
    PointEvent<bool> inputEvent)
  {
    currentTimestamp++;
    histogram.Update(currentTimestamp, inputEvent.Payload);
    yield return histogram.Query();
  }
  public override bool IsEmpty
  {
    get { return false; }
  }
}

L'operatore deriva dalla CepPointStreamOperator astratta < TInputPayload, TOutputPayload >. Ha una variabile di istanza istogramma esponenziale. Notare la decorazione con gli attributi DataContract e DataMember. Questo informa StreamInsight come serializzare l'operatore — per esempio, per scopi di resilienza. L'operatore esegue l'override dell'operatore IsEmpty per indicare non è vuoto, l'operatore è stateful. Questo impedisce StreamInsight pasticciano con l'operatore quando si minimizza l'utilizzo della memoria. Il metodo ProcessEvent è il nucleo dell'operatore. Incrementa il timestamp corrente (discreto) e passa insieme con il payload di evento al metodo update dell'istogramma. L'istogramma gestisce il bucket e viene eseguita una query per il conteggio approssimativo. Assicurarsi di utilizzare la sintassi di yield return, che rende l'operatore enumerabile. Gli operatori sono generalmente avvolto in qualche metodo di estensione nascosto in una classe di utilità. Questo codice mostra come è fatto:

public static partial class Utility
{
  public static IQStreamable<long> ApproximateCount(
    this IQStreamable<bool> source, long n, double epsilon)
  {
    return source.Scan(() => new ApproximateCountUDSO(n, epsilon));
  }
}

Questo è tutto! Inserire l'operatore in una query tramite il metodo di estensione. Un po ' di codice supplementare è necessaria per dimostrare effettivamente il suo utilizzo. Qui è un flusso di origine banale:

public static partial class Utility
{
  private static Random random = new Random((int)DateTime.Now.Ticks);
  public static IEnumerable<bool> EnumeratePayloads()
  {
    while (true)  // ad infinitum
    {
      bool payload = random.NextDouble() >= 0.5;
      yield return payload;
    }
  }
}

Questo genera casuali payload di 0s e 1s. La sintassi di yield return si trasforma in una fonte enumerabile. Mettere in una classe di utilità, se si vuole.

La classe infame programma è mostrata Figura 7. Crea l'istanza del server StreamInsight incorporato nel processo. Viene creata un'istanza di cosiddetta applicazione denominata ApproximateCountDemo come un streaming elaborazione (metadati) contenitore, ad esempio, per i flussi denominati, query e così via. È definita un'origine enumerabile di eventi point-in-time, utilizzando il metodo di utilità generatrici di payload descritto in precedenza. Si trasforma in un flusso temporale di eventi point-in-time. La query viene definita con LINQ e seleziona i conteggi approssimativi operatore calcolati sopra il flusso di origine. Questo è dove il metodo di estensione per l'operatore definito dall'utente viene pratico. Esso è bootstrap con una dimensione della finestra e l'errore relativo limite superiore. Successivamente, l'output della query viene trasformato in un lavello enumerabile, stripping le proprietà temporali. Infine, il lavello è iterazione, tirando quindi attivamente gli eventi attraverso la pipeline. Eseguire il programma e godetevi la sua uscita numerica sullo schermo.

Figura 7 l'incorporamento e l'esecuzione in StreamInsight

class Program
{
  public const long N = 10000;
  public const double Epsilon = 0.05;
  static void Main(string[] args)
  {
    using (Server server = Server.Create("StreamInsight21"))
    {
      var app = server.CreateApplication("ApproximateCountDemo");
      // Define an enumerable source
      var source = app.DefineEnumerable(() =>
        Utility.EnumeratePayloads());
      // Wrap the source in a (temporal) point-in-time event stream
      // The time settings determine when CTI events
      // are generated by StreamInsight
      var sourceStream = source.ToPointStreamable(e =>
        PointEvent.CreateInsert(DateTime.Now, e),
        AdvanceTimeSettings.IncreasingStartTime);
      // Produces a stream of approximate counts
      // over the latest N events with relative error bound Epsilon
      var query =
        from e in sourceStream.ApproximateCount(N, Epsilon) select e;
      // Unwrap the query's (temporal) point-in-time
      // stream to an enumerable sink
      var sink = query.ToEnumerable<long>();
      foreach (long estimatedCount in sink)
      {
        Console.WriteLine(string.Format(
          "Enumerated Approximate count: {0}", estimatedCount));
      }
    }
  }
}

Ricapitolando brevemente, questo articolo spiega come approssimare il conteggio sopra una finestra degli eventi in spazio e tempo logaritmico con errore superiore delimitata utilizzando una struttura di dati di istogramma esponenziale. L'istogramma è incorporato in un operatore definito dall'utente di StreamInsight.

L'istogramma e l'operatore può essere esteso per supportare dimensioni variabili windows, quali windows basati sul tempo. Ciò richiede l'istogramma per sapere la finestra intervallo/intervallo piuttosto che la dimensione della finestra. Secchi sono scaduti quando loro timestamp è prima timestamp del nuovo evento meno la finestra timespan. Un'estensione per calcolare la varianza è proposto nel libro, "Mantenimento di varianza e k–Medians sopra il flusso di dati Windows," da Brian Babcock, Mayur Datar, Rajeev Motwani e Liadan O'Callaghan (stanford.io/UEUG0i). Oltre a istogrammi, altre strutture cosiddette sinossi sono descritti in letteratura. Si può pensare di campioni casuali, heavy hitters, quantili e così via.

Il codice sorgente che accompagna questo articolo è scritto in C# 4.0 con Visual Studio 2010 e richiede StreamInsight 2.1. Il codice è gratuito per uso sotto Microsoft Public License (Ms-PL). Si noti che non è stato sviluppato per scopi educativi ed è stato ottimizzato né testato per ambienti di produzione.

Michael Meijer è come software engineer presso CIMSOLUTIONS BV, dove egli fornisce servizi di consulenza e soluzioni di sviluppo software per aziende in tutta l'Olanda. I suoi interessi nell'elaborazione dei dati di flusso e di StreamInsight iniziato durante le sue ricerche presso l'Università di Twente, Enschede, Paesi Bassi, dove ha ricevuto un Master of Science degree in Computer Science–Information Systems Engineering.

Grazie ai seguenti esperti tecnici per la revisione di questo articolo: Erik Hegeman, Roman Schindlauer e Bas Stemerdink