Avancer le temps d'application

Les développeurs de StreamInsight doivent répondre aussi bien aux besoins des sources de données pouvant avoir des données non ordonnées, qu'à l'exigence de traiter les événements de façon très réactive. Lorsqu'on avance l'heure de l'application, on réduit la latence, mais on réduit également la fenêtre de réception des données qui arrivent en retard (c'est-à-dire la capacité de recevoir les données dans le désordre). StreamInsight offre différents moyens permettant d'analyser l'heure de l'application. Cette rubrique décrit les différents niveaux et stratégies d'avancée de l'heure d'application configurable au niveau de l'adaptateur et avec des liaisons de requête.

Présentation du modèle temporel

Le modèle temporel de StreamInsight repose uniquement sur le temps d'application et jamais sur l'heure système. Cela signifie que tous les opérateurs temporels font référence à l'horodateur des événements et jamais à l'horloge système de la machine hôte. Par conséquent, les applications doivent communiquer leur temps d'application actuel au serveur StreamInsight. Le temps d'application d'une application donnée dépend de nombreux facteurs différents dans le contexte de l'application. En définitive, il incombe au développeur d'applications de fournir le temps d'application approprié au serveur StreamInsight. Les principaux éléments à prendre en compte en ce qui concerne le temps d'application sont les suivants :

  • Sources de données

    Lorsque les sources de données communiquent des informations temporelles, ces données peuvent être utilisées pour identifier le point précis dans le temps où tous les événements de la source de données sont reçus. Ce point constitue le temps d'application actuel en ce qui concerne cette source de données. Notez que des sources de données différentes peuvent avancer à des vitesses différentes.

  • Données non ordonnées

    Certains événements de source de données n'arrivent pas toujours dans l'ordre de leurs horodateurs. Autrement dit, les données sont non ordonnées. StreamInsight peut adapter les données non ordonnées et s'assurer que les résultats ne dépendent pas de l'ordre dans lequel les événements arrivent au serveur StreamInsight. Les développeurs StreamInsight peuvent, dans une certaine mesure, avancer l'heure de l'application pour permettre aux événements non ordonnés d'arriver petit à petit pour les sources de données dont les événements arrivent en retard.

  • Dynamique des résultats

    Les requêtes StreamInsight produisent des résultats qui sont réputés exacts à l'heure de l'application actuelle. Cela signifie que les résultats sont générés par les requêtes StreamInsight au fur et à mesure qu'ils sont finalisés par la progression de l'heure de l'application globale.

Incréments de temps réel (CTI)

Pendant le traitement de la requête, l'heure de l'application est déterminée par les événements CTI (Current Time Increment). Un CTI est un événement de ponctuation qui est un composant central du modèle temporel StreamInsight. Les événements CTI sont utilisés pour valider des séquences d'événements et transmettre les résultats calculés à la sortie de la requête en déclarant au serveur StreamInsight que certaines parties de la chronologie ne changeront plus. Par conséquent, il est essentiel de mettre en file d'attente les événements CTI dans le flux d'événements d'entrée pour produire des résultats et vider l'état des opérateurs.

En mettant en file d'attente un événement CTI, l'entrée s'engage à ne pas produire d'événement suivant qui influencerait la période avant l'horodateur du CTI. Cela implique que, une fois qu'un événement CTI a été mis en file d'attente dans l'entrée :

  • Pour les événements de début de forme Point, Intervalle ou Bord : l'heure de début de l'événement doit se situer en même temps que ou après l'événement CTI.

  • Pour les événements de fin de forme Bord : l'heure de fin de l'événement doit se situer en même temps que ou après l'événement CTI.

Si ces règles ne sont pas respectées, il s'agit d'une violation CTI. Nous décrivons ci-dessous comment ces violations sont gérées.

Il existe trois méthodes pour insérer des événements CTI dans un flux d'entrée.

  1. Mettez en file d'attente les événements CTI par programme, via l'adaptateur d'entrée, comme pour les événements de mise en file d'attente.

  2. Générez les événements CTI de façon déclarative, avec une fréquence donnée. Cela peut être spécifié via AdvanceTimeGenerationSettings dans la fabrique d'adaptateurs ou dans le cadre de la liaison de la requête.

  3. Définissez un flux d'entrée séparé comme une source CTI. Cela peut être spécifié uniquement dans la liaison de la requête.

