Dieser Artikel wurde maschinell übersetzt.

StreamInsight

Verarbeiten umfangreicher Datenströme mit Microsoft StreamInsight

Rob Pierry

Die Entdeckung, dass die Produktionslinie gesunken, Benutzer Medien-Streams überspringen sind oder eines Ihrer Produkte geworden ist ein "muss" ist einfach, sobald dies bereits geschehen ist.Der eigentliche Trick ist diese Szenarien identifizieren, wie sie geschehen oder sogar Vorhersagen basierend auf vergangene Trends.

Erfolgreich Vorhersagen Szenarien wie diese erfordert einen near-Real-Time-Ansatz.Bis relevante Daten extrahiert, transformiert und in einem traditionellen Business Intelligence (BI)-Lösung wie SQL Server Analysis Services (SSAS) geladen, hat die Situation längst verändert.In ähnlicher Weise tätig jedes System, das stützt sich auf ein Anforderung-Antwort-Muster anfordern aktualisierte Daten aus einem transaktionale Datenspeicher (z. B. eine SQL Server Reporting Services oder SSRS, Bericht) ist immer auf veraltete Daten am Ende dessen Anfrage-Abfrageintervall.Das Abrufintervall ist in der Regel festgelegt, so dass selbst wenn ein Platzen der interessante Aktivität auftritt, das Konsumierende System Wissen wird nicht, bis das nächste Intervall um kommt.Stattdessen sollte das Konsumierende System kontinuierlich Moment benachrichtigt werden interessante Kriterien erfüllt sind.

Wenn neue Tendenzen erkennen, sind Zeitabständen Schlüssel — 100 Einkäufe eines bestimmten Elements in den letzten fünf Minuten ist eindeutig ein größer Indikator für eine Entwicklung als ein stetiges Rinnsal in den letzten fünf Monaten.Traditionelle Systeme wie SSAS und SSRS den Entwickler zu verfolgen die Aktualität der Daten auf ihren selbst, durch gesonderte Dimension in einen Cube oder eine Zeitstempel Spalte in einem transaktionalen Speicher benötigen.Im Idealfall würde Tools zur Identifizierung von aufstrebenden Szenarien haben das Konzept der Zeit integrierte und bieten eine reiche API für das Arbeiten mit ihm.

Schließlich kommt ein guter Indikator für die Zukunft von Analyse der Vergangenheit.In der Tat, das ist was traditionellen BI ist — Aggregation und Analyse große Mengen von historischen Daten, Trends zu erkennen.Leider sind verschiedene Tools und Abfragesprachen beteiligt, wenn diese Systeme im Vergleich zu mehr Transaktionssysteme.Erfolgreich neue Szenarien identifizieren erfordert seamless Wechselbeziehung von Vergangenheit und Gegenwart Daten.Diese Art von enge Integration ist nur möglich, wenn die gleichen Tools und Abfragesprache für beide verwendet werden.

Für bestimmte Szenarien wie z. B. Produktionslinie Überwachung, hochspezifische benutzerdefinierte Tools zum Durchführen dieser Funktionen vorliegen, doch diese Tools sind oft vergleichsweise teuer und nicht allgemeine Zweck.

Die Fertigungslinie verhindern hinunter oder um sicherzustellen, dass Ihre Produkte entsprechend gehandelt werden, ist der Schlüssel zu entgegenkommend genug zu identifizieren und Änderungen der Situation anpassen.Um schnell und einfach diese Szenarien identifizieren, historische und Echtzeit-Abfragen sollten die gleichen Entwickler-Friendly Toolsets verwenden und Abfragesprachen, das System große Datenmengen in nahe-Echtzeit (in der Größenordnung von Hunderten von Tausenden von Ereignissen pro Sekunde) behandeln soll, und der Motor sollte flexibel genug, um Szenarien für eine breite Palette an Problemdomänen behandeln.

Glücklicherweise existiert dieses Tool.It's called Microsoft StreamInsight.

StreamInsight Datenübertragungsarchitektur-Übersicht

