Exporter (0) Imprimer
Développer tout
Ce sujet n'a pas encore été évalué - Évaluez ce sujet

Meilleures pratiques pour optimiser la rentabilité et l'évolutivité des solutions de messagerie basée sur les files d'attente sur Windows Azure

Rédigé par : Valery Mizonov

Révisé par : Brad Calder, Sidney Higa, Christian Martinez, Steve Marx, Curt Peterson, Paolo Salvatori et Trace Young

Cet article présente des descriptions normatives d'aide et les meilleures pratiques pour créer des solutions de messagerie basée sur les files d'attente évolutives, très efficaces et rentables sur la plateforme Windows Azure. Le public visé par cet article comprend les architectes et les développeurs qui conçoivent et mettent en œuvre des solutions basées sur le cloud exploitant la plateforme de services de stockage de files d'attenteWindows Azure.

Résumé

Une solution de messagerie basée sur les files d'attente traditionnelle repose sur un emplacement de stockage des messages appelé « file d'attente de messages », qui n'est autre qu'un référentiel pour les données envoyées ou reçues depuis un ou plusieurs participants, généralement au moyen d'un mécanisme de communication asynchrone.

L'échange des données basé sur les files d'attente est le socle d'une architecture de messagerie fiable et très évolutive, capable de prendre en charge de nombreux scénarios dans l'environnement informatique distribué. Qu'il s'agisse d'une messagerie de distribution de travaux de grande taille ou d'une messagerie durable, une technologie de mise en file d'attente de messages peut contribuer à fournir des fonctions de premier ordre pour répondre aux différentes conditions requises pour la communication asynchrone d'échelle.

L'objectif de cet article est d'expliquer de quelle façon les développeurs peuvent tirer parti de certains modèles de conception, conjointement aux fonctions fournies par la plateforme Windows Azure, pour créer des solutions de messagerie basées sur les files d'attente optimisées et rentables. L'article examine de façon détaillée les approches les plus communément utilisées pour implémenter des interactions basées sur les files d'attente dans les solutions Windows Azure, et fournit des recommandations pour améliorer les performances et l'évolutivité, ainsi que pour réduire les charges d'exploitation.

Les paragraphes sous-jacents traitent des meilleures pratiques applicables, et fournissent des conseils et des recommandations le cas échéant. Le scénario décrit dans cet article est une implémentation technique basée sur un projet client réel.

Scénario du client

Afin de fournir un exemple concret, nous allons généraliser un scénario tiré d'un cas client réel, comme suit.

Un fournisseur de solutions SaaS lance un nouveau système de facturation implémenté en tant qu'application Windows Azure pour répondre aux besoins de traitement des transactions d'échelle du client. L'atout principal de la solution est sa capacité à réduire la charge de travail sur le cloud et à exploiter l'élasticité de l'infrastructure Windows Azure pour effectuer des opérations qui sollicitent beaucoup de ressources.

L'élément sur site de l'architecture de bout en bout consolide et distribue de grands volumes de transactions à un service hébergé sur Windows Azure, régulièrement pendant la journée. Les volumes varient de quelques milliers à des centaines de milliers de transactions par soumission, atteignant des millions de transactions par jour. En outre, la solution doit satisfaire des besoins opérationnels dictés par un contrat de niveau de service pour une latence de traitement maximale garantie.

L'architecture de la solution est fondée sur un modèle de conception MapReduce et comprend une couche sur le cloud basée sur des rôles de travail et sur plusieurs instances, utilisant le stockage de files d'attentes Windows Azure pour la distribution du travail. Les lots de transactions sont reçus par instance de rôle de travail initiateur du processus, décomposés (mis hors lot) en éléments de travail plus petits et mis en file d'attente dans une collection de files d'attente Windows Azure pour permettre la répartition des charges.

Le traitement de la charge de travail est géré par plusieurs instances du rôle de travail chargé du processus, qui extrait des éléments de travail des files d'attente et les traite par des procédures de calcul. Les instances de traitement utilisent des écouteurs multithread pour implémenter le traitement des données en parallèle afin d'optimiser les performances.

Les éléments de travail traités sont routés dans une file d'attente dédiée, puis ils sont extraits de celle-ci par l'instance de rôle de travail du contrôleur du processus ; ils sont ensuite agrégés et rendus persistants dans une banque de données pour l'exploration, la création de rapports et d'analyse.

L'architecture de la solution peut être représentée comme suit :

Meilleures pratiques - Solutions de messagerie - Azure1

Le diagramme ci-dessus représente une architecture standard pour la mise à l'échelle de charges de travail conséquentes ou complexes. Le modèle d'échange de messages basé sur les files d'attente adopté par cette architecture est également très courant pour de nombreuses autres applications et services Windows Azure qui doivent communiquer entre eux via des files d'attente. Il permet d'adopter une approche canonique pour analyser les composants fondamentaux spécifiques impliqués dans un échange de messages basé sur les files d'attente.

Aspects fondamentaux de la messagerie basée sur des files d'attente

Une solution de messagerie classique qui échange des données entre ses composants distribués en utilisant des files d'attente de messages inclut des éditeurs déposant des messages dans les files d'attente, et un ou plusieurs abonnés destinés à recevoir les messages. Dans la plupart des cas, les abonnés, parfois appelés écouteurs de la file d'attente, sont implémentés comme des processus simples ou multithread, s'exécutant en continu ou à la demande, conformément à un modèle de planification.

