Procédure pas à pas : création des agents de flux de données

Ce document montre comment créer des applications basées sur agent selon le flux de données, plutôt que selon le flux de contrôle.

Le flux de contrôle fait référence à l'ordre d'exécution des opérations dans un programme. Le flux de contrôle est régulé à l'aide de structures de contrôle, telles que des instructions conditionnelles, des boucles, etc. Par ailleurs, le flux de données fait référence à un modèle de programmation où les calculs sont effectués uniquement lorsque toutes les données requises sont disponibles. Le modèle de programmation de flux de données est lié au concept de passage de message, dans lequel les composants indépendants d'un programme communiquent les uns avec les autres en envoyant des messages.

Les agents asynchrones prennent en charge les modèles de programmation de flux de contrôle et de flux de données. Bien que le modèle de flux de contrôle soit approprié dans de nombreux cas, le modèle de flux de données est approprié dans d'autres cas, par exemple lorsqu'un agent reçoit des données et effectue une action basée sur la charge de ces données.

Composants requis

Lisez les documents suivants avant de démarrer cette procédure pas-à-pas :

Sections

Cette procédure pas-à-pas contient les sections suivantes :

  • Création d'un agent de flux de contrôle de base

  • Création d'un agent de flux de données de base

  • Création d'un agent d'enregistrement des messages

Création d'un agent de flux de contrôle de base

Prenons l'exemple suivant, qui définit la classe control_flow_agent. La classe control_flow_agent agit sur trois mémoires tampons de messages : une mémoire tampon d'entrée et deux mémoires tampons de sortie. La méthode run lit à partir de la mémoire tampon de messages source dans une boucle et utilise une instruction conditionnelle pour diriger le flux d'exécution du programme. L'agent incrémente un compteur pour les valeurs négatives différentes de zéro et incrémente un autre compteur pour les valeurs positives différentes de zéro. Lorsque l'agent reçoit la valeur de sentinelle zéro, il envoie les valeurs des compteurs aux mémoires tampons de messages de sortie. Les méthodes negatives et positives permettent à l'application de lire le nombre de valeurs positives et négatives à partir de l'agent.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Bien que cet exemple utilise le flux de contrôle dans un agent de façon élémentaire, il montre que la programmation basée sur le contrôle de flux est de nature consécutive. Chaque message doit être traité de façon séquentielle, même si plusieurs messages peuvent être disponibles dans la mémoire tampon des messages d'entrée. Le modèle de flux de données permet aux deux branches de l'instruction conditionnelle de procéder à une évaluation simultanément. Le modèle de flux de données vous permet également de créer des réseaux de messagerie plus complexes, qui agissent sur les données à mesure qu'elles deviennent disponibles.

[retour en haut]

Création d'un agent de flux de données de base

Cette section indique comment convertir la classe control_flow_agent afin d'utiliser le modèle de flux de données pour effectuer la même tâche.

L'agent de flux de données repose sur la création d'un réseau de mémoires tampons de messages, chaque réseau répondant à des besoins précis. Certains blocs de messages utilisent une fonction de filtre pour accepter ou rejeter un message en fonction de sa charge. Une fonction de filtre permet de vérifier qu'un bloc de message ne reçoit que certaines valeurs.

Pour convertir l'agent de flux de contrôle en agent de flux de données

  1. Copiez le corps de la classe control_flow_agent vers une autre classe, par exemple, dataflow_agent. Vous pouvez également renommer la classe control_flow_agent.

  2. Supprimez le corps de la boucle qui appelle receive à partir de la méthode run.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. Dans la méthode run, après l'initialisation des variables negative_count et positive_count, ajoutez un objet countdown_event qui assure le suivi du nombre d'opérations actives.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    La classe countdown_event est illustrée plus loin dans cette rubrique.

  4. Créez les objets de la mémoire tampon des messages qui participeront au réseau du flux de données.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Connectez les mémoires tampons des messages pour former un réseau.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Attendez que les objets event et countdown event soient définis. Ces événements signalent que l'agent a reçu la valeur de sentinelle et que toutes les opérations sont terminées.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

