TopK

 

Les exemples dans cette rubrique montrent l'utilisation de l'opération TopK pour classer des événements dans un flux de données de fenêtre et retourner les N premiers événements dans un ordre de classement croissant ou décroissant. TopK est spécifié via la méthode d'extension Take(), qui attend un flux de type IQOrderedWindow<T>. Ce type est généré dès la spécification d'une clause orderby. La clause orderby doit fonctionner sur les événements d'un flux IQWindowedStreamable<T>. Par conséquent, un opérateur de fenêtre d'instantané ou récurrente doit être appliqué au flux d'entrée.

Notes


Pour les flux CEPStream<T>, la méthode d'extension Take() attend un flux de type CepOrderedStream<T>. La discussion de cette section s'applique aux deux flux IQStreamable et CEPStream.

En cas de liaisons, TopK renvoie plus de N événements. Elle se comporte donc systématiquement de manière déterministe.

TopK ne fonctionne pas avec les fenêtres utilisant des nombres.

TopK est implémentée en tant qu'opérateur basé sur le temps. Par conséquent, les durées de vie des événements de sortie sont définies en fonction de la stratégie de sortie de fenêtre.

Exemples

L'exemple suivant prend les cinq premiers événements de chaque fenêtre d'instantané définie pour le flux d'entrée inputStream, puis génère un nouveau flux d'événements. Les événements dans chaque fenêtre sont ordonnés dans l'ordre croissant des valeurs dans les champs de charge utile e.f et combinés avec l'ordre décroissant des valeurs dans le champ de charge utile e.i.

// Assuming the following input event type for inputStream:   
public class MyPayload   
{  
  public int f;   
  public int i;   
}  
  
// IQStreamable example:  
var topfive = (from window in inputStream.Snapshot()  
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(5)  
               select b;  
  
// CEPStream example:  
var topfive = (from window in inputStream.Snapshot()  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(5);  
  

Vous pouvez également utiliser une projection dans une clause Take, comme indiqué dans l'exemple suivant.

  
// IQStreamable example:  
var result = (from win in source  
                       .TumblingWindow(TimeSpan.FromMinutes(10))  
                   from b in   
                       (from r in win  
                        orderby r.Value descending  
                        select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                              {  
                                                  ranking = e.Rank,  
                                                  e.Payload. scaled,  
                                                  e.Payload.SourceId  
                                              })  
                   select b;  
  
// CEPStream example:  
var result = (from win in source  
                       .TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)  
                   from r in win  
                   orderby r.Value descending  
                   select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                         {  
                                             ranking = e.Rank,  
                                             e.Payload. scaled,  
                                             e.Payload.SourceId  
                                         });  
  

Si le résultat réel du classement doit être projeté dans la charge utile des événements, une expression lambda correspondante est utilisée. Le classement est ensuite accessible via la propriété Rank, tandis que les champs de charge utile le sont via la propriété Payload.

  
// IQStreamable example:  
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2))  
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(3, e => new  
                    {  
                        ranking = e.Rank,  
                        f = e.Payload.f,  
                        i = e.Payload.i  
                    })  
               select b;  
  
  
// CEPStream example:  
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(3, e => new  
               {  
                   ranking = e.Rank,  
                   f = e.Payload.f,  
                   i = e.Payload.i  
               });  
  

L'illustration suivante montre la propriété temporelle d'un résultat TopK. Cet exemple utilise une fenêtre récurrente avec TopK, afin que les deux événements ayant les valeurs de charge utile les plus élevées pour le champ de charge utile val soient choisis dans chaque fenêtre.

  
// IQStreamable example:  
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10))  
               from b in  
                   (from e in window  
                    orderby e.val descending  
                    select e).Take(2)  
               select b;  
  
  
// CEPStream example:  
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.val descending  
               select e).Take(2);  
  

Les zones orange représentent les fenêtres. L'illustration montre comment TopK définit les durées de vie des charges utiles sélectionnées selon la taille de la fenêtre. Ici, nous supposons des événements point dans le flux d'entrée qui affichent la valeur de leur champ de charge utile val. Notez que la stratégie de sortie donnée de ClipToWindowEnd affecte l'intervalle de temps de la fenêtre aux événements de résultat.

TopK

Il est important de comprendre le comportement de TopK pour les liaisons. Considérons l'exemple suivant, où le flux d'entrée contient des valeurs dupliquées et où les deux premières valeurs sur une fenêtre bascule sont calculées. Dans cet exemple, la stratégie de sortie par défaut de PointAlignToWindowEnd est utilisée :

Opérateur TopK avec stratégie PointAlignToWindowEnd

Les deux premières valeurs de la première fenêtre sont 3 et 2. Tous les événements pour lesquels le champ classé comporte l'une de ces valeurs sont inclus dans le résultat qui contient, dans cet exemple, trois événements.

Voir aussi

Concepts du serveur StreamInsight
Utilisation de fenêtres d'événement
Fenêtres récurrentes
Fenêtres d'instantané