How to: Implement a Producer-Consumer Dataflow Pattern

.NET Framework 4.5

This document describes how to use the TPL Dataflow Library to implement a producer-consumer pattern. In this pattern, the producer sends messages to a message block, and the consumer reads messages from that block.

Tip Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

The following example demonstrates a basic producer- consumer model that uses dataflow. The Produce method writes arrays that contain random bytes of data to a System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object and the Consume method reads bytes from a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. By acting on the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces, instead of their derived types, you can write reusable code that can act on a variety of dataflow block types. This example uses the BufferBlock<T> class. Because the BufferBlock<T> class acts as both a source block and as a target block, the producer and the consumer can use a shared object to transfer data.

The Produce method calls the Post<TInput> method in a loop to synchronously write data to the target block. After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available. The Consume method uses the async and await operators (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are received from the ISourceBlock<TOutput> object. To act asynchronously, the Consume method calls the OutputAvailableAsync method to receive a notification when the source block has data available and when the source block will never have additional data available.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a basic producer and consumer pattern that uses dataflow. 
class DataflowProducerConsumer
{
   // Demonstrates the production end of the producer and consumer pattern. 
   static void Produce(ITargetBlock<byte[]> target)
   {
      // Create a Random object to generate random data.
      Random rand = new Random();

      // In a loop, fill a buffer with random data and 
      // post the buffer to the target block. 
      for (int i = 0; i < 100; i++)
      {
         // Create an array to hold random byte data. 
         byte[] buffer = new byte[1024];

         // Fill the buffer with random bytes.
         rand.NextBytes(buffer);

         // Post the result to the message block.
         target.Post(buffer);
      }

      // Set the target to the completed state to signal to the consumer 
      // that no more data will be available.
      target.Complete();
   }

   // Demonstrates the consumption end of the producer and consumer pattern. 
   static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
   {
      // Initialize a counter to track the number of bytes that are processed. 
      int bytesProcessed = 0;

      // Read from the source buffer until the source buffer has no  
      // available output data. 
      while (await source.OutputAvailableAsync())
      {
         byte[] data = source.Receive();

         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }

      return bytesProcessed;
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<byte[]> object. This object serves as the  
      // target block for the producer and the source block for the consumer. 
      var buffer = new BufferBlock<byte[]>();

      // Start the consumer. The Consume method runs asynchronously.  
      var consumer = ConsumeAsync(buffer);

      // Post source data to the dataflow block.
      Produce(buffer);

      // Wait for the consumer to process all data.
      consumer.Wait();

      // Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result);
   }
}

/* Output:
Processed 102400 bytes.
*/

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowProducerConsumer.cs (DataflowProducerConsumer.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowProducerConsumer.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowProducerConsumer.vb

This example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example.

// Demonstrates the consumption end of the producer and consumer pattern. 
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
   // Initialize a counter to track the number of bytes that are processed. 
   int bytesProcessed = 0;

   // Read from the source buffer until the source buffer has no  
   // available output data. 
   while (await source.OutputAvailableAsync())
   {
      byte[] data;
      while (source.TryReceive(out data))
      {
         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }
   }

   return bytesProcessed;
}

The TryReceive method returns False when no data is available. When multiple consumers must access the source block concurrently, this mechanism guarantees that data is still available after the call to OutputAvailableAsync.

Was this page helpful?
(1500 characters remaining)
Thank you for your feedback
Show:
© 2014 Microsoft