À un niveau supérieur, il existe deux mécanismes d'expédition principaux utilisés pour permettre à un écouteur de file d'attente de recevoir les messages stockés dans une file d'attente :

  • Interrogation (modèle de transmission de type pull) : un écouteur contrôle une file d'attente en vérifiant à intervalles réguliers la présence de nouveaux messages. Lorsque la file d'attente est vide, l'écouteur continue de l'interroger, tout en passant périodiquement à un état de veille.

  • Déclenchement (modèle transmission de type push) : un écouteur s'abonne à un événement déclenché (par l'éditeur lui-même ou par un gestionnaire de service de file d'attente) chaque fois qu'un message arrive dans la file d'attente. L'écouteur peut à son tour initier le traitement des messages ; il n'a pas besoin alors d'interroger la file d'attente afin de déterminer si un nouveau travail est disponible.

Il est également intéressant de mentionner qu'il existe différentes versions de ces deux mécanismes. Par exemple, l'interrogation peut être bloquée et non bloquée. Le blocage suspend une demande jusqu'à ce qu'un nouveau message s'affiche dans une file d'attente (ou jusqu'à l'expiration du délai d'attente), alors qu'une demande non bloquée est traitée immédiatement s'il n'y a rien sur une file d'attente. Avec un modèle de déclenchement, une notification peut être envoyée aux écouteurs de la file d'attente soit pour chaque nouveau message, uniquement lorsque le tout premier message arrive dans une file d'attente vide, soit lorsque la profondeur de la file d'attente atteint un certain niveau.

noteRemarque
Les opérations de retrait de la file d'attente prises en charge par l'API du service File d'attente de Windows Azure sont non bloquées. Cela signifie que les méthodes de l'API telle que GetMessage ou GetMessages sont traitées immédiatement en l'absence de tout message sur une file d'attente. En revanche, les files d'attente du Service Bus de Windows Azure permettent le blocage des opérations de réception : le thread appelant est bloqué jusqu'à ce qu'un message arrive sur une file d'attente ou que le délai d'attente spécifié se soit écoulé.

L'approche communément adoptée aujourd'hui pour implémenter des écouteurs de file d'attente dans des solutions Windows Azure peut être résumée comme suit :

  1. Un écouteur est implémenté comme un composant d'application qui est instancié et exécuté dans le cadre d'une instance de rôle de travail.

  2. Le cycle de vie du composant d'écoute de file d'attente est souvent lié à l'exécution de l'instance de rôle hôte.

  3. La logique de traitement principale repose sur une boucle dans laquelle les messages sont enlevés de la file d'attente et distribués dans la file d'attente pour le traitement.

  4. Si aucun message n'est reçu, le thread d'écoute passe à l'état de veille pour une durée qui est souvent pilotée par un algorithme de temporisation spécifique propre à l'application.

  5. La boucle de réception est exécutée et une file d'attente est interrogée jusqu'à ce que l'écouteur soit notifié de quitter la boucle et de mettre fin à l'opération.

Le diagramme suivant illustre la logique généralement utilisée pour implémenter un écouteur de file d'attente avec un mécanisme d'interrogation dans les applications Windows Azure :

Meilleures pratiques - Solutions de messagerie - Azure2
noteRemarque
Pour les besoins de cet article, les modèles de conception plus complexes, par exemple ceux qui nécessitent l'utilisation d'un gestionnaire de files d'attente (broker) ne sont pas considérés.

L'utilisation d'un écouteur de file d'attente classique avec un mécanisme d'interrogation peut ne pas être le choix optimal si vous utilisez les files d'attente Windows Azure, car le modèle de tarification de Windows Azure mesure les transactions de stockage en termes de demandes d'application exécutées sur la file d'attente, que la file d'attente soit vide ou non. Les sections suivantes traitent des techniques permettant d'optimiser les performances et de réduire le coût des solutions de messagerie basée sur les files d'attente de la plateforme Windows Azure.

Meilleures pratiques pour optimiser les performances, l'évolutivité et le coût

Dans cette section, nous allons améliorer la conception pour obtenir de meilleures performances, une meilleure évolutivité et une meilleure rentabilité.

Pour rendre un modèle d'implémentation « plus efficace », la conception devrait avoir les objectifs suivants :

  • Réduire les dépenses opérationnelles en supprimant une partie significative des transactions de stockage qui n'aboutissent à aucun travail exploitable.

  • Éliminer la latence excessive imposée par l'intervalle d'interrogation lorsqu'on vérifie les nouveaux messages d'une file d'attente.

  • Monter et réduire la charge de façon dynamique en adaptant la puissance de traitement aux volumes de travail volatils.

De plus, le modèle d'implémentation doit atteindre ces objectifs sans introduire une complexité qui annulerait les avantages associés.

Meilleures pratiques pour optimiser le coût des transactions de stockage

Lors de l'évaluation du coût total de possession (TCO) et du retour sur investissement (ROI) pour une solution déployée sur la plateforme Windows Azure, le volume des transactions de stockage est l'une des variables clés de l'équation TCO. En réduisant le nombre de transactions dans les files d'attente Windows Azure, vous réduisez les coûts d'exploitation liés aux solutions exécutées sur Windows Azure.

Dans le contexte d'une solution de messagerie basée sur les files d'attente, le volume des transactions de stockage peut être réduit en associant les méthodes suivantes :

  1. Lorsque vous placez des messages dans une file d'attente, groupez les messages connexes dans un seul grand traitement, puis compressez et stockez l'image compressée dans un stockage d'objets blob et utilisez la file d'attente pour conserver une référence à l'objet blob contenant les données réelles.

  2. Lorsque vous récupérez des messages d'une file d'attente, traitez plusieurs messages par lots dans une seule transaction de stockage. La méthode GetMessages de l'API du service File d'attente permet d'extraire le nombre de messages spécifié de la file d'attente dans une transaction unique (consultez la remarque ci-dessous).

  3. Lorsque vous vérifiez la présence d'éléments de travail dans une file d'attente, évitez d'utiliser des intervalles d'interrogation trop fréquents et appliquez un délai de temporisation pour augmenter l'intervalle entre les demandes si une file d'attente demeure toujours vide.

  4. Réduisez le nombre d'écouteurs de la file d'attente si vous utilisez un modèle de transmission de type pull. Utilisez uniquement 1 écouteur de file d'attente par instance de rôle lorsqu'une file d'attente est vide. Pour réduire à zéro le nombre d'écouteurs de file d'attente par instance de rôle, utilisez un mécanisme de notification pour instancier les écouteurs de file d'attente lorsque la file d'attente reçoit des éléments de travail.

  5. Si les files d'attente restent le plus souvent vides, réduisez automatiquement le nombre d'instances de rôle et continuez à surveiller les mesures appropriées du système pour déterminer si et quand l'application doit augmenter le nombre d'instances pour gérer une charge de travail accrue.

La plupart des recommandations ci-dessus peuvent servir pour une mise en œuvre de type standard, qui gère les lots de message et encapsule plusieurs opérations sous-jacentes de stockage de files d'attente et d'objets blob et de gestion de threads. Nous aborderons la procédure à suivre plus loin dans cet article.

ImportantImportant
Lors de la récupération des messages via la méthode GetMessages, la taille de lot maximale prise en charge par l'API du service File d'attente dans une seule opération de retrait de la file d'attente est limitée à 32.

De façon générale, le coût des transactions Windows Azure augmente linéairement à mesure que le nombre de clients du service de la file d'attente augmente, comme lors d'une montée en charge du nombre d'instances de rôle ou lorsqu'on augmente le nombre de threads de retrait de la file d'attente. Pour illustrer l'impact financier potentiel d'une conception qui ne tire pas parti ou ne suit pas les recommandations ci-dessus, voici un exemple attesté par des chiffres réels.

L'impact financier d'une conception inefficace

Si l'architecte de solutions n'implémente pas les optimisations appropriées, l'architecture décrite ci-dessus va probablement générer des dépenses opérationnelles excessives une fois la solution déployée et exécutée sur la plateforme Windows Azure. Les raisons de ces dépenses excessives sont décrites dans cette section.

Comme souligné dans la définition du scénario, les données des transactions commerciales arrivent à intervalles réguliers. Toutefois, supposons que la solution est occupée à traiter la charge de travail 25% du temps pendant un jour ouvré de 8 heures standard. Il en résulte une « durée d'inactivité » de 6 heures (8 heures x 75 %) pendant laquelle aucune transaction ne passe par le système. En outre, la solution ne reçoit aucune donnée pendant les 16 heures non ouvrées de la journée.

Cela fait un total de 22 heures d'inactivité, pendant lesquelles la solution tente toujours d'extraire de la file d'attente le travail car elle n'a aucune connaissance explicite de l'arrivée de nouvelles données. Dans cette fenêtre temporelle, chaque thread de retrait de la file d'attente individuel exécute jusqu'à 79 200 transactions (22 heures x 60 min x 60 transactions/min) sur une file d'attente d'entrée, compte tenu de la fréquence d'interrogation par défaut d'1 seconde.

Comme indiqué précédemment, le modèle de tarification de la plateforme Windows Azure est basé sur des « transactions de stockage » individuelles. Une transaction de stockage est une demande faite par une application utilisateur afin d'ajouter, consulter, mettre à jour ou supprimer des données dans le stockage. Au moment de la rédaction de ce livre blanc, les transactions de stockage sont facturées 0,01 dollars les 10 000 transactions (hors offres promotionnelles ou accords de tarification spéciaux).

ImportantImportant
Lorsque vous calculez le nombre de transactions de file d'attente, gardez à l'esprit que la mise en file d'attente d'un seul message est comptée comme 1 transaction, et que la consommation d'un message est souvent un processus impliquant 2 étapes : la récupération, suivie d'une demande pour supprimer le message de la file d'attente. Par conséquent, une opération de retrait de la file d'attente réussie génère deux transactions de stockage. Notez que même si une demande de retrait de la file d'attente ne génère aucune récupération de données, elle compte toujours comme une transaction facturable.

Les transactions de stockage générées par un seul thread de retrait de la file d'attente dans le scénario ci-dessus coûteront environ 2,38 dollars (79 200 / 10 000 x $0,01 x 30 jours) tous les mois. En comparaison, 200 threads de retrait de la file d'attente (ou, alternativement, 1 thread de retrait de la file d'attente dans 200 instances de rôle de travail) coûteront donc 457,20 dollars par mois. Il s'agit du coût induit lorsque la solution n'effectue aucun calcul, se contentant d'interroger les files d'attente pour déterminer si des éléments de travail sont disponibles. L'exemple ci-dessus est un exemple abstrait, car personne n'implémenterait son service de cette manière, mais il a le mérite d'expliquer pourquoi il est si important d'appliquer les optimisations décrites à la suite.

Meilleures pratiques pour réduire la latence excessive

Pour optimiser les performances des solutions de messagerie basée sur les files d'attente Windows Azure, une approche consiste à utiliser la couche de messagerie de publication/abonnement fournie avec le Service Bus de Windows Azure, comme décrit dans cette section.

Dans cette approche, les développeurs devront se concentrer sur la création d'une combinaison de notifications d'interrogation et de type push en temps réel, permettant aux écouteurs de s'abonner à une notification d'événement (déclencheur) déclenchée à certaines conditions pour indiquer qu'une nouvelle charge de travail est mise dans une file d'attente. Cela améliore la boucle d'interrogation de file d'attente traditionnelle grâce à une couche de messagerie de publication/abonnement pour distribuer les notifications.

Dans un système distribué complexe, cette approche exige l'utilisation d'un « bus de messages » ou d'un « intergiciel orienté messages » pour s'assurer que les notifications peuvent être relayées de manière fiable à un ou plusieurs abonnés de façon faiblement couplée. Service Bus de Windows Azure est le choix privilégié pour gérer les spécifications de messagerie entre des services distribués faiblement couplés s'exécutant sur Windows Azure et sur site. Il s'agit également de la solution idéale pour une architecture de « bus de messages » qui permet l'échange des notifications entre les processus intervenant dans les communications basées sur une file d'attente.

Les processus impliqués dans un échange de messages basé sur une file d'attente peuvent utiliser le modèle suivant :

Meilleures pratiques - Solutions de messagerie - Azure3

Plus précisément, en ce qui concerne l'interaction entre les éditeurs du service de file d'attente et les abonnés, les mêmes principes appliqués à la communication entre les instances de rôle Windows Azure peuvent répondre à la plupart des besoins d'échange de messages de notification de type push. Nous avons déjà abordé ces aspects fondamentaux dans la procédure Simplifier et mettre à l'échelle les communications entre les rôles avec Service Bus de Windows Azure.

ImportantImportant
L'utilisation de Service Bus de Windows Azure est assujettie à un modèle de tarification qui tient compte du volume des opérations de messagerie sur une entité de messagerie Service Bus comme une file d'attente ou un sujet.

Il est donc important d'effectuer une analyse du coût/bénéfice pour évaluer les avantages et les inconvénients de l'introduction de Service Bus dans une architecture spécifique. Dans ce document, il est utile d'évaluer si l'introduction de la couche de distribution de notification basée sur Service Bus aboutit réellement à une réduction du coût pouvant justifier les investissements et les efforts de développement supplémentaires.

Pour plus d'informations sur le modèle de tarification de Service Bus, consultez les sections appropriées des Questions fréquentes sur la plateforme Windows Azure.

Alors que l'impact sur la latence est assez simple à gérer avec une couche de messagerie de publication/abonnement, la réduction des coûts peut être atteinte en utilisant une mise à l'échelle dynamique (élastique), comme expliqué dans la section suivante.

Meilleures pratiques pour la mise à l'échelle dynamique

La plateforme Windows Azure permet aux utilisateurs de mettre à l'échelle leur solution plus rapidement et plus facilement que jamais. La capacité de gérer des charges de travail volatiles et un trafic variable est l'une des propositions phares de la plateforme du cloud. Cela met l'extensibilité à la portée de tout le monde, et en fait une fonctionnalité prête à l'emploi qui peut être activée par programme à la demande dans une solution dans le cloud bien conçue.

La mise à l'échelle dynamique est la capacité technique d'une solution donnée de gérer des charges de travail variables en augmentant et en réduisant la capacité de travail et la puissance de traitement au moment de l'exécution. La plateforme Windows Azure prend en charge en mode natif la mise à l'échelle dynamique via la configuration d'une infrastructure de traitement distribué dans laquelle les heures de calcul peuvent être achetées en fonction des besoins.

Il est important de distinguer 2 types de mise à l'échelle dynamique sur la plateforme Windows Azure :

  • La mise à l'échelle de l'instance de rôle fait référence à l'ajout et à la suppression d'instances de rôle Web ou de rôle de travail supplémentaires pour gérer la charge de travail dans le temps. Cela implique souvent de modifier le nombre d'instances dans la configuration du service. L'augmentation du nombre d'instances entraîne le démarrage par le runtime de Windows Azure de nouvelles instances, tandis que la réduction du nombre d'instances entraîne l'arrêt des instances en cours d'exécution.

  • La mise à l'échelle de processus (thread) fait référence au maintien d'une capacité suffisante en termes de traitement des threads dans une instance de rôle donnée en ajustant le nombre de threads en fonction de la charge de travail actuelle.

La mise à l'échelle dynamique dans une solution de messagerie basée sur les files d'attente devrait suivre un certain nombre de recommandations, notamment :

  1. La surveillance des indicateurs de performance clés, notamment l'utilisation du processeur, la longueur de la file d'attente, les temps de réponse et la latence de traitement des messages.

  2. L'augmentation ou la réduction dynamique du nombre d'instances de rôle pour faire face aux pics de charge de travail, qu'ils soient prédictibles ou non.

  3. L'extension et la réduction par programme du nombre de threads de traitement pour répondre aux charges variables gérées par une instance de rôle spécifique.

  4. Le partitionnement et le traitement de granularité fine des charges de travail simultanées grâce à Task Parallel Library dans le .NET Framework 4.

  5. Le maintien d'une capacité viable dans les solutions caractérisées par une charge de travail très volatile, en prévision des pics soudains, afin de pouvoir les gérer sans configurer des instances supplémentaires.

L'API de gestion du service permet à un service hébergé dans Windows Azure de modifier le nombre d'instances de rôle en cours d'exécution en modifiant la configuration du déploiement au moment de l'exécution.

noteRemarque
Le nombre maximal de petites instances de calcul Windows Azure (ou le nombre équivalent d'instances de calcul de taille différentes en termes de nombre de cœurs) dans un abonnement standard est limité à 20 par défaut. Toutes les demandes d'augmentation de quota doivent être adressées à l'équipe de Support de Windows Azure. Pour plus d'informations, consultez Questions fréquentes sur la plateforme Windows Azure.

La mise à l'échelle du nombre d'instances de rôle n'est pas toujours la meilleure solution pour gérer les pics de charge. Par exemple, une nouvelle instance de rôle peut mettre plusieurs secondes à tourner à plein régime, et il n'existe actuellement aucune mesure SLA fournie à ce sujet. En revanche, il suffit parfois d'augmenter simplement le nombre de threads de travail pour gérer l'augmentation d'une charge de travail volatile. Lorsque la charge de travail est traitée, la solution surveille les mesures de charge appropriées et détermine s'il faut réduire ou augmenter dynamiquement le nombre de processus de travail.

ImportantImportant
Actuellement, la cible d'évolutivité pour une file d'attente Windows Azure unique est « limitée » à 500 transactions/s. Si une application tente de dépasser cette cible, par exemple, en effectuant des opérations de file d'attente à partir de plusieurs instances de rôle exécutant des centaines de threads de retrait de la file d'attente, une réponde HTTP 503 « Serveur occupé » peut être reçue du service de stockage. Dans ce cas, l'application doit implémenter un mécanisme de nouvelle tentative avec un algorithme de délai de temporisation exponentiel. Toutefois, si les erreurs HTTP 503 se produisent régulièrement, il est recommandé d'utiliser plusieurs files d'attente et d'implémenter une stratégie basée sur un partitionnement pour mettre à l'échelle sur plusieurs files d'attente.

Dans la plupart des cas, la mise à l'échelle automatique des processus de travail se fait au niveau de l'instance de rôle spécifique. En revanche, la mise à l'échelle d'une instance de rôle implique généralement un élément central de l'architecture de solution responsable de surveiller les mesures de performances et de déterminer les mises à l'échelle appropriées. Le schéma ci-dessous représente un composant de service appelé agent dynamique de mise à l'échelle qui rassemble et analyse les mesures de charge pour déterminer s'il doit mettre en service des instances ou libérer des instances inactives.

Meilleures pratiques - Solutions de messagerie - Azure4

Il est à noter que le service de l'agent de mise à l'échelle peut être déployé en tant que rôle de travail en cours de exécution sur Windows Azure, ou en tant que service sur site. Indépendamment de la topologie de déploiement, le service peut accéder aux files d'attente Windows Azure.

Pour implémenter une fonction de mise à l'échelle dynamique, envisagez d'utiliser le Bloc de mise à l'échelle automatique des applications Microsoft Enterprise Library qui active le comportement automatique de mise à l'échelle dans les solutions s'exécutant sur Windows Azure. Le bloc de mise à l'échelle automatique d'applications fournit toutes les fonctionnalités nécessaires pour configurer et surveiller la mise à l'échelle automatique dans une application Windows Azure.

Maintenant que nous avons couvert l'impact de la latence, les coûts de stockage et les spécifications de la mise à l'échelle dynamique, il est temps de consolider nos recommandations dans une implémentation technique.

Implémentation technique

Dans les sections précédentes, nous avons examiné les principales caractéristiques d'une architecture de messagerie bien conçue, basée sur les files d'attente du stockage Windows Azure. Nous avons abordé trois techniques clés qui permettent de réduire la latence, d'optimiser les coûts de stockage et d'améliorer la réactivité des charges de travail variables.

Cette section vise à fournir un point de départ aux développeurs de Windows Azure pour implémenter une partie des modèles référencés dans ce livre blanc du point de vue de la programmation.

noteRemarque
Cette section traite de la création d'un écouteur de file d'attente mis automatiquement à l'échelle, prenant en charge les modèles de type push et de type pull. Pour les techniques avancées de mise à l'échelle dynamique au niveau de l'instance de rôle, consultez Bloc de mise à l'échelle automatique des applications Enterprise Library.

En outre, pour rester concis, nous nous concentrerons uniquement sur les principaux éléments fonctionnels et nous éviterons d'ajouter de la complexité en omettant une grande partie du code de prise en charge de l'infrastructure dans les exemples de code ci-dessous. À des fins de clarification, il est également intéressant de préciser que l'implémentation technique décrite ci-dessous n'est pas la seule solution à tous les problèmes d'espace. Elle fournit un point de départ duquel les développeurs peuvent dériver des solutions spécifiques plus fluides.

À partir de ce paragraphe, ce livre blanc se concentrera sur le code source requis pour implémenter les modèles décrits ci-dessus.

Créer un écouteur de file d'attente générique

Nous allons d'abord définir un contrat implémenté par un composant d'écoute de file d'attente hébergé par un rôle de travail et écoutant une file d'attente Windows Azure.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

L'événement QueueEmpty est destiné à être utilisé par un hôte. Il fournit un mécanisme pour que l'hôte détermine le comportement de l'écouteur de file d'attente lorsque la file d'attente est vide. Le délégué de l'événement respectif est défini comme suit :

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

Il est plus facile de gérer les éléments de la file d'attente si un écouteur peut fonctionner avec des génériques, sans utiliser les classes du kit de développement logiciel (SDK) « complètes » telles que CloudQueueMessage. Par conséquent, nous allons définir une nouvelle interface implémentée par un écouteur de file d'attente en mesure de prendre en charge un accès aux files d'attente basé sur les génériques :

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

Notez que nous avons également activé l'écouteur de génériques pour placer des éléments de la file d'attente sur un ou plusieurs abonnés via le modèle de conception Observateur au moyen de l'interface IObservable<T> disponible dans le .NET Framework 4.

Nous souhaitons conserver une seule instance d'un composant qui implémente l'interface ICloudQueueListenerExtension<T>. Toutefois, nous devons être en mesure d'exécuter plusieurs threads de retrait de la file d'attente (processus de travail, ou tâches, pour simplifier). Par conséquent, nous allons ajouter la prise en charge de la logique de retrait de la file d'attente multithread dans le composant d'écoute de file d'attente. C'est là que nous allons utiliser la fonctionnalité Task Parallel Library (TPL). La méthode StartListener sera chargée de lancer le nombre spécifié de threads de retrait de la file d'attente comme suit :


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

La méthode DequeueTaskMain implémente le corps fonctionnel d'un thread de retrait de la file d'attente. Les opérations principales sont les suivantes :

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

Un certain nombre de remarques sont nécessaires concernant l'implémentation de la méthode DequeueTaskMain.

Premièrement, nous utilisons le langage LINQ parallèle (PLINQ) lors de la distribution des messages à traiter. Le principal avantage de PLINQ est qu'il permet d'accélérer la gestion des messages en exécutant le délégué de requête sur des threads de travail distincts et sur plusieurs processeurs en parallèle, lorsque cela est possible.

noteRemarque
Étant donné que la parallélisation de la requête est gérée en interne par PLINQ, il n'y a aucune garantie que PLINQ utilisera plus d'un noyau pour la parallélisation du travail. PLINQ peut exécuter une requête séquentielle s'il détermine que la surcharge de la parallélisation ralentira la requête. Pour tirer parti de PLINQ, le travail total de la requête doit être suffisamment important pour tirer parti de la surcharge de planification du travail dans le pool de threads.

Deuxièmement, nous n'allons pas extraire un seul message à la fois. En revanche, nous allons demander à l'API du service File d'attente d'extraire un nombre donné de messages d'une file d'attente. Cela est piloté par le paramètre DequeueBatchSize passé à la méthode Get<T>. Lorsque nous écrivons la couche d'abstraction de stockage implémentée dans la solution globale, ce paramètre est fourni à la méthode d'API du service File d'attente. En outre, nous allons exécuter un contrôle de sécurité pour garantir que la taille du lot ne dépasse pas la taille maximale prise en charge par l'API. Cela est implémenté comme suit :

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

Enfin, nous n'allons pas exécuter la tâche de retrait de la file d'attente indéfiniment. Nous avons fourni un point de contrôle explicite implémenté en tant qu'événement QueueEmpty, déclenché lorsqu'une file d'attente est vide. À ce stade, nous allons consulter un gestionnaire d'événements QueueEmpty pour déterminer s'il nous permet ou non de terminer l'exécution de la tâche de retrait de la file d'attente. Une implémentation bien conçue du gestionnaire d'événements QueueEmpty permet la prise en charge de la capacité de « mise à l'échelle automatique », comme expliqué dans la section suivante.

Réduction automatique des tâches de retrait de la file d'attente

L'objectif du gestionnaire d'événements QueueEmpty est double. En premier lieu, il est chargé de fournir une rétroaction à la tâche de retrait de la file d'attente source, lui indiquant de passer à l'état de veille pour un intervalle donné (défini par le paramètre de sortie delay dans le délégué d'événement). Deuxièmement, il indique à la tâche de retrait de la file d'attente si elle doit ou non s'arrêter normalement (comme indiqué par le paramètre de retour booléen).

L'implémentation du gestionnaire d'événements QueueEmpty qui suit résout les problèmes soulevés plus haut dans ce livre blanc. Elle calcule un intervalle de temporisation exponentiel aléatoire et indique à la tâche de retrait de la file d'attente d'augmenter de manière exponentielle le délai entre les demandes d'interrogation de la file d'attente. Notez que le délai de temporisation n'excédera pas 1 seconde, comme configuré dans notre solution, car il n'est pas réellement nécessaire de disposer d'une longue temporisation entre les interrogations lorsque la mise à l'échelle est suffisamment bien implémentée. En outre, elle interroge l'état de l'écouteur de la file d'attente pour déterminer le nombre de tâches de retrait de la file d'attente actives. Si ce nombre est dépassé d'une seule unité, le gestionnaire d'événements informe la tâche de retrait de la file d'attente d'origine d'arrêter la boucle d'interrogation, sous réserve que l'intervalle de temporisation ait expiré. Sinon, la tâche de retrait de la file d'attente continue, en laissant exactement 1 thread d'interrogation s'exécuter à la fois par instance unique de l'écouteur de file d'attente. Cette approche vous permet de réduire le nombre de transactions de stockage et réduit par conséquent les coûts de transaction comme expliqué précédemment.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

Plus précisément, la capacité de « réduire les tâches de retrait de la file d'attente » décrite ci-dessus peut être expliquée comme suit :

  1. Lorsque la file d'attente contient des éléments, les tâches de retrait de la file d'attente s'assurent que la charge de travail sera traitée aussi rapidement que possible. Il n'y aura aucun délai entre les demandes de retrait des messages d'une file d'attente.

  2. Dès que la file d'attente source est vide, chaque tâche de retrait de la file d'attente génère un événement QueueEmpty.

  3. Le gestionnaire d'événements QueueEmpty calcule un délai de temporisation exponentiel aléatoire et indique à la tâche de suspendre son activité pour un intervalle donné.

  4. Les tâches de retrait de la file d'attente continueront d'interroger la file d'attente source à des intervalles calculés jusqu'à ce que le délai d'inactive expire.

  5. Une fois que le délai d'inactivité a expiré, et à condition que la file d'attente source soit toujours vide, toutes les tâches de retrait de la file d'attente actives s'arrêtent. Cela ne se fera pas en une seule fois, car les tâches sont temporisées différemment dans l'algorithme.

  6. À un moment donné, il ne restera plus qu'une seule tâche de retrait de la file d'attente active en attente de travail. Par conséquent, aucune transaction d'interrogation inactive n'aura lieu sur une file d'attente, sauf une, correspondant à cette seule tâche.

Pour développer le processus de collecte des caractéristiques de la charge dans le temps, il est intéressant de mentionner les artefacts de code source appropriés. En premier lieu, il existe une structure qui contient des mesures pertinentes pour évaluer la charge appliquée à la solution. Pour simplifier, nous avons regroupé quelques mesures qui seront utilisées ultérieurement dans l'exemple de code.

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

En second lieu, il existe une méthode implémentée par un écouteur de file d'attente qui retourne les mesures de la charge comme représenté dans l'exemple suivant :

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

Augmentation automatique des tâches de retrait de la file d'attente

Dans la section précédente, nous avons expliqué comment réduire le nombre de tâches de retrait de la file d'attente actives à une seule instance afin de minimiser l'impact des transactions inactives sur les coûts liés aux opérations de stockage. Dans cette section, nous allons vous guider à travers l'exemple inverse, en implémentant la capacité de « mise à l'échelle automatique » pour retrouver la puissance de traitement en cas de besoin.

Nous allons d'abord définir un délégué d'événement qui va suivre les transitions d'état vide ou non vide d'une file d'attente afin de déclencher les actions appropriées :

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

Nous étendons ensuite la définition d'origine de l'interface ICloudQueueServiceWorkerRoleExtension pour inclure un nouvel événement qui sera déclenché chaque fois qu'un écouteur de file d'attente détectera de nouveaux éléments de travail, essentiellement lorsque la longueur de la file d'attente passera de zéro à une valeur positive :

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

En outre, nous allons déterminer l'emplacement exact du code de l'écouteur de file d'attente où un tel événement sera déclenché. Nous allons déclencher l'événement QueueWorkDetected depuis la boucle de retrait de la file d'attente implémentée dans la méthode DequeueTaskMain, qui doit être étendu comme suit :

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

Lors de la dernière étape, nous allons fournir un gestionnaire d'événements QueueWorkDetected. L'implémentation de ce gestionnaire d'événements est fournie par un composant qui instancie et héberge l'écouteur de file d'attente. Dans notre cas, il s'agit d'un rôle de travail. Le code chargé de l'instanciation et de l'implémentation du gestionnaire d'événements comprend les éléments suivants :

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

À la lumière de l'exemple ci-dessus, la méthode GetOptimalDequeueTaskCount mérite d'être examinée de plus près. Cette méthode est chargée de calculer le nombre idéal de tâches de retrait de la file d'attente pour gérer la charge de travail dans une file d'attente. Lorsqu'elle est appelée, elle doit déterminer (via des mécanismes de prise de décision appropriés) la « puissance » dont l'écouteur de file d'attente a besoin pour traiter le volume de travail en attente ou attendu dans une file d'attente spécifique.

Par exemple, le développeur peut adopter une approche très simplifiée et inclure un jeu de règles statiques directement dans la méthode GetOptimalDequeueTaskCount. En utilisant les caractéristiques de débit et d'évolutivité connues de l'infrastructure de mise en file d'attente, la latence moyenne de traitement, la taille de la charge et d'autres facteurs pertinents, le jeu de règles peut adopter une approche optimiste et décider d'un nombre optimal de tâches de retrait de la file d'attente.

Dans l'exemple ci-dessous, une technique intentionnellement très simple est utilisée pour déterminer le nombre de tâches de retrait de la file d'attente :

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

Il est important de répéter que l'exemple de code ci-dessus n'est pas une approche universelle, convenant à toutes les situations. Une meilleure solution serait d'appeler une règle externe configurable et gérable qui effectuerait les calculs nécessaires.

À ce stade, nous avons un prototype d'écouteur de file d'attente opérationnel, en mesure de se mettre à l'échelle automatiquement en augmentant ou en réduisant les tâches en fonction d'une charge de travail variable. Éventuellement, pour y apporter une touche finale, nous pourrions l'enrichir en le rendant capable de s'adapter automatiquement à la charge la variable pendant qu'il s'exécute. Cette capacité peut être ajoutée en appliquant le même modèle suivi lorsque nous avons ajouté la prise en charge de l'événement de QueueWorkDetected.

Passons maintenant à une autre optimisation importante qui nous aidera à réduire la latence dans les écouteurs de file d'attente.

Implémentation d'une couche de publication/abonnement pour un retrait de la file d'attente sans aucune latence

Dans cette section, nous allons améliorer l'implémentation de l'écouteur de file d'attente ci-dessus avec un mécanisme de notification de type push construit sur la capacité de multidiffusion unidirectionnelle Service Bus. Le mécanisme de notification est chargé de déclencher un événement indiquant à l'écouteur de file d'attente de commencer le travail de retrait de la file d'attente. Cette approche permet d'éviter d'interroger la file d'attente afin de vérifier la présence de nouveaux messages et, donc, d'éliminer la latence associée.

Nous allons d'abord définir un événement déclencheur qui sera reçu par notre écouteur de file d'attente lorsqu'une nouvelle charge de travail est déposée dans une file d'attente :

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

Ensuite, nous allons permettre aux instances de l'écouteur de file d'attente d'agir en tant qu'abonnés pour recevoir un événement déclencheur. La première étape consiste à définir un écouteur de file d'attente comme observateur de l'événement CloudQueueWorkDetectedTriggerEvent :

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

La seconde étape consiste à appliquer la méthode OnNext définie dans l'interface IObserver<T>. Cette méthode est appelée par le fournisseur pour notifier à l'observateur un nouvel événement :

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

Comme nous pouvons le voir dans l'exemple ci-dessus, nous appelons à bon escient le même délégué d'événement utilisé dans les étapes précédentes. Le gestionnaire d'événements QueueWorkDetected fournit déjà la logique d'application nécessaire pour instancier le nombre optimal de tâches de retrait de la file d'attente. Par conséquent, le même gestionnaire d'événements est réutilisé pour la gestion de la notification CloudQueueWorkDetectedTriggerEvent.

Comme noté dans les sections précédentes, il n'est pas nécessaire d'exécuter en continu une tâche de retrait de la file d'attente lorsqu'une notification de type push est utilisée. Par conséquent, nous pouvons réduire à zéro le nombre de tâches de file d'attente par instance d'écouteur et utiliser un mécanisme de notification pour instancier les tâches de retrait de la file d'attente lorsque celle-ci reçoit des éléments de travail. Afin de nous assurer que nous n'exécutons aucune tâche de retrait de la file d'attente inactive, la modification simple qui suit est requise dans le gestionnaire d'événements QueueEmpty :

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

En résumé, nous ne détectons plus s'il reste ou non une seule tâche de retrait de la file d'attente active. Le gestionnaire d'événements QueueEmpty modifié prend en considération uniquement le dépassement de l'intervalle d'inactivité maximal, à l'expiration duquel toutes les tâches de retrait de la file d'attente actives s'arrêteront.

Pour recevoir les notifications CloudQueueWorkDetectedTriggerEvent, nous avons utilisé le modèle de publication/abonnement implémenté en tant que messagerie faiblement couplée entre les instances de rôle de Windows Azure. Fondamentalement, nous utilisons la même couche de communication entre les rôles et nous gérons les événements entrants comme suit :

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

La multidiffusion d'un événement déclencheur défini dans la classe CloudQueueWorkDetectedTriggerEvent incombe définitivement à un éditeur, à savoir, le composant qui dépose les éléments de travail dans une file d'attente. Cet événement peut être déclenché avant que le tout premier élément de travail est mis en file d'attente, ou après que le dernier élément est mis en file d'attente. Dans l'exemple ci-dessous, nous publions un événement déclencheur après avoir mis les éléments de travail dans la file d'attente d'entrée :

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

Maintenant que nous avons créé un écouteur de file d'attente qui peut prendre en charge le multithreading, la mise à l'échelle automatique et les notifications de type push, il est temps de consolider l'ensemble des recommandations de création de solutions de messagerie basée sur les files d'attente sur la plateforme Windows Azure.

Conclusion

Pour optimiser la rentabilité et l'efficacité des solutions de messagerie basée sur les files d'attente qui s'exécutent sur la plateforme Windows Azure, les architectes et les développeurs doivent prendre en compte les recommandations suivantes.

En tant qu'architecte de solution, vous devez :

  • Implémenter une architecture de messagerie basée sur les files d'attente qui utilise le service de stockage de files d'attente Windows Azure pour la communication asynchrone à grande échelle entre les couches et les services dans les solutions basées sur le cloud ou hybrides.

  • Recommander une architecture de mise en file d'attente partitionnée capable de plus de 500 transactions/s.

  • Comprendre les caractéristiques principales du modèle de tarification de Windows Azure et optimiser la solution afin de réduire les coûts de transaction au moyen des meilleures pratiques et de modèles de conception appropriés.

  • Tirer parti de la mise à l'échelle dynamique en créant une architecture qui s'adapte aux charges de travail variables et fluctuantes.

  • Utiliser les bonnes techniques et méthodes de mise à l'échelle pour étendre ou réduire de façon élastique la puissance de calcul, et optimiser davantage les charges d'exploitation.

  • Évaluer le ratio coût/bénéfice en ce qui concerne la réduction de la latence en exploitant le Service Bus de Windows Azure pour la distribution de notifications de type push en temps réel.

En tant que développeur, vous devez :

  • Concevoir une solution de messagerie qui utilise la mise en lots lors du stockage et du retrait des données à partir des files d'attente Windows Azure.

  • Implémenter un service d'écouteur de file d'attente efficace, en mesure de garantir que les files d'attente sont interrogées par au maximum un thread de retrait de la file d'attente lorsque la file d'attente est vide.

  • Réduire dynamiquement le nombre d'instances de rôle de travail lorsque les files d'attente sont vides pendant une longue période.

  • Implémenter un algorithme de temporisation exponentiel aléatoire spécifique à l'application pour réduire l'impact de l'interrogation des files d'attente inactives sur les coûts du stockage.

  • Adopter les bonnes techniques afin d'empêcher tout dépassement des cibles d'évolutivité pour chaque file d'attente en cas d'utilisation d'éditeurs et de consommateurs de file d'attente comportant plusieurs instances et un grand nombre de processus.

  • Utiliser une stratégie de retrait fiable capable de gérer un grand nombre de conditions temporaires lors de la publication et de la consommation des données dans les files d'attente Windows Azure.

  • Utiliser la fonction de création d'événements unidirectionnel fournie par le Service Bus de Windows Azure pour prendre en charge les notifications de type push afin de réduire la latence et d'améliorer les performances de la solution de messagerie basée sur les files d'attente.

  • Explorer les nouvelles fonctions du .NET Framework 4 telles que TPL, PLINQ et le modèle Observateur pour optimiser le degré de parallélisme, améliorer la concurrence et simplifier la conception des services multithread.

Les exemples de code contenus dans ce document sont disponibles au téléchargement dans MSDN Code Gallery. Ces exemples de code contiennent également tous les composants d'infrastructure requis, tels que la couche d'abstraction acceptant les génériques du service de files d'attente Windows Azure, qui n'ont pas été fournis dans les extraits de code de cet article. Notez que tous les fichiers de code source sont régis par la licence publique de Microsoft, comme expliqué dans les mentions légales correspondantes.

Références et ressources supplémentaires

Pour plus d'informations sur le sujet abordé dans ce livre blanc, consultez la documentation suivante :


Date de génération :

2013-10-23
Cela vous a-t-il été utile ?
(1500 caractères restants)
Merci pour vos suggestions.

Ajouts de la communauté

AJOUTER
Afficher:
© 2014 Microsoft. Tous droits réservés.