Condividi tramite


Procedura dettagliata: creazione di un agente del flusso di dati

In questo documento viene illustrato come creare le applicazioni basate sugli agenti in base al flusso di dati, anziché al flusso di controllo.

Il flusso di controllo si riferisce all'ordine di esecuzione delle operazioni in un programma. Il flusso di controllo viene regolato mediante strutture di controllo come istruzioni condizionali, cicli e così via. In alternativa, il flusso di dati si riferisce a un modello di programmazione in cui i calcoli vengono eseguiti solo quando tutti i dati richiesti sono disponibili. Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in cui i componenti indipendenti di un programma comunicano con un altro programma inviando messaggi.

Gli agenti asincroni supportano entrambi i modelli di programmazione del flusso di dati e del flusso di controllo. Sebbene il modello del flusso di controllo sia appropriato in molti casi, il modello del flusso di dati risulta appropriato in altri, ad esempio quando un agente riceve i dati ed esegue un'azione basata sul payload di tali dati.

Prerequisiti

Prima di iniziare questa procedura dettagliata, leggere i documenti riportati di seguito.

Sezioni

In questa procedura dettagliata sono contenute le sezioni seguenti:

  • Creazione di un agente di base del flusso di controllo

  • Creazione di un agente di base del flusso di dati

  • Creazione di un agente di registrazione dei messaggi

Creazione di un agente di base del flusso di controllo

Si consideri l'esempio seguente che definisce la classe control_flow_agent. La classe control_flow_agent agisce su tre buffer dei messaggi: un buffer di input e due buffer di output. Il metodo run legge i dati dal buffer dei messaggi di origine in un ciclo e utilizza un'istruzione condizionale per indirizzare il flusso di esecuzione del programma. L'agente incrementa un contatore per valori negativi diversi da zero e incrementa un altro contatore per valori positivi diversi da zero. Dopo aver ricevuto il valore sentinel zero, l'agente invia i valori dei contatori nel buffer dei messaggi di output. I metodi positives e negatives consentono all'applicazione di leggere i conteggi dei valori negativi e positivi dall'agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Sebbene in questo esempio venga illustrato l'utilizzo di base del flusso di controllo in un agente, viene indicata la natura seriale della programmazione basata sul flusso di controllo. Ogni messaggio deve essere elaborato in sequenza, anche se nel buffer dei messaggi di input potrebbero essere disponibili più messaggi. Il modello del flusso di dati consente la valutazione simultanea di entrambi i rami dell'istruzione condizionale. Il modello del flusso di dati consente inoltre di creare reti di messaggistica più complesse che agiscono sui dati man mano che diventano disponibili.