StreamInsight ist ein komplexer Ereignisverarbeitung Lage von Hunderten von Tausenden von Ereignissen pro Sekunde mit äußerst geringer Latenz.Es kann von jedem Prozess, z. B. ein Windows-Dienst gehostet oder direkt in eine Anwendung eingebettet.StreamInsight hat eine einfache Adapter-Modell zum Abrufen von Daten in und aus und Abfragen über Echtzeit- und historische Daten verwenden die gleiche LINQ-Syntax genau wie jede andere Assembly von jeder Microsoft zugegriffen.NET Framework-Sprache.Es wird als Teil von SQL Server 2008 R2 lizenziert.

Die allgemeine Architektur des StreamInsight ist ganz einfach: Ereignisse gesammelt, aus einer Vielzahl von Quellen über input-Adapter.Diese Ereignisse sind analysiert und transformiert über Abfragen und die Ergebnisse der Abfragen an andere Systeme und Personen über Output Adapter verteilt werden.Abbildung 1 zeigt diese einfachen Struktur.

High-Level Architecture of Microsoft StreamInsight

Abbildung 1 High-Level-Architektur von Microsoft StreamInsight

Komplexer Ereignisverarbeitung Systeme wie StreamInsight werden auf die gleiche Weise, die Service-orientierte Architekturen befassen sich mit Nachrichten und Datenbank-Systeme befassen sich mit Zeilen um Veranstaltungen organisiert.Ein Ereignis ist eine einfache Daten zusammen mit der Zeit an dem ist, dass die Daten relevant — wie ein Sensorwert zu einem bestimmten Zeitpunkt des Tages oder einem Börsenticker-Preis.Die Daten, die durch das Ereignis übertragen werden seine Nutzlast aufgerufen.

StreamInsight unterstützt drei Arten von Veranstaltungen.Punkt-Ereignisse sind, die sofortige und haben keine Dauer.Intervall-Ereignisse sind deren Nutzlasten für einen bestimmten Zeitraum relevant sind.Edge Ereignisse ähneln Intervall Ereignisse, dass die Dauer der Veranstaltung bei der Ankunft der Veranstaltung bekannt ist.Stattdessen die Startzeit festgelegt ist und das Ereignis hat effektiv infinite Dauer bis einen anderen Edge-Ereignis eingeht, die die Endzeit festlegt.Z. B. möglicherweise eine Tachometer Lesung ein Punkt-Ereignis, da es ändert sich ständig, aber der Preis der Milch im Supermarkt ein Edge-Ereignis werden konnten, da es für eine längere Zeit relevant sind.Wenn der Verkaufspreis von Milch (beispielsweise aufgrund einer Änderung in der Verteiler Preisfindung), die Geltungsdauer der neue Preis ändert ist nicht bekannt, so ein Ereignis Edge ist besser geeignet sind als ein Intervall ein.Später, wenn der Verteiler ihre Preise wieder aktualisiert, kann ein neues Edge-Ereignis Mütze die Dauer der vorherigen Preisänderung während einer anderen Edge-Ereignis einen neuen Preis in Zukunft festgelegt wird.

Input- und Output Adapter im StreamInsight sind ein abstraktes Beispiel des Entwurfsmusters Adapter.Das StreamInsight-Modul wird über eine eigene Darstellung von Ereignissen betrieben, aber die eigentlichen Quellen dieser Ereignisse können variieren Wild, von eine proprietäre Schnittstelle bis hin zu einer Hardware-Sensor um Statusmeldungen generiert von Anwendungen im Unternehmen.Input-Adapter verwandeln die Quellereignisse in einen Stream von Ereignissen, die der Motor versteht.

Die Ergebnisse aus StreamInsight Abfragen darstellen spezifische Business-wissen und können sehr spezialisiert.Es ist wichtig, dass diese Ergebnisse zu den am besten geeigneten Ort weitergeleitet werden.Ausgabe-Adapter können verwendet werden, um die interne Darstellung eines Ereignisses in Text gedruckt, um der-Konsole eine Meldung gesendet über Windows Communication Foundation (WCF) auf ein anderes System zur Bearbeitung, oder sogar einen Punkt in einem Diagramm in einer Windows Presentation Foundation-Anwendung zu aktivieren.Beispiel-Adapter für das Arbeiten mit Textdateien, WCF, SQL und mehr stehen unter streaminsight.codeplex.com.

StreamInsight Abfragen mit gutem Beispiel