Si les méthodes 2 et 3 sont implémentées, une stratégie des violations CTI doit être également implémentée. La section suivante décrit AdvanceTimeGenerationSettings et les stratégies de violation. Les sections qui suivent décrivent comment utiliser les paramètres de temps préalable dans la fabrique d'adaptateurs ainsi que dans la liaison de la requête.

Génération d'événements CTI

La génération d'événements CTI (telle que décrite plus haut dans les méthodes 2 et 3) a deux aspects :

  1. La fréquence de génération, spécifiée comme un entier positif N ou comme un intervalle de temps T. La stratégie de fréquence de génération insère un CTI après l'occurrence du nombre d'événements (N) ou l'intervalle de temps (T).

  2. L'horodateur des événements CTI générés, spécifié comme un intervalle de temps par rapport au dernier événement reçu.

En outre, vous pouvez utiliser un indicateur booléen pour indiquer si un événement CTI final avec un horodateur d'infini positif doit être inséré lorsque la requête est arrêtée. Cela permet de vider tous les événements restants des opérateurs de la requête.

La génération d'événements CTI est définie via la classe AdvanceTimeGenerationSettings, dont le constructeur prend la fréquence, l'intervalle de temps et l'indicateur, comme indiqué dans l'exemple suivant.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

Cet exemple indique au moteur d'insérer un CTI tous les 10 événements qui arrivent de la source. Le CTI inclut un horodateur de l'heure du dernier événement moins cinq (5) secondes. Ce mécanisme permet de définir une « période de grâce » pour que la source de l'événement conserve la mise en file d'attente des événements tardifs sans violer la sémantique CTI (tant que le retard des événements ne dépasse pas cinq secondes). Lorsque la requête correspondante est arrêtée, un événement CTI avec un délai d'expiration infini est mis en file d'attente.

Notez que lors de la spécification d'une fréquence pour la génération d'événements CTI via AdvanceTimeSettings, les sessions de fin ne sont pas prises en compte. Elles ne sont pas non plus prises en compte lors de l'utilisation d'une durée comme fréquence. Seules les sessions de début sont prises en considération dans le cas d'événements session, que ce soit pour la fréquence ou la durée.

Stratégies de violation CTI

Il est possible qu'une source d'événement viole la sémantique CTI en envoyant des événements avec un horodateur antérieur aux événements CTI insérés. Les paramètres de temps préalable autorisent la spécification d'une stratégie pour gérer de telles occurrences. La stratégie peut prendre l'une des deux formes suivantes :

  • Suppression

    Les événements qui violent l'événement CTI inséré sont supprimés et ne sont pas mis en file d'attente dans la requête.

  • Ajustement

    Les événements qui violent l'événement CTI inséré sont modifiés si leur durée de vie chevauche l'horodateur CTI. En d'autres termes, l'horodateur de début des événements a pour valeur l'horodateur CTI le plus récent, de façon à ce que ces événements deviennent valides. Si l'heure de début et de fin d'un événement tombent avant l'horodateur CTI, alors, l'événement est supprimé.

Paramètres de temps préalable d'adaptateur

Les paramètres pour avancer le temps d'application peuvent être spécifiés dans la définition de la fabrique d'adaptateurs. De la même façon que la méthode Create() de la fabrique est appelée lorsqu'un adaptateur est instancié, une méthode correspondante permettant de définir les paramètres de temps préalable de l'instance d'adaptateur est appelée. Pour ce faire, utilisez l'interface ITypedDeclareAdvanceTimeProperties pour un adaptateur typé (ou IDeclareAdvanceTimeProperties pour un adaptateur non typé), comme illustré dans l'exemple suivant.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

Cette interface requiert l'implémentation de la méthode suivante dans le cadre de la fabrique.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

La méthode DeclareAdvanceTimeProperties() est appelée pour chaque nouvel adaptateur instancié avec la même structure de configuration et le même paramètre de forme d'événement spécifié dans l'appel de la méthode Create() correspondante. Cela permet à l'auteur de l'adaptateur de dériver les paramètres de génération CTI corrects des informations de configuration, sans que le générateur de requêtes et le module de liaison soient informés des caractéristiques des paramètres de temps préalable.

Le constructeur AdapterAdvanceTimeSettings exige l'objet AdvanceTimeGenerationSettings et la stratégie de violation décrite précédemment.

Génération CTI dans la liaison de la requête

De façon similaire à AdapterAdvanceTimeSettings, l'émission d'événements CTI peut être spécifiée de façon déclarative dans la liaison de la requête comme illustré dans l'exemple suivant. Cela autorise l'utilisateur qui lie la requête à définir le comportement du temps d'application CTI indépendamment de l'implémentation de l'adaptateur.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

