How to: Write Messages to and Read Messages from a Dataflow Block

.NET Framework 4.5

This document describes how to use the TPL Dataflow Library to write messages to and read messages from a dataflow block. The TPL Dataflow Library provides both synchronous and asynchronous methods for writing messages to and reading messages from a dataflow block. This document uses the System.Threading.Tasks.Dataflow.BufferBlock<T> class. The BufferBlock<T> class buffers messages and behaves as both a message source and as a message target.

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

The following example uses the Post<TInput> method to write to a BufferBlock<T> dataflow block and the Receive method to read from the same object.

// Create a BufferBlock<int> object. 
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block. 
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block. 
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */

You can also use the TryReceive method to read from a dataflow block, as shown in the following example. The TryReceive method does not block the current thread and is useful when you occasionally poll for data.

// Post more messages to the block. 
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block. 
int value;
while (bufferBlock.TryReceive(out value))
{
   Console.WriteLine(value);
}

/* Output:
   0
   1
   2
 */
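
As a minimal sketch of the polling behavior described above, the following example wraps the TryReceive loop in a helper that reports how many messages it read. The DrainSketch class and its Drain method are hypothetical names introduced for illustration; only BufferBlock<T>, Post, and TryReceive come from the library. When the block is empty, TryReceive returns false immediately instead of waiting, so the helper returns 0.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, not part of the TPL Dataflow Library.
class DrainSketch
{
   // Reads every message that is currently buffered, without blocking,
   // and returns the number of messages that were read.
   public static int Drain(BufferBlock<int> block)
   {
      int count = 0;
      int value;
      while (block.TryReceive(out value))
      {
         Console.WriteLine(value);
         count++;
      }
      return count;
   }

   static void Main()
   {
      var bufferBlock = new BufferBlock<int>();

      // The block is empty, so TryReceive returns false on the first call.
      Console.WriteLine("Messages read: {0}", Drain(bufferBlock));

      // After a message is posted, the same call finds it.
      bufferBlock.Post(42);
      Console.WriteLine("Messages read: {0}", Drain(bufferBlock));
   }
}
```

A helper like this suits a loop that does other work between polls. If the caller has nothing else to do until data arrives, Receive or ReceiveAsync is usually the better fit.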

Because the Post<TInput> method acts synchronously, the BufferBlock<T> object in the previous examples receives all data before the second loop reads data. The following example extends the first example by using Task.Run to start three tasks that write to and read from the message block concurrently, and Task.WaitAll to wait for those tasks to finish. Because the tasks run concurrently, the values are not written to the BufferBlock<T> object in any specific order.

// Write to and read from the message block concurrently. 
var post01 = Task.Run(() =>
   {
      bufferBlock.Post(0);
      bufferBlock.Post(1);
   });
var receive = Task.Run(() => 
   {
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(bufferBlock.Receive());
      }
   });
var post2 = Task.Run(() => 
   {
      bufferBlock.Post(2);
   });
Task.WaitAll(post01, receive, post2);

/* Sample output:
   2
   0
   1
 */

The following example uses the SendAsync method to asynchronously write to a BufferBlock<T> object and the ReceiveAsync method to asynchronously read from the same object. This example uses the async modifier and the await operator (Async and Await in Visual Basic) to asynchronously send data to and read data from the target block. The SendAsync method is useful when you must enable a dataflow block to postpone messages. The ReceiveAsync method is useful when you want to act on data when that data becomes available. For more information about how messages propagate among message blocks, see the section Message Passing in Dataflow (Task Parallel Library).

// Send more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
   await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block. 
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(await bufferBlock.ReceiveAsync());
}

/* Output:
   0
   1
   2
 */
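
To illustrate what postponing a message can look like, the following sketch contrasts Post and SendAsync on a block that is already full. It assumes a BufferBlock<int> created with DataflowBlockOptions.BoundedCapacity set to 1, a value chosen only for illustration. The BoundedSendSketch class and RunAsync method are hypothetical names. With that setting, the second Post call is declined and returns false, whereas the task returned by SendAsync stays incomplete until a receive frees space in the block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not part of the TPL Dataflow Library.
class BoundedSendSketch
{
   // Returns { first Post result, second Post result, SendAsync result }.
   public static async Task<bool[]> RunAsync()
   {
      // Illustrative assumption: the block holds at most one message.
      var bounded = new BufferBlock<int>(
         new DataflowBlockOptions { BoundedCapacity = 1 });

      // The first message fits; the second is declined because the block is full.
      bool first = bounded.Post(0);
      bool second = bounded.Post(1);
      Console.WriteLine("Post(0): {0}, Post(1): {1}", first, second);

      // SendAsync offers the message and lets the full block postpone it.
      Task<bool> pending = bounded.SendAsync(2);
      Console.WriteLine("SendAsync completed: {0}", pending.IsCompleted);

      // Receiving a message frees space, so the postponed message can be accepted.
      Console.WriteLine(await bounded.ReceiveAsync());
      bool third = await pending;
      Console.WriteLine("SendAsync accepted: {0}", third);
      Console.WriteLine(await bounded.ReceiveAsync());

      return new[] { first, second, third };
   }

   static void Main()
   {
      RunAsync().Wait();
   }
}
```

In this sketch the message 1 is never delivered, because Post does not retry after it is declined. Checking the return value of Post, or using SendAsync, avoids losing messages when a block has a bounded capacity.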

The following example shows the complete code for this document.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to write to and read from a dataflow block.
class DataflowReadWrite
{
   // Demonstrates asynchronous dataflow operations. 
   static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
   {
      // Send more messages to the block asynchronously.
      for (int i = 0; i < 3; i++)
      {
         await bufferBlock.SendAsync(i);
      }

      // Asynchronously receive the messages back from the block. 
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(await bufferBlock.ReceiveAsync());
      }

      /* Output:
         0
         1
         2
       */
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<int> object. 
      var bufferBlock = new BufferBlock<int>();

      // Post several messages to the block. 
      for (int i = 0; i < 3; i++)
      {
         bufferBlock.Post(i);
      }

      // Receive the messages back from the block. 
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(bufferBlock.Receive());
      }

      /* Output:
         0
         1
         2
       */ 

      // Post more messages to the block. 
      for (int i = 0; i < 3; i++)
      {
         bufferBlock.Post(i);
      }

      // Receive the messages back from the block. 
      int value;
      while (bufferBlock.TryReceive(out value))
      {
         Console.WriteLine(value);
      }

      /* Output:
         0
         1
         2
       */ 

      // Write to and read from the message block concurrently. 
      var post01 = Task.Run(() =>
         {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
         });
      var receive = Task.Run(() => 
         {
            for (int i = 0; i < 3; i++)
            {
               Console.WriteLine(bufferBlock.Receive());
            }
         });
      var post2 = Task.Run(() => 
         {
            bufferBlock.Post(2);
         });
      Task.WaitAll(post01, receive, post2);

      /* Sample output:
         2
         0
         1
       */ 

      // Demonstrate asynchronous dataflow operations.
      AsyncSendReceive(bufferBlock).Wait();
   }
}

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowReadWrite.cs (DataflowReadWrite.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.vb

This example shows how to read from and write to a message block directly. You can also connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. For an example that creates a basic dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline. For an example that creates a more complex dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.
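
As a rough sketch of the pipeline idea, the following example links a BufferBlock<int> to a TransformBlock<int, int> and an ActionBlock<int> by using LinkTo. The PipelineSketch class, its Run method, and the squaring step are hypothetical choices made for illustration; the walkthroughs mentioned above cover pipelines in more depth. Setting PropagateCompletion on the link options lets the call to Complete on the first block flow through to the last block, so the example can wait for all messages to be processed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not part of the TPL Dataflow Library.
class PipelineSketch
{
   // Sends 0, 1, and 2 through a three-stage pipeline and returns the results.
   public static List<int> Run()
   {
      var results = new List<int>();

      var source = new BufferBlock<int>();
      var square = new TransformBlock<int, int>(n => n * n);
      // With default options, the action processes one message at a time,
      // so adding to the list here needs no extra synchronization.
      var collect = new ActionBlock<int>(n => results.Add(n));

      // Link the blocks and let completion flow from source to collect.
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      source.LinkTo(square, linkOptions);
      square.LinkTo(collect, linkOptions);

      for (int i = 0; i < 3; i++)
      {
         source.Post(i);
      }

      // Signal that no more messages will arrive, then wait for the last stage.
      source.Complete();
      collect.Completion.Wait();

      return results;
   }

   static void Main()
   {
      foreach (int value in Run())
      {
         Console.WriteLine(value);
      }
   }
}

/* Output:
   0
   1
   4
 */
```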

© 2014 Microsoft. All rights reserved.