Auf den ersten Blick StreamInsight Abfragen Abfrage Zeilen aus einer Datenbank ähnlich erscheinen, aber es gibt wichtige Unterschiede.Wenn Sie eine Datenbank, die Abfrage erstellt und ausgeführt und die Ergebnisse werden zurückgegeben.Wenn die zugrunde liegenden Daten ändern, ist nicht die Ausgabe beeinträchtigt, da die Abfrage bereits ausgeführt wurde.Datenbank-Abfrageergebnisse stellen einen Snapshot von einem Moment in der Zeit, über das Anforderung-Antwort-Paradigma.

StreamInsight sind Standing Abfragen.Neue Eingabeereignisse angekommen, kontinuierlich die Abfrage reagiert und neue Ausgabe Ereignisse werden erstellt, wenn nötig.

Die Abfragebeispiele in diesem Artikel werden von der Probenlösung verfügbar zum Download gezeichnet.Sie starten einfach, aber wachsen mehr mächtig wie Neuerungen der Abfragesprache eingeführt werden.Alle Abfragen mithilfe der gleiche Klasse für die Nutzlast.Hier ist die Definition einer einfachen Klasse mit Eigenschaften für Region und Wert:

public class EventPayload {
  public string Region { get; set; }
  public double Value { get; set; }

  public override string ToString() {
    return string.Format("{0}\t{1:F4}", Region, Value);
  }
}

Die Abfragen in der Beispielanwendung nutzen Eingabeadapter, die Daten nach dem Zufallsprinzip generiert und ein Output-Adapter, schreibt einfach jedes Ereignis in der Konsole. Der Adapter in der Beispielanwendung werden aus Gründen der Übersichtlichkeit vereinfacht.

Wenn Sie jede Abfrage ausführen möchten, kommentieren Sie die Zeile in der Datei Program.cs in der Beispiellösung, die die lokale Variable mit dem Namen "Template." die Abfrage zuweist

Hier ist eine grundlegende Abfrage, die die Ereignisse von der Value-Eigenschaft zu filtern:

var filtered =
  from i in inputStream
  where i.Value > 0.5
  select i;

Diese Abfrage sollte mit Erfahrung mit LINQ zu jedem Entwickler bekannt vorkommen. Da StreamInsight LINQ als seine Abfragesprache verwendet, sieht diese Abfrage genauso aus wie ein LINQ to SQL-Abfrage auf eine Datenbank oder eine speicherinterne Filterung von eine IList. Ereignisse vom input-Adapter angekommen, werden ihre Nutzlasten kontrolliert, und wenn der Wert der Value-Eigenschaft größer als 0,5 ist, sie sind an den Ausgabe-Adapter wo sie in der Konsole gedruckt werden.

Wenn die Anwendung ausgeführt wird, beachten Sie, dass Ereignisse ständig in der Ausgabe kommen. Dies ist ein Push-Modell. StreamInsight berechnet neue Ausgabe Ereignisse von Eingängen wie sie ankommen, anstatt ein Pull-Modell wie eine Datenbank, in dem die Anwendung regelmäßig abfragen muss die Datenquelle zu sehen, wenn neue Daten angekommen ist. Dies passt gut mit Unterstützung der IObservable verfügbar in Microsoft.NET Framework 4, die später abgedeckt werden.

Ein Push-Modell für kontinuierliche Daten anstelle von Polling ist schön, aber die wahre Leistungsfähigkeit von StreamInsight wird deutlich, wenn über Eigenschaften hinsichtlich Zeit Abfragen. Ereignisse über den input-Adapter angekommen, werden sie einen Zeitstempel gegeben. Dieses Zeitstempels kommen aus der Datenquelle selbst (angenommen, die Ereignisse historische Daten mit einer expliziten Spalte speichern die Zeit darstellen) oder die Zeit das Ereignis kam festgelegt werden können. Zeit ist, in der Tat, erste Klasse in der Abfrage Sprache des StreamInsight.

Abfragen häufig Aussehen auf standard Datenbankabfragen mit einem Zeit-Qualifizierer fest am Ende, wie z. B. "alle fünf Sekunden" oder "alle drei Sekunden über eine Zeitspanne von fünf Sekunden." Zum Beispiel, ist hier eine einfache Abfrage, die den Durchschnitt der Value-Eigenschaft alle fünf Sekunden findet:

