Практическое руководство. Реализация различных шаблонов "источник-приемник"

В этом разделе описана реализация шаблона "производитель-получатель" в приложении. В этом шаблоне производитель отправляет сообщения в блок сообщений, а получатель считывает сообщения из этого блока.

В разделе показано два сценария. В первом сценарии получатель должен получить каждое сообщение, отправленное производителем. Во втором сценарии получатель запрашивает данные периодически и поэтому не обязательно получает каждое сообщение.

В обоих примерах в этом разделе для передачи сообщений от производителя получателю используются агенты, блоки сообщений и функции передачи сообщений. Агент производителя использует функцию Concurrency::send для записи сообщений в объект Concurrency::ITarget. Для чтения сообщений из объекта Concurrency::ISource агент потребителя использует функцию Concurrency::receive. Оба агента содержат значение-метку, чтобы координировать завершение обработки.

Дополнительные сведения об асинхронных агентах см. в разделе Асинхронные агенты. Дополнительные сведения о блоках сообщений и функциях передачи сообщений см. в разделах Асинхронные блоки сообщений и Функции передачи сообщений.

Пример

В этом примере агент производителя отправляет агенту получателя ряд чисел. Получатель принимает эти числа и вычисляет их среднее значение. Приложение выводит среднее на консоль.

В этом примере для предоставления производителю возможности ставить сообщения в очередь используется объект Concurrency::unbounded_buffer. Класс unbounded_buffer реализует ITarget и ISource, чтобы производитель и получатель могли отправлять сообщения в общий буфер и получать их из него. Функции send и receive координируют задачу передачи данных от производителя к получателю.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

После выполнения примера получается следующий результат.

The average is 50.

В этом примере агент производителя отправляет агенту получателя ряд биржевых котировок. Агент получателя периодически выполняет чтение текущей котировки и выводит ее на консоль.

Этот пример похож на предыдущий за исключением того, что в нем для разрешения совместного использования одного сообщения производителем и получателем используется объект Concurrency::overwrite_buffer. Как и в предыдущем примере, класс overwrite_buffer реализует объекты ITarget и ISource, чтобы производитель и получатель могли совместно работать с одним буфером сообщений.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (quotes.begin(), quotes.end(), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         Concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         Concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

В данном примере получается следующий результат.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

В отличие от объекта unbounded_buffer, функция receive не удаляет сообщение из объекта overwrite_buffer. Если получатель выполняет чтение из буфера сообщений более одного раза, прежде чем производитель перезапишет это сообщение, приемник получает это сообщение столько раз, сколько было выполнено чтение.

Компиляция кода

Скопируйте код примера и вставьте его в проект Visual Studio или в файл с именем producer-consumer.cpp, затем выполните в окне командной строки Visual Studio 2010 следующую команду.

cl.exe /EHsc producer-consumer.cpp

См. также

Основные понятия

Библиотека асинхронных агентов

Асинхронные агенты

Асинхронные блоки сообщений

Функции передачи сообщений