Agrégats et opérateurs définis par l'utilisateur

Les exemples de cette rubrique montrent comment étendre l'ensemble des opérations de fenêtre dans des opérateurs LINQ StreamInsight à l'aide d'agrégats et d'opérateurs définis par l'utilisateur. Ces extensions sont définies sur des fenêtres d'événement et renvoient des événements de résultat ou n'en renvoient pas. Un agrégat ou un opérateur défini par l'utilisateur doit être compilé dans un assembly accessible au serveur StreamInsight, comme les adaptateurs fournis et utilisés au moment de l'exécution.

StreamInsight propose également des opérateurs de flux de données définis par l'utilisateur comme autre mécanisme d'extensibilité. Les opérateurs de flux de données définis par l'utilisateur sont définis directement sur le flux d'événements et non sur les fenêtres d'événement.

Agrégats définis par l'utilisateur

Un agrégat défini par l'utilisateur est utilisé en sus d'une spécification de fenêtre, de manière à agréger les événements de cette fenêtre et produire un résultat unique. L'entrée d'un agrégat défini par l'utilisateur est une fenêtre CEP (le résultat d'un opérateur de fenêtre récurrente, d'instantané ou utilisant des nombres) qui contient un jeu d'événements CEP et génère une valeur de retour unique (un type CLR mappé à l'un des types primitifs StreamInsight). Pour plus d'informations sur les fenêtres, consultez Utilisation de fenêtres d'événement.

Vous pouvez implémenter des agrégats définis par l'utilisateur dont le fonctionnement est plus complexe que celui des agrégats plus simples comme count, sum et average fournis par StreamInsight. Un exemple utilisant des moyennes pondérées de durée de calcul est détaillé dans une section ultérieure.

Opérateurs définis par l'utilisateur

Un opérateur défini par l'utilisateur est utilisé en sus d'une spécification de fenêtre pour traiter les événements de la fenêtre et générer un jeu d'un ou plusieurs événements. L'entrée d'un opérateur défini par l'utilisateur est une fenêtre CEP (le résultat d'un opérateur de fenêtre récurrente, d'instantané ou utilisant des nombres) qui contient un jeu d'événements CEP et génère un ensemble d'événements CEP ou un jeu de charges utiles CEP.

Vous pouvez utiliser un opérateur défini par l'utilisateur lorsque vous avez besoin d'un calcul pour générer ou impacter des événements entiers, y compris leurs horodatages, pour chaque fenêtre. Par exemple, il peut s'agir de la définition d'un champ d'état d'un événement, en plus du calcul d'une agrégation, où l'état dépend du résultat de l'agrégation et d'un autre paramètre. Ainsi, un opérateur défini par l'utilisateur peut générer un événement unique pour chaque fenêtre contenant un champ de charge utile avec le résultat de l'agrégation, ainsi qu'un champ d'état qui indique si le résultat de l'agrégation n'a pas respecté une contrainte.

Agrégats et opérateurs définis par l'utilisateur basés sur le temps

Vous pouvez baser sur le temps des agrégats et des opérateurs définis par l'utilisateur, selon la classe de base que vous avez choisie pour implémenter ces opérateurs.

Les agrégats et les opérateurs définis par l'utilisateur basés sur le temps n'attendent pas le passage d'événements entiers incluant leurs horodateurs. Au lieu de cela, ils prennent uniquement en considération un jeu d'un ou plusieurs champs de charge utile provenant des événements, dans la fenêtre définie. Par ailleurs, l'heure de début et de fin de la fenêtre en cours ne leur est pas passée.

Les agrégats et les opérateurs définis par l'utilisateur basés sur le temps reçoivent, pour chaque fenêtre, un jeu d'événements contenant leurs horodateurs et l'heure de début et de fin de la fenêtre. La classe de base respective à partir de laquelle un agrégat ou un opérateur défini par l'utilisateur dérive l'implémentation détermine si celui-ci est basé sur le temps ou non.

Implémentation d'agrégats définis par l'utilisateur

Le rôle de l'auteur d'un agrégat défini par l'utilisateur consiste à :

  • fournir l'implémentation réelle de l'agrégat défini par l'utilisateur ;

  • fournir la méthode d'extension pour LINQ, afin de permettre à un générateur de requêtes d'utiliser l'agrégat défini par l'utilisateur.

Pour implémenter un agrégat défini par l'utilisateur, l'utilisateur dérive de la classe de base appropriée : CepAggregate pour les agrégats définis par l'utilisateur qui ne sont pas basés sur le temps, ou CepTimeSensitiveAggregate pour les agrégats définis par l'utilisateur qui sont basés sur le temps.

La dérivation de classe requiert l'instanciation des paramètres de type d'entrée et de sortie. Le type d'entrée représente soit la charge utile entière (si l'agrégat défini par l'utilisateur doit être en mesure de lire le jeu entier des champs de charge utile au cours de son calcul), soit un type CLR qui mappe à un type primitif correspondant dans le système de type StreamInsight (dans le scénario où un champ singleton est l'entrée de l'agrégat défini par l'utilisateur). Dans les deux cas, le type de sortie doit être un type CLR mappé à un type primitif correspondant.