[vai all'inizio]

Creazione di un agente di base del flusso di dati

In questa sezione viene illustrato come convertire la classe control_flow_agent in modo da utilizzare il modello del flusso di dati per eseguire la stessa attività.

L'agente del flusso di dati prevede la creazione di una rete di buffer dei messaggi, ciascuno dei quali viene utilizzato per uno scopo specifico. Alcuni blocchi di messaggi utilizzano una funzione di filtro per accettare o rifiutare un messaggio in base al relativo payload. Le funzioni di filtro garantiscono che un blocco di messaggi riceva solo determinati valori.

Per convertire l'agente del flusso di controllo in un agente del flusso di dati

  1. Copiare il corpo della classe control_flow_agent in un'altra classe, ad esempio dataflow_agent. In alternativa, è possibile rinominare la classe control_flow_agent.

  2. Rimuovere il corpo del ciclo che chiama receive dal metodo run.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. Dopo l'inizializzazione delle variabili negative_count e positive_count aggiungere nel metodo run un oggetto countdown_event che tiene traccia del conteggio delle operazioni attive.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    La classe countdown_event viene illustrata più avanti in questo argomento.

  4. Creare gli oggetti del buffer dei messaggi che prenderanno parte alla rete del flusso di dati.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Connettere i buffer dei messaggi per formare una rete.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Attendere l'impostazione degli oggetti countdown event e event. Questi eventi segnalano che l'agente ha ricevuto il valore sentinel e che tutte le operazioni sono state completate.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

Nel diagramma seguente viene illustrata la rete del flusso di dati completa per la classe dataflow_agent:

Rete del flusso di dati

Nella tabella seguente sono descritti i membri della rete.

Membro

Descrizione

increment_active

Oggetto Concurrency::transformer che incrementa il contatore degli eventi attivo e passa il valore di input al resto della rete.

negatives, positives

Oggetti Concurrency::call che incrementano il conteggio dei numeri e decrementano il contatore degli eventi attivo. Gli oggetti utilizzano ciascuno un filtro per accettare i numeri negativi o i numeri positivi.

sentinel

Oggetto Concurrency::call che accetta solo il valore sentinel zero e decrementa il contatore degli eventi attivo.

connector

Oggetto Concurrency::unbounded_buffer che connette il buffer dei messaggi di origine alla rete interna.

Poiché il metodo run viene chiamato in un thread separato, gli altri thread possono inviare messaggi alla rete prima che la rete sia completamente connessa. Il membro dati _source è un oggetto unbounded_buffer che memorizza nel buffer l'input inviato dall'applicazione all'agente. Per assicurarsi che la rete elabori tutti i messaggi di input, l'agente collega innanzitutto i nodi interni della rete e quindi collega l'inizio di tale rete, connector, al membro dati _source. In questo modo si garantisce che i messaggi non vengano elaborati durante la preparazione della rete.

Poiché la rete in questo esempio è basata sul flusso di dati anziché sul flusso di controllo, la rete deve comunicare all'agente di aver completato l'elaborazione di ogni valore di input e che il nodo sentinel ha ricevuto il relativo valore. In questo esempio viene utilizzato un oggetto countdown_event per segnalare che tutti i valori di input sono stati elaborati e un oggetto Concurrency::event per indicare che il nodo sentinel ha ricevuto il relativo valore. La classe countdown_event utilizza un oggetto event per segnalare quando un valore del contatore raggiunge lo zero. L'intestazione della rete del flusso di dati incrementa il contatore ogni volta che riceve un valore. Ogni nodo terminale della rete decrementa il contatore dopo l'elaborazione del valore di input. Dopo aver preparato la rete del flusso di dati, l'agente attende che il nodo sentinel imposti l'oggetto event e che l'oggetto countdown_event segnali che il contatore ha raggiunto lo zero.

Nell'esempio seguente vengono illustrate le classi control_flow_agent, dataflow_agent e countdown_event. La funzione wmain crea un oggetto control_flow_agent e dataflow_agent e utilizza la funzione send_values per inviare una serie di valori casuali agli agenti.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Questo esempio produce l'output seguente:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio o incollarlo in un file denominato dataflow-agent.cpp, quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio 2010.

cl.exe /EHsc dataflow-agent.cpp

[vai all'inizio]

Creazione di un agente di registrazione dei messaggi

Nell'esempio seguente viene illustrata la classe log_agent, analoga alla classe dataflow_agent. La classe log_agent implementa un agente di registrazione asincrona che scrive i messaggi di log in un file e sulla console. La classe log_agent consente all'applicazione di suddividere i messaggi in categorie: informativi, di avviso o di errore. Consente inoltre all'applicazione di specificare se ogni categoria del log viene scritta in un file, nella console o in entrambi. In questo esempio vengono scritti tutti i messaggi di log in un file e solo i messaggi di errore nella console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

L'esempio scrive sulla console l'output seguente.

error: This is a sample error message.

Questo esempio crea inoltre il file log.txt, che contiene il testo seguente.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto Visual Studio oppure incollarlo in un file denominato log-filter.cpp, quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio 2010.

cl.exe /EHsc log-filter.cpp

[vai all'inizio]

Vedere anche

Concetti

Procedure dettagliate del runtime di concorrenza