Le schéma suivant illustre le réseau de flux de données complet pour la classe dataflow_agent :

Le réseau de flux de données

Le tableau suivant décrit les membres du réseau.

Membre

Description

increment_active

Objet Concurrency::transformer qui incrémente le compteur d'événements actifs et passe la valeur d'entrée au reste du réseau.

negatives, positives

Objets Concurrency::call qui incrémentent le nombre de nombres et décrémente le compteur d'événements actifs. Chaque objet utilise un filtre pour accepter des nombres négatifs ou des nombres positifs.

sentinel

Objet Concurrency::call qui accepte uniquement la valeur sentinelle zéro et décrémente le compteur d'événements actif.

connector

Objet Concurrency::unbounded_buffer qui connecte la mémoire tampon des messages sources au réseau interne.

Étant donné que la méthode run est appelée sur un thread séparé, d'autres threads peuvent envoyer des messages au réseau avant que le réseau soit entièrement connecté. Le membre de données _source est un objet unbounded_buffer qui met en mémoire tampon toute entrée envoyée de l'application à l'agent. Pour s'assurer que le réseau traite tous les messages d'entrée, l'agent commence par associer les nœuds internes du réseau, puis associe le début de ce réseau, connector, au membre de données _source. Vous êtes ainsi certain que les messages ne sont pas traités pendant la formation du réseau.

Étant donné que, dans cet exemple, le réseau est basé sur le flux de données, et non pas sur le flux de contrôle, le réseau doit informer l'agent lorsqu'il a terminé de traiter chaque valeur d'entrée et que le nœud de sentinelle a reçu sa valeur. Cet exemple utilise un objet countdown_event pour signaler que toutes les valeurs d'entrée ont été traitées et un objet Concurrency::event pour indiquer que le nœud de sentinelle a reçu sa valeur. La classe countdown_event utilise un objet event pour signaler qu'une valeur de compteur atteint zéro. Le début du réseau de flux de données incrémente le compteur chaque fois qu'il reçoit une valeur. Chaque nœud de terminaison du réseau décrémente le compteur après qu'il ait traité la valeur d'entrée. Une fois que l'agent a formé le réseau de flux de données, il attend que le nœud de sentinelle définisse l'objet event et que l'objet countdown_event signale que son compteur a atteint zéro.

L'exemple suivant illustre les classes control_flow_agent, dataflow_agent et countdown_event. La fonction wmain crée un objet control_flow_agent et un objet dataflow_agent, et utilise la fonction send_values pour envoyer une série de valeurs aléatoires aux agents.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Cet exemple génère l'exemple de sortie suivant :

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Compilation du code

Copiez l'exemple de code et collez-le dans un projet Visual Studio ou dans un fichier nommé dataflow-agent.cpp, puis exécutez la commande suivante dans une fenêtre d'invite de commandes Visual Studio 2010.

cl.exe /EHsc dataflow-agent.cpp

[retour en haut]

Création d'un agent d'enregistrement des messages

L'exemple suivant illustre la classe log_agent, qui s'apparente à la classe dataflow_agent. La classe log_agent implémente un agent d'enregistrement asynchrone qui écrit des messages d'enregistrement dans un fichier et dans la console. La classe log_agent permet à l'application de classer les messages en tant que messages d'information, d'avertissement ou d'erreur. Elle permet également à l'application de spécifier si chaque catégorie d'enregistrement est écrite dans un fichier, dans la console ou dans les deux. Cet exemple écrit tous les messages d'enregistrement dans un fichier et écrit uniquement les messages d'erreur dans la console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

Cet exemple écrit la sortie suivante dans la console.

error: This is a sample error message.

Cet exemple génère également le fichier log.txt, qui contient le texte suivant.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Compilation du code

Copiez l'exemple de code et collez-le dans un projet Visual Studio ou dans un fichier nommé log-filter.cpp, puis exécutez la commande suivante dans une fenêtre d'invite de commandes Visual Studio 2010.

cl.exe /EHsc log-filter.cpp

[retour en haut]

Voir aussi

Concepts

Procédures pas à pas relatives au runtime d'accès concurrentiel