En dehors des données d'événement, une structure de configuration facultative à l'heure de début de la requête peut être passée au constructeur de la classe de l'agrégat défini par l'utilisateur, si cela est voulu par son auteur. Si ce constructeur est fourni par l'auteur de l'agrégat défini par l'utilisateur, le moteur l'appelle en conséquence pendant l'exécution, avec la configuration fournie par l'appelant de l'agrégat défini par l'utilisateur dans LINQ.

Les agrégats définis par l'utilisateur basés sur le temps et ceux non basés sur le temps reçoivent les charges utiles sous la forme d'un jeu non ordonné. Dans le cas d'un agrégat défini par l'utilisateur basé sur le temps, les horodateurs des événements sont de plus associés à chaque charge utile. Qui plus est, un descripteur de fenêtre qui définit le démarrage de la fenêtre et les heures de fin est passé à l'agrégat défini par l'utilisateur.

Exemples d'agrégats définis par l'utilisateur

L'exemple suivant implémente un agrégat défini par l'utilisateur basé sur le temps. Il attend un jeu de champs d'événement entier. La structure de configuration facultative n'est pas spécifiée pour cet exemple d'implémentation, par conséquent, la classe n'a pas besoin d'un constructeur spécifique.

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

En plus d'implémenter l'agrégat défini par l'utilisateur, vous devez fournir une méthode d'extension pour LINQ, afin de permettre au générateur de requêtes de l'utiliser. La méthode d'extension est une signature qui permet à l'auteur de la requête d'utiliser l'agrégat et de compiler la requête. Par le biais d'un attribut, le fournisseur LINQ StreamInsight peut faire référence à la classe réelle contenant l'implémentation de l'agrégat défini par l'utilisateur, comme illustré dans l'exemple suivant.

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

Ici, il doit y avoir une implémentation d'un agrégat défini par l'utilisateur via une classe Median, qui implémente un agrégat qui fonctionne sur un champ unique de type int et retourne une valeur de type « int ». L'expression dans la signature de la fonction représente le mappage depuis le type d'événement de flux d'entrée à une valeur entière unique. Notez que la méthode d'extension ne sera jamais exécutée, d'où la présence de CepUtility.DoNotCall() dans son corps. Conformément à cette spécification, l'agrégat défini par l'utilisateur peut être utilisé dans LINQ, comme illustré dans l'exemple suivant.

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

L'argument de l'expression lambda mappe la charge utile de l'événement à une valeur entière qui sera l'entrée de l'agrégat défini par l'utilisateur. Dans ce cas, la valeur médiane des valeurs val du champ d'événement sera calculée pour chaque fenêtre.

Considérez ensuite l'exemple d'un agrégat défini par l'utilisateur non basé sur le temps qui contient des informations de configuration. Il attend une charge utile entière de type Trade comme entrée et renvoie des valeurs de type double. Cet exemple inclut également la méthode d'extension correspondante :

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

