共用方式為


如何:實作各種生產者-消費者模式

本主題說明如何在您的應用程式中實作生產者-消費者模式。 在這個模式中,「生產者」(Producer) 會傳送訊息至訊息區塊,而「消費者」(Consumer) 則會從該區塊讀取訊息。

本主題會示範兩個情境。 在第一個情境中,消費者必須收到產生者所傳送的每個訊息。 在第二個情境中,消費者會定期輪詢資料,因此不需要收到每個訊息。

本主題中的兩個範例都會使用代理程式、訊息區塊和訊息傳遞函式,將訊息從生產者傳輸給消費者。 生產者代理程式會使用 concurrency::send 函式將訊息寫入至 concurrency::ITarget 物件。 消費者代理程式會使用 concurrency::receive 函式從 concurrency::ISource 物件讀取訊息。 這兩個代理程式都保有 Sentinel 值,以協調處理結束。

如需非同步代理程式的詳細資訊,請參閱非同步代理程式。 如需訊息區塊和訊息傳遞函式的詳細資訊,請參閱非同步訊息區訊息傳遞函式

範例

在這個範例中,生產者代理程式會將一系列的數字傳送給消費者代理程式。 消費者在收到所有這些數字之後,會計算它們的平均值。 應用程式會將平均值寫入主控台。

這個範例會使用 concurrency::unbounded_buffer 物件,讓生產者將訊息放入佇列中。 unbounded_buffer 類別會實作 ITargetISource,讓生產者和消費者可以在共用緩衝區中傳送與接收訊息。 sendreceive 函式會協調將資料從生產者傳播給消費者的工作。

// producer-consumer-average.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values. 
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer. 
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send. 
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing. 
   int _sentinel;
};

// Demonstrates a basic agent that consumes values. 
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values. 
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values. 
      int sum = 0;
      // The count of values received. 
      int count = 0;

      // Read from the source block until we receive the  
      // sentinel value. 
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing. 
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing. 
   const int sentinel = 0;
   // The number of values for the producer agent to send. 
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

這個範例產生下列輸出。

  

在這個範例中,生產者代理程式會將一系列的股價傳送給消費者代理程式。 消費者代理程式會定期讀取目前的報價,並將它列印至主控台。

這個範例與前一個範例類似,不同之處在於它是使用 concurrency::overwrite_buffer 物件讓生產者將某個訊息與消費者共用。 與前一個範例相同,overwrite_buffer 類別會實作 ITargetISource,讓生產者和消費者可以對共用訊息緩衝區執行動作。

// producer-consumer-quotes.cpp 
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values. 
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes.  
      // A real-world application would read these from an external source,  
      // such as a network connection or a database. 
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values. 
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive 
      // a negative value. 
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

這個範例 (Example) 產生下列範例 (Sample) 輸出。

  

與使用 unbounded_buffer 物件時不同,receive 函式並不會從 overwrite_buffer 物件移除訊息。 如果消費者在生產者覆寫該訊息之前多次讀取訊息緩衝區,則消費者每次都會得到相同的訊息。

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 producer-consumer.cpp 的檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc producer-consumer.cpp

請參閱

概念

非同步代理程式程式庫

非同步代理程式

非同步訊息區

訊息傳遞函式