var aggregated =
  from i in inputStream
    .TumblingWindow(TimeSpan.FromSeconds(5), 
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { Avg = i.Avg(p => p.Value)};

Windows Daten

Da das Konzept der Zeit eine grundlegende Notwendigkeit komplexer Ereignisverarbeitung Systeme ist, ist es wichtig, eine einfache Möglichkeit, die Uhrzeitkomponente der Abfragelogik im System arbeiten. StreamInsight verwendet das Konzept der Windows zum Gruppierungen von Zeit darstellen. Die vorherige Abfrage verwendet ein Tumbling Fenster. Wenn die Applikation läuft, die Abfrage eine einzelne Ausgabe-Ereignis zu generieren werden Sekunden alle fünf (die Größe des Fensters). Das Ausgabe-Ereignis entspricht dem Durchschnitt während der letzten fünf Sekunden. Genau wie in LINQ to SQL oder LINQ to Objects, Aggregation Methoden wie Summe und Mittelwert können Rollup Ereignisse gruppiert nach Zeit in single-Werte, oder wählen Sie verwendet werden kann um die Ausgabe in ein anderes Format zu projizieren.

Tumbling Windows sind nur ein Spezialfall eines anderen Fenster: Fenster hopping. Hopping Windows sind zu groß, aber sie haben auch eine Hop-Größe, die ihre Fenster-Größe ist nicht. Dies bedeutet, dass Windows-hopping einander überlappen kann.

Für Beispiel, ein hopping Fenster mit eine Fenstergröße von fünf Sekunden und eine Hop-Größe von drei Sekunden wird Ausgabe erzeugen alle drei Sekunden (die Hop-Größe), geben Sie die durchschnittliche über die letzten fünf Sekunden (die Fenstergröße). Es Hopfen forward drei Sekunden zu einer Zeit und ist fünf Sekunden lang. Abbildung 2 zeigt ein Ereignisstream in tumbling und hüpfen Windows gruppiert.

Tumbling and Hopping Windows

Abbildung 2 stolpern und hüpfen Windows

Beachten Sie, dass die Tumbling Fenster nicht überlappen, aber die hopping Windows können wenn die Hop-Größe kleiner als die Größe des Fensters ist. Wenn die Fenster überlappen, kann ein Ereignis in mehr als einem, wie der dritten Veranstaltung, Ende die in 1 und 2-Fenster ist. Edge-Ereignisse (die Dauer haben) können auch überlappen Fenster Grenzen und in mehr als ein Fenster, wie der vorletzten Event im Fenster Tumbling Ende.

Ein weiteres ist Fenster das Count-Fenster. Graf Windows enthalten eine bestimmte Anzahl von Veranstaltungen statt Ereignisse an einem bestimmten Punkt oder Dauer der Zeit. Eine Abfrage, um den Durchschnitt der letzten drei Ereignisse finden, die kamen würde ein Count-Fenster verwenden. Eine Strombegrenzung Count Windows ist, dass die integrierte Aggregationsmethoden wie Summe und Mittelwert nicht unterstützt werden. Stattdessen müssen Sie ein benutzerdefiniertes Aggregat erstellen. Diese einfache Prozess wird später in diesem Artikel erläutert.

Die endgültige Fenstertyp ist das Snapshot-Fenster. Snapshot Windows sind am einfachsten zu verstehen, im Zusammenhang mit Edge-Ereignisse. Jedes Mal, wenn ein Ereignis beginnt oder endet, das aktuelle Fenster abgeschlossen ist und eine neue beginnt. Abbildung 3 zeigt wie Edge Ereignisse in Snapshot Windows zusammengefasst sind. Beachten Sie, wie jedes Ereignis Grenze eine Berandung Fenster auslöst. E1 beginnt und damit auch w1. Wenn E2 beginnt, abgeschlossen ist w1 und w2 beginnt. Die nächsten Edge ist E1 Ende, das schließt w2 und w3 beginnt. Das Ergebnis ist drei Windows: w1, E1, w2 enthaltenden E1 und E2 und w3 enthaltenden E3. Sobald die Ereignisse in der Windows zusammengefasst sind, sind sie gestreckt, so dass es scheint, dass das Ereignis beginnt und, endet wenn das Fenster tut.

Snapshot Windows

Abbildung 3 Snapshot Windows

Komplexere Anfragen

Angesichts dieser verfügbaren Fenstern und grundlegenden Abfragemethoden wie Where, group by und order by-, eine Vielzahl von Abfragen möglich ist. Hier ist eine Abfrage, die die Eingabeereignisse nach Regionen gruppiert und anschließend ein hopping Fenster verwendet, um die Summe der Nutzlast Wert für jede Region für die Last-Minute-Ausgabe:

var payloadByRegion =
  from i in inputStream
  group i by i.Region into byRegion
  from c in byRegion.HoppingWindow(
    TimeSpan.FromMinutes(1),
    TimeSpan.FromSeconds(2), 
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { 
    Region = byRegion.Key, 
    Sum = c.Sum(p => p.Value) };

Diese Fenster verwenden eine Hop-Größe von zwei Sekunden, so dass das Modul Ausgabe Ereignisse alle zwei Sekunden sendet.

Da die Abfrageoperatoren für die IQueryable-Schnittstelle definiert sind, ist das Komponieren Abfragen möglich. Der folgende Code verwendet die vorherige Abfrage, die Summen von Region dann die Region mit den höchsten Betrag berechnen findet. Ein Snapshot-Fenster ermöglicht den Ereignisstream durch Summe sortiert werden, so dass die Take-Methode die Region mit den höchsten Betrag ergreifen kann:

var highestRegion = 
  // Uses groupBy query 
  (from i in payloadByRegion.SnapshotWindow(
    SnapshotWindowOutputPolicy.Clip)
    from sumByRegion in i
    orderby sumByRegion.Sum descending
    select sumByRegion).Take(1);

Ein häufiges Szenario ist eine Abfrage, die einen Strom von schnell bewegten bezieht sich Ereignisse (wie das Lesen von einem Sensor) langsamer verschieben oder statische Referenzdaten (wie die festen Standort des Sensors). Abfragen verwenden Joins, um dieses Ziel zu erreichen.

Die StreamInsight Join-Syntax ist das gleiche wie jede andere LINQ-Join mit einer wichtige Einschränkung: Ereignisse werden nur gemeinsam ihre Dauer überschneiden sich. Wenn Sensor 1 einen Wert auf Zeit t1 meldet, aber die Referenz-Eingangsdaten über Sensor 1 Lage nur für Zeit t2-t3 gilt, wird die Verknüpfung nicht überein. Die Verknüpfungskriterien für Dauer ist nicht explizit in der Querydefinition geschrieben; Es ist eine grundlegende Eigenschaft der StreamInsight-Engine. Wenn Sie mit statischen Daten arbeiten, ist es üblich für den input-Adapter, die Daten als Edge Ereignisse mit unendlichen Dauer wirksam zu behandeln. Auf diese Weise gelingt es alle Verknüpfungen zu der schnelllebigen Ereignisströme.

Korrelation von mehreren Ereignisströme über Verknüpfungen ist ein leistungsfähiges Konzept. Montagelinien, Öl-Produktionsanlagen oder High-Volume-Websites fehl nicht oft aufgrund von isolierten Ereignissen. Ein Stück Ausrüstung einen Temperatur-Alarm auslösen bringen nicht in der Regel die Linie; Es ist eine Kombination aus Umstände wie die Temperatur über einen längeren Zeitraum zu hoch wird, während ein bestimmtes Werkzeug schwere verwendet wird und die menschlichen Operatoren sind Verschiebungen ändern.

Ohne Joins würden die isolierten Ereignisse so viel Business-Wert nicht. Mithilfe von Verknüpfungen und StreamInsight Abfragen über historische Daten, können Benutzer isoliert Streams in sehr spezifische Überwachung Kriterien korrelieren, die dann in Echtzeit überwacht werden. Eine stehende Abfrage kann für die Situationen suchen, die zum Scheitern führen und ein Output-Ereignis, das zu einem System konnte, die weiß weitergeleitet werden, wie man nehmen das Überhitzung Stück Ausrüstung offline anstelle von warten, bis es auf der ganzen Linie bringt automatisch generieren würden.

In einem Szenario mit Einzelhandel können Ereignisse im Zusammenhang mit Umsatz von Element im Laufe der Zeit fließen in Preisgestaltung Systeme und Customer Order Geschichten damit optimale Preise pro Artikel oder welche Elemente der Benutzer vor dem Checkout vorschlagen zu fahren. Da Abfragen einfach erstellt, geändert und komponiert sind, können Sie fangen mit einfachen Szenarios und verfeinern sie im Laufe der Zeit immer mehr Wert für das Unternehmen nachgeben.

Benutzerdefinierte Aggregate

StreamInsight Schiffe mit einigen der häufigsten Aggregatfunktionen einschließlich Count, Sum und Average. Wenn diese Funktionen sind nicht genug (oder Sie über ein Fenster Count aggregieren wie bereits erwähnt müssen), unterstützt StreamInsight benutzerdefinierte Aggregatfunktionen.

Der Prozess der Erstellung einer benutzerdefinierten Aggregats umfasst zwei Schritte: die tatsächliche aggregate-Methode schreiben, dann Verfügbarmachen der LINQ-Methode über eine Erweiterungsmethode.

Im erste Schritt werden Erben von entweder CepAggregate < TInput, TOutput > Wenn das Aggregat nicht zeitabhängig oder CepTimeSensitiveAggregate < TInput, TOutput > ist Wenn es. Diese abstrakten Klassen verfügen über eine einzelne Methode implementiert werden als GenerateOutput bezeichnet. Abbildung 4 zeigt die Implementierung des EveryOtherSum Aggregats, das jedes andere Ereignis hinzufügt.

Abbildung 4 EveryOtherSum Aggregat

public class EveryOtherSum : 
  CepAggregate<double, double> {

  public override double GenerateOutput(
    IEnumerable<double> payloads) {

    var sum = default(double);
    var include = true;
    foreach (var d in payloads) {
      if (include) sum += d;
      include = !include;
    }
    return sum;
  }
}

Der zweite Schritt umfasst das Erstellen einer Erweiterungsmethode auf CepWindow <TPayload> damit Ihre Aggregat in Abfragen verwendet werden kann. Die CepUserDefinedAggregateAttribute gilt für die Erweiterungsmethode, um StreamInsight sagen, wo die Implementierung des Aggregats (in diesem Fall die Klasse im ersten Schritt erstellt) finden. Der Code für beide Schritte dieses Prozesses ist in der Datei EveryOtherSum.cs in der Beispielanwendung zum Download verfügbar.

Weitere Informationen über Adapter

Abfragen darstellen Geschäftslogik, die über Adapter die Daten fungiert. Die Beispielanwendung verwendet eine einfache Eingabe Adapter, der zufällige Daten generiert und ein Output-Adapter, der auf der Konsole ausgibt. Beide folgen einem ähnlichen Muster, welcher auch die verfügbaren Adapter auf der CodePlex-Website folgen.

StreamInsight verwendet ein Factorymuster zum Erstellen von Adaptern. Das Werk erstellt einer Konfiguration-Klasse eine Instanz des entsprechenden Adapters. In der Beispielanwendung sind die Konfigurationsklassen für die Eingabe- und Adapter ziemlich einfach. Die Ausgangskonfiguration Adapter hat ein einzelnes Feld um eine Zeichenfolge zu verwenden, wenn die Ausgabe schreiben zu halten. Die Eingabe Adapterkonfiguration hat ein Feld für die Zeit zu schlafen zwischen Generieren von zufälligen Ereignissen sowie ein weiteres Feld namens CtiFrequency.

Die Cti in CtiFrequency steht für aktuelle Zeitinkrement. StreamInsight verwendet Cti Ereignisse, um sicherzustellen, dass Ereignisse in der richtigen Reihenfolge zugestellt werden. Standardmäßig unterstützt StreamInsight Ereignisse außerhalb der Reihenfolge ankommen. Der Motor wird automatisch diese entsprechend bestellen beim sie durch die Abfragen übergeben. Es gibt jedoch eine Grenze für diese Neunummerierung.

Nehmen wir an, dass Ereignisse wirklich in beliebiger Reihenfolge ankommen könnte. Wie immer wäre es möglich, zu bestimmen, dass das früheste Ereignis gekommen und somit durch die Abfrage verschoben werden konnte? Es wäre nicht, weil das nächste Ereignis eine Zeit früher als die früheste haben könnte, die Sie bereits erhalten haben. StreamInsight verwendet Cti Ereignisse um das Modul zu signalisieren, das früher als was bereits eingegangen sind keine weitere Ereignisse eintreffen werden. CTI Ereignisse cue effektiv die Engine um die Ereignisse zu verarbeiten, die sind angekommen und anschließend ignorieren oder Anpassen einer mit Zeitstempeln, die früher als die aktuelle Uhrzeit.

Der Beispiel input Adapter generiert eine geordnete Ereignisstream, so dass es automatisch ein Cti-Ereignis nach jeder generierte Ereignis fügt um die Dinge entlang bewegt. Wenn Sie jemals Eingabeadapter schreiben und Ihr Programm keine Ausgabe erzeugt, sicherzustellen Sie, dass Ihr Adapter Einfügen von CTI-Wert, ist weil ohne sie das Modul für immer wartet.

StreamInsight Schiffe mit einer Vielzahl von Basisklassen für Adapter: eingegeben haben, nicht typisiert, zeigen, Intervall und Edge. Typisierte Adapter immer erzeugen Ereignisse mit einem bekannten Nutzlasttyp — bei Beispiel, der RandomPayload-Klasse. Nicht typisierte Adapter sind nützlich für die Ereignisquellen, die mehrere Arten von Ereignissen oder Dinge wie CSV-Dateien, in denen das Layout und Inhalt der Zeilen ist nicht im Voraus bekannt generieren können.

Der Beispiel input Adapter hat einen bekannten Nutzlasttyp und generiert Point Ereignisse, so dass es von TypedPointInputAdapter erbt <RandomPayload>. Die Basisklasse hat zwei abstrakte Methoden, die implementiert werden müssen: starten und fortsetzen. Im Beispiel kann die Start-Methode einen Timer, der in dem von der Konfiguration angegebenen Intervall ausgelöst wird. Elapsed-Ereignis des Zeitgebers wird die ProduceEvent-Methode, die das Hauptwerk des Adapters wird ausgeführt. Der Körper dieser Methode folgt einem gemeinsamen Muster.

Der Adapter überprüft zunächst, ob das Modul beendet wurde, da es zuletzt ausgeführt wurde, und dass es noch ausgeführt wird. Eine Methode in der Basisklasse zum Erstellen einer Instanz eines Point-Ereignisses aufgerufen, seine Nutzlast festgelegt ist und das Ereignis wird in den Stream in eine Warteschlange gestellt. Im Beispiel die SetRandomEventPayload Methode steht für real Adapter Logik – z. B. aus einer Datei lesen, im Gespräch mit einem Sensor oder Abfragen einer Datenbank.

Die input-Adapter-Factory ist ebenfalls einfach. Implementierung einer Schnittstelle ITypedInputAdapterFactory <RandomPayloadConfig> Da es eine Fabrik für typisierte Adapter. Der einzige Trick zu dieser Factory ist, dass es auch die ITypedDeclareAdvanceTimeProperties <RandomPayloadConfig> implementiert Schnittstelle. Diese Schnittstelle ermöglicht es die Fabrik, behandeln einfügen die CTI-Wert, wie bereits früher erläutert.

Die Beispielanwendung Output Adapter folgt fast genau demselben Muster als Eingabe. Es gibt eine Configuration-Klasse, einer Fabrik und der Ausgabe-Adapter selbst. Die Adapter-Klasse sieht viel wie die input-Adapter. Der Hauptunterschied ist, dass der Adapter Ereignisse aus der Warteschlange nicht Queueing entfernt sie. Da Cti Ereignisse Ereignisse wie die anderen sind, sie kommen an den Ausgabe-Adapter zu, und werden einfach ignoriert.

Observablen

Obwohl das Adapter-Modell ganz einfach ist, ist es ein noch einfacher Weg, um Ereignisse in und aus der Motor. Wenn Ihre Anwendung das eingebettete Bereitstellungsmodell für StreamInsight verwendet wird, können Sie sowohl IEnumerables als auch IObservables als ein- und Ausgänge für den Motor verwenden. Angesichts einer IEnumerable oder ein IObservable, können Sie einen Eingabestream erstellen, durch Aufrufen einer der bereitgestellten Erweiterungsmethoden wie ToStream, ToPointStream, ToIntervalStream oder ToEdgeStream. Dies erzeugt einen Ereignisstream, der genau die gleichen b. erstellt Eingabeadapter aussieht.

Ebenso gegeben eine Abfrage, Erweiterungsmethoden wie ToObservable/Enumerable, ToPointObservable/Enumerable, ToIntervalObservable/Enumerable oder ToEdgeObservableEnumerable die Ausgabe die Abfrage zu einer IObservable oder IEnumerable, bzw. weiterleiten wird. Diese Muster sind besonders nützlich für die Wiedergabe von historische Daten in einer Datenbank gespeichert.

Erstellen Sie mit dem Entity Framework oder LINQ to SQL eine Datenbankabfrage. Verwenden Sie die ToStream Erweiterungsmethode zum Konvertieren der Datenbank-Ergebnis in ein Ereignisstream und definieren eine StreamInsight Abfrage über es. Schließlich verwenden Sie ToEnumerable, um die StreamInsight Ergebnisse in etwas weiterzuleiten, die Sie leicht Foreach über und ausdrucken können.

Bereitstellungsmodell und andere Tools

Wenn die Observable und Enumerable Unterstützung verwenden möchten, muss StreamInsight in Ihrer Anwendung eingebettet werden. StreamInsight unterstützt ein Standalone-Modell, though. Wenn Sie installieren, sind Sie ob ein Windows-Service für das Hosten der Standardinstanz erstellt gefragt. Dann Hosten des Diensts StreamInsight, so dass mehrere Anwendungen auf dieselbe Instanz verbinden und Teilen Adapter und Abfragen kann.

Kommunikation mit einem gemeinsam genutzten Server anstelle des ein embedded umfasst einfach eine andere statische Methode auf der Server-Klasse. Rufen Sie anstelle von aufrufenden erstellen mit dem Instanznamen Connect mit eine EndpointAddress, die auf die gemeinsame Instanz verweist. Diese Bereitstellungsstrategie ist nützlich für Enterprise-Szenarien, in denen mehrere Anwendungen freigegebene Abfragen oder Adapter verbrauchen möchte.

In beiden Fällen ist es zuweilen notwendig, herauszufinden, warum die vom StreamInsight generierte Ausgabe ist nicht was es sein sollte. Das Produkt versendet mit einem Tool namens der Ereignis-Flow-Debugger zu gerade diesem Zweck. Die Verwendung des Tools ist über den Rahmen dieses Artikels hinaus, aber im Allgemeinen es erlaubt Ihnen, eine Verbindung zu Instanzen und Ablaufverfolgungsereignisse aus der Eingabe durch die Abfragen und die Ausgabe.

Eine Flexible, reaktive Tool

Flexible Bereitstellungsoptionen, ein vertraut Programmiermodell und einfach erstellbaren Adapter stellen StreamInsight eine gute Wahl für eine Vielzahl von Szenarien. Aus einer zentralen Instanz Abfragen und Korrelation von Tausenden von Sensor-Eingänge eine zweite, auf eine eingebettete Überwachung der aktuellen Instanz und historische Ereignisse innerhalb einer einzigen Anwendung, StreamInsight nutzt Entwickler-Friendly Frameworks wie LINQ in hohem Grade kundengerechte Lösungen ermöglichen.

Simple-to-create-Adapter und integrierte Unterstützung für die Konvertierung zwischen Ereignisströme und IEnumerables und IObservables machen es einfach schnell Lösungen aufstehen und laufen, so die inkrementelle Arbeit der erstellen und verfeinern Abfragen, die spezifische Business-Wissen einkapseln kann beginnen. Diese raffinierte sind, stellen diese Abfragen mehr Wert, Anwendungen und Organisationen zu identifizieren und reagieren auf interessante Szenarios bei ihrem auftreten, anstatt nach das Fenster der Gelegenheit verstrichen.

Rob Pierry ist ein principal Consultant mit Captura (capturaonline.com), eine consulting-Firma, die liefert innovative Anwendererlebnisse durch skalierbare Technologie unterstützt. Er kann erreicht werden unter rpierry+msdn@gmail.com.

Dank der folgenden technischen Experten für die Überprüfung dieses Artikels: Ramkumar Krishnan, Douglas Laudenschlager und Roman Schindlauer