Étant donné que la charge utile entière est l'entrée, aucune expression lambda n'est spécifiée par la méthode d'extension. Le seul paramètre de l'agrégat défini par l'utilisateur est la valeur de la configuration (qui est double ici) :

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

Considérez ensuite l'exemple d'un agrégat défini par l'utilisateur basé sur le temps avec les informations de configuration. L'agrégat défini par l'utilisateur est une moyenne de temps pondérée, avec des événements intervalle considérés comme une fonction intermédiaire (c'est-à-dire, chaque intervalle est valide jusqu'au prochain). De façon similaire à l'exemple précédent, il n'attend pas la charge utile entière comme entrée, mais uniquement des valeurs de type double.

Notez que, bien que les charges utiles d'événement soient réduites aux valeurs double, le jeu de données d'entrée est néanmoins défini comme un jeu d'événements intervalle et non pas comme un jeu de charges utiles (comme c'était le cas des agrégats définis par l'utilisateur non basés sur le temps). Cela est nécessaire pour inclure les horodateurs, car l'agrégat défini par l'utilisateur est spécifié comme étant basé sur le temps. De plus, la fenêtre elle-même est donnée sous la forme d'un objet WindowDescription, qui possède une propriété définissant l'heure de début et l'heure de fin. Ces horodateurs sont exprimés en heure UTC. Notez également que UdaConfig est une classe ou une structure qui doit être sérialisable via DataContractSerializer.

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

Où UDAConfig est

public class UDAConfig
{
    public double Multiplier { get; set; }
}

La méthode d'extension inclut maintenant aussi la structure de configuration suivante :

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

La configuration devient un autre paramètre dans la méthode d'extension :

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

Les exemples fournis jusqu'à présent considèrent des scénarios où l'événement est typé. Autrement dit, les types de charge utile sont déjà connus au moment de l'implémentation de l'agrégat défini par l'utilisateur. L'exemple suivant implémente un agrégat défini par l'utilisateur qui a un type d'entrée générique dans lequel le type d'entrée est passé dans l'agrégat défini par l'utilisateur uniquement pendant l'exécution.

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

Implémentation d'opérateurs définis par l'utilisateur

Le rôle de l'auteur d'un opérateur défini par l'utilisateur consiste à :

  • fournir l'implémentation réelle de l'opérateur défini par l'utilisateur ;

  • fournir la méthode d'extension pour LINQ, afin de permettre à un générateur de requêtes d'utiliser l'opérateur défini par l'utilisateur.

Pour implémenter un opérateur défini par l'utilisateur, l'utilisateur dérive de la classe de base appropriée : CepOperator pour les opérateurs définis par l'utilisateur non basés sur le temps ou CepTimeSensitiveOperator. La dérivation de classe requiert l'instanciation du paramètre de type d'entrée et de sortie. Le type d'entrée représente toujours la charge utile entière. Le type de sortie est un jeu de charges utiles ou d'événements, selon la classe de base sélectionnée.

En plus des données d'événement, vous pouvez passer au constructeur de la classe de l'opérateur défini par l'utilisateur une structure de configuration facultative à l'heure de début de la requête, si cela est voulu par son auteur. Si ce constructeur est fourni par l'auteur de l'opérateur défini par l'utilisateur, le moteur l'appelle en conséquence pendant l'exécution, avec la configuration fournie par l'appelant de l'opérateur défini par l'utilisateur dans LINQ.

Les opérateurs définis par l'utilisateur non basés sur le temps et ceux basés sur le temps reçoivent les charges utiles sous la forme d'un jeu non ordonné. Dans le cas d'un opérateur défini par l'utilisateur basé sur le temps, les horodateurs des événements sont de plus associés à chaque charge utile. Qui plus est, un descripteur de fenêtre qui définit les heures de début et de fin de la fenêtre est passé à l'opérateur défini par l'utilisateur.

Comportement des CTI dans les opérateurs définis par l'utilisateur

