Пошаговое руководство. Создание агента потоков данных

В этом документе показано, как создавать приложения на основе агентов, базирующиеся на потоке данных, а не на потоке управления.

Поток управления — это порядок выполнения операций в программе. Поток управления контролируется с помощью структур управления, например условных операторов, циклов и так далее. А поток данных — это модель программирования, в которой вычисления выполняются, только когда доступны все необходимые данные. Модель программирования на основе потока данных связана с понятием передачи сообщений, так как в этой модели независимые компоненты программы взаимодействуют друг с другом посредством отправки сообщений.

Асинхронные агенты поддерживают модели программирования как на основе потока управления, так и на основе потока данных. Модель на основе потока управления подходит для многих ситуаций, но иногда лучше использовать модель на основе потока данных, например если агент получает данные и выполняет действия, связанные с полезной нагрузкой этих данных.

Обязательные компоненты

Прежде чем начать выполнение этого пошагового руководства, необходимо ознакомиться со следующими документами.

Подразделы

Это пошаговое руководство содержит следующие подразделы.

  • Создание основного агента потока управления

  • Создание основного агента потока данных

  • Создание агента ведения журнала сообщений

Создание основного агента потока управления

Рассмотрим следующий пример, определяющий класс control_flow_agent. Класс control_flow_agent работает с тремя буферами сообщений: одним входным буфером и двумя буферами вывода. Метод run циклически выполняет считывание из исходного буфера сообщений и использует условный оператор, чтобы направить поток выполнения программы. Агент увеличивает один счетчик для ненулевых отрицательных значений, а другой — для ненулевых положительных значений. Когда агент получает значение-метку, равное нулю, он отправляет значения счетчиков буферам сообщений вывода. Методы negatives и positives позволяют приложению считывать из агента количества отрицательных и положительных значений.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Несмотря на то, что в этом примере показано простое использование потока управления в агенте, он демонстрирует последовательную природу программирования на основе потока управления. Все сообщения должны обрабатываться последовательно, даже если во входном буфере может оказаться несколько сообщений. Модель потока данных позволяет оценивать обе ветви условного оператора параллельно. Также модель потока данных позволяет создавать более сложные сети сообщений, работающие с данными по мере их поступления.

[в начало]

Создание основного агента потока данных

В этом разделе показано, как преобразовать класс control_flow_agent, чтобы выполнять ту же задачу в модели, основанной на потоке данных.

Агент потока данных создает сеть буферов сообщений, каждый из которых служит определенной цели. Некоторые блоки сообщений используют функции фильтрации, чтобы принимать или отклонять сообщения в зависимости от их полезной нагрузки. Функция фильтрации позволяет блоку сообщений получать только определенные значения.

Преобразование агента потока управления в агент потока данных

  1. Скопируйте текст класса control_flow_agent в другой класс, например dataflow_agent. Также можно переименовать класс control_flow_agent.

  2. Удалите основную часть цикла, вызывающую функцию receive из метода run.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. В методе run после инициализации переменных negative_count и positive_count добавьте объект countdown_event, подсчитывающий активные операции.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    Использование класса countdown_event показано далее в этом разделе.

  4. Создайте объекты буферов сообщений, которые будут принимать участие в сети потока данных.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Соедините буферы сообщений, чтобы создать сеть.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Дождитесь установки объектов event и countdown event. Эти события указывают, что агент получил значение-метку и все операции завершились.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

На следующей схеме показана полная сеть потока данных для класса dataflow_agent.

Сеть потока данных

В следующей таблице описаны члены сети.

Элемент

Описание

increment_active

Объект Concurrency::transformer, увеличивающий счетчик активных событий и передающий входное значение остальной сети.

negatives, positives

Объекты Concurrency::call, увеличивающие счетчик чисел и уменьшающие счетчик активных событий. Все объекты используют фильтры, чтобы принимать отрицательные или положительные числа.

sentinel

Объект Concurrency::call, принимающий только значение-метку и уменьшающий счетчик активных событий.

connector

Объект Concurrency::unbounded_buffer, присоединяющий исходный буфер сообщений к внутренней сети.

Так как метод run вызывается в параллельном потоке, другие потоки могут отправлять сообщения в сеть до полного соединения сети. Член данных _source — это объект unbounded_buffer, помещающий в буфер все входные данные, отправленные приложением агенту. Чтобы сеть обработала все входные сообщения, агент сначала соединяет внутренние узлы сети, а затем соединяет начало сети, connector, с членом данных _source. Таким образом обработка сообщений не начинается до формирования сети.

Так как сеть в этом примере основана на потоке данных, а не на потоке управления, сеть должна сообщать агенту, что она завершали обработку всех входных значений и что соответствующим узлом получено значение-метка. В этом примере объект countdown_event используется, чтобы сообщить, что все входные значения обработаны, и объект Concurrency::event, указывающий, что соответствующий узел получил значение-метку. Класс countdown_event использует объект event, чтобы сообщить, что значение счетчика достигло нуля. Начало сети потока данных увеличивает счетчик при каждом получении значения. Конечные узлы сети уменьшают счетчик после обработки входных значений. Когда агент формирует сеть потока данных, он ожидает, пока узел, получающий метку, не установит объект event, а объект countdown_event не сообщит, что его счетчик достиг нуля.

В следующем примере показаны классы control_flow_agent, dataflow_agent и countdown_event. Функция wmain создает объекты control_flow_agent и dataflow_agent и использует функцию send_values, чтобы отправить агентам набор произвольных значений.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

В данном примере получается следующий результат.

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Компиляция кода

Скопируйте код примера и вставьте его в проект Visual Studio или в файл с именем dataflow-agent.cpp, затем выполните в окне командной строки Visual Studio 2010 следующую команду.

cl.exe /EHsc dataflow-agent.cpp

[в начало]

Создание агента ведения журнала сообщений

В следующем примере показан класс log_agent, похожий на класс dataflow_agent. Класс log_agent реализует асинхронный агент ведения журнала, который записывает журнал сообщений в файл и выводит его на консоль. Класс log_agent позволяет приложению классифицировать сообщения на информационные, предупреждающие и сообщающие об ошибках. Он также позволяет приложению задавать, куда записывается каждая из категорий журнала: в файл, на консоль или в оба назначения. Этот пример записывает все сообщения журнала в файл, а на консоль выводит только сообщения об ошибках.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

Этот пример выводит на консоль следующий результат.

error: This is a sample error message.

В этом примере также создается файл log.txt, содержащий следующий текст.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Компиляция кода

Скопируйте код примера и вставьте его в проект Visual Studio или в файл с именем log-filter.cpp, затем выполните в окне командной строки Visual Studio 2010 следующую команду.

cl.exe /EHsc log-filter.cpp

[в начало]

См. также

Основные понятия

Пошаговые руководства по среде выполнения с параллелизмом