Le constructeur AdvanceTimeSettings prend les trois arguments suivants :

  1. Un objet AdvanceTimeGenerationSettings

  2. Un objet AdvanceTimeImportSettings

  3. La stratégie de violation

Notez que vous pouvez définir sur « Null » les arguments des paramètres de génération ou les arguments des paramètres d'importation, mais pas les deux. En revanche, ils peuvent être spécifiés ensemble. La section suivante présente la classe AdvanceTimeImportSettings.

L'exemple ci-dessus spécifie la génération et l'insertion d'un événement CTI avec chaque événement, avec l'horodateur de l'événement (aucun intervalle). L'objet AdvanceTimeSettings peut être passé en tant que dernier paramètre facultatif à la méthode CepStream.Create() comme indiqué dans l'exemple suivant.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

Il peut également être utilisé dans le modèle de développement du module de liaison de requête :

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Synchronisation avec un autre flux de données

Lorsque les événements CTI sont utilisés pendant la liaison de la requête, en plus (ou à la place) de la génération d'événements CTI selon une fréquence, ils peuvent être copiés d'un autre flux d'entrée vers la requête, à l'aide de AdvanceTimeImportSettings. Cette fonctionnalité permet la synchronisation de deux flux de données comme indiqué dans l'exemple suivant.

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

Cet exemple montre un cas d'usage type dans lequel un flux de données « rapide » doit être joint à un flux de données de référence « lent ». Le flux de données lent peut contenir des données de recherche qui changent beaucoup moins fréquemment que celles du flux de données rapide. Pour rendre la sortie jointe aussi rapide que l'est son entrée la plus rapide, le flux d'entrée lent est synchronisé au flux de données rapide en important ses CTI. Dans cet exemple, on considère que la gestion du temps d'application du flux de données rapide a lieu dans l'adaptateur.

Dynamique des résultats

Le paramètre d'intervalle de temps des paramètres de génération du temps préalable spécifie l'horodateur des CTI insérés. Il est important de comprendre la sémantique précise des CTI dans l'infrastructure StreamInsight afin d'obtenir l'effet désiré pour la dynamique de la sortie. Un CTI déclare au moteur que, dans la chronologie, tout ce qui se trouve strictement avant l'horodateur CTI est validé. Les implications sont différentes pour la vivacité du résultat.

Par exemple, considérez un flux d'entrée d'événements point et un paramètre de génération CTI de fréquence 1 (à chaque événement) et d'intervalle 0. Cela produit des CTI ayant exactement le même horodateur que chaque événement point. Toutefois, cela signifie que le tout dernier événement point sera validé seulement avec le CTI suivant, parce que son horodateur n'est pas strictement avant le CTI correspondant. Pour valider chaque événement point dès qu'il est émis par l'adaptateur, les CTI doivent être horodatés immédiatement après les événements point. Cela se traduit par un intervalle négatif d'un cycle, comme indiqué dans l'exemple suivant.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

Événements CTI et opérateurs de requête

Les événements CTI sont placés en file d'attente par l'adaptateur d'entrée ou injectés comme cela est décrit ci-dessus. Ils sont propagés via la requête et traités différemment par certains opérateurs. Par exemple, les opérateurs de jointure diffusent leurs résultats, de part et d'autre, jusqu'à l'événement CTI le plus ancien. Les opérateurs d'union diffusent leurs résultats, de part et d'autre, par rapport au plus âgé des événements CTI les plus récents. La requête entière diffusera uniquement son résultat jusqu'à l'événement CTI le plus récent.

Par ailleurs, certains opérateurs ont une incidence sur les horodateurs CTI. Les fenêtres récurrentes ramènent les événements CTI au début de la fenêtre, car le résultat de l'opération en sus de la fenêtre peut changer pendant que les événements continuent à arriver dans cette fenêtre. Les méthodes ShiftEventTime() et AlterEventLifeTime() ont pour effet de modifier toutes les deux l'heure de début des événements, et la même transformation sera appliquée à tous les événements CTI.

Voir aussi

Concepts

Création d'adaptateurs d'entrée et de sortie

Concepts du serveur StreamInsight

Historique des modifications

Mise à jour du contenu

Ajout de la section « Événements CTI et opérateurs de requête ».

Informations ajoutées dans la section « Génération d'événements CTI » pour signaler que les sessions de fin ne sont pas prises en compte lors de la spécification d'une fréquence CTI via AdvanceTimeSettings.