Les opérateurs définis par l'utilisateur modifient les CTI comme suit : si une fenêtre est encore « ouverte », autrement dit, si aucun CTI n'a été reçu avec un horodateur après l'heure de fin de la fenêtre, tous les CTI inclus dans la fenêtre sont modifiés de manière à correspondre à l'heure de début de la fenêtre. De cette façon, la sortie de l'opérateur défini par l'utilisateur, qui peut potentiellement contenir des horodateurs définis par l'utilisateur, peut changer tant que la fenêtre reste ouverte.

Exemples d'implémentations d'opérateurs définis par l'utilisateur

L'exemple suivant implémente un opérateur défini par l'utilisateur non basé sur le temps, qui ne contient pas d'informations de configuration.

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

L'exemple suivant indique comment modifier la signature pour en faire un opérateur défini par l'utilisateur basé sur le temps qui accepte les informations de configuration.

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

Exemples de méthodes d'extension pour les opérateurs définis par l'utilisateur

En plus d'implémenter l'opérateur défini par l'utilisateur, son auteur doit fournir une méthode d'extension pour LINQ, afin de permettre au générateur de requêtes de l'utiliser. La méthode d'extension est une signature qui permet à l'auteur de la requête d'utiliser l'opérateur et de compiler la requête. Par le biais d'un attribut, le fournisseur LINQ peut faire référence à la classe réelle contenant l'implémentation de l'opérateur défini par l'utilisateur, comme illustré dans l'exemple suivant.

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

Cet opérateur défini par l'utilisateur peut maintenant être utilisé de la façon suivante.

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

L'exemple suivant illustre la méthode d'extension et l'utilisation d'un opérateur défini par l'utilisateur qui a une structure de configuration, en se référant à une implémentation contenue dans une classe nommé SampleUDOwithConfig.

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

Propriétés de champ d'événement spécifiques à la culture

Les extensions telles que les opérateurs, les agrégats et les fonctions définis par l'utilisateur peuvent être considérées comme des interfaces entre le domaine CEP avec son système de type et CLR .NET. Pour certaines applications, il est souhaitable de passer les informations de culture via cette interface. Pour les agrégats et les opérateurs définis par l'utilisateur, l'auteur de l'extension peut implémenter une interface supplémentaire, IDeclareEventProperties, qui tient compte de l'inspection ou du paramétrage des propriétés culturelles sur les champs d'événement. Pour implémenter cette interface, vous devez fournir une fonction DeclareEventProperties, qui retourne un objet de type CepEventType, pouvant contenir les informations de culture pour ses champs, comme indiqué dans l'exemple suivant :

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

Cet exemple d'opérateur défini par l'utilisateur prend des événements d'entrée de type « entrée » et produit des événements de type « sortie ». Le type « sortie » a des champs de type chaîne que l'auteur de l'opérateur défini par l'utilisateur souhaite explicitement annoter avec certaines informations de culture. La culture nommée zh-CN est appliquée au champ de sortie firstName, alors que le champ de sortie location est annoté avec la même culture associée au champ loc dans le type d'événement d'entrée de l'opérateur défini par l'utilisateur. Pour chaque événement qui est produit pendant l'exécution par l'opérateur défini par l'utilisateur, ces cultures sont appliquées à ses champs avant que l'événement soit inséré dans le flux de sortie de l'opérateur défini par l'utilisateur.

La même interface existe également pour les agrégats définis par l'utilisateur. Puisque les agrégats ont une seule valeur de retour, pour appliquer les informations spécifiques à la culture à un tel champ, l'interface IDeclareEventProperties encapsule la valeur de retour dans un CepEventType avec un champ unique, pour permettre d'annoter ce champ avec les propriétés d'événement CEP spécifiques.

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

Ici, la chaîne qui représente le résultat de l'agrégat est encapsulée dans un CepEventType, afin que l'auteur de l'agrégat défini par l'utilisateur puisse définir la propriété CultureInfo sur ce champ. Ces informations de culture seront propagées au champ d'événement réel qui reçoit le résultat de l'agrégation dans la requête LINQ, où l'agrégat défini par l'utilisateur est utilisé.

Voir aussi

Concepts

Utilisation de fenêtres d'événement

Autres ressources

Écriture de modèles de requête dans LINQ