Export (0) Print
Expand All

How to: Write Messages to and Read Messages from a Dataflow Block

.NET Framework 4.5

This document describes how to use the TPL Dataflow Library to write messages to and read messages from a dataflow block. The TPL Dataflow Library provides both synchronous and asynchronous methods for writing messages to and reading messages from a dataflow block. This document uses the System.Threading.Tasks.Dataflow.BufferBlock(Of T) class. The BufferBlock(Of T) class buffers messages and behaves as both a message source and as a message target.

Tip Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

The following example uses the Post(Of TInput) method to write to a BufferBlock(Of T) dataflow block and the Receive method to read from the same object.

      ' Create a BufferBlock<int> object. 
      Dim bufferBlock = New BufferBlock(Of Integer)()

      ' Post several messages to the block. 
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block. 
      For i As Integer = 0 To 2
         Console.WriteLine(bufferBlock.Receive())
      Next i

'       Output: 
'         0 
'         1 
'         2 
'       

You can also use the TryReceive method to read from a dataflow block, as shown in the following example. The TryReceive method does not block the current thread and is useful when you occasionally poll for data.

      ' Post more messages to the block. 
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block. 
      Dim value As Integer 
      Do While bufferBlock.TryReceive(value)
         Console.WriteLine(value)
      Loop 

'       Output: 
'         0 
'         1 
'         2 
'       

Because the Post(Of TInput) method acts synchronously, the BufferBlock(Of T) object in the previous examples receives all data before the second loop reads data. The following example extends the first example by using Invoke to read from and write to the message block concurrently. Because Invoke performs actions concurrently, the values are not written to the BufferBlock(Of T) object in any specific order.

      ' Write to and read from the message block concurrently. 
      Dim post01 = Task.Run(Sub()
          bufferBlock.Post(0)
          bufferBlock.Post(1)
      End Sub)
      Dim receive = Task.Run(Sub()
            For i As Integer = 0 To 2
               Console.WriteLine(bufferBlock.Receive())
            Next i
      End Sub)
      Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
      Task.WaitAll(post01, receive, post2)

'       Sample output: 
'         2 
'         0 
'         1 
'       

The following example uses the SendAsync method to asynchronously write to a BufferBlock(Of T) object and the ReceiveAsync method to asynchronously read from the same object. This example uses the async and await operators (Async and Await in Visual Basic) to asynchronously send data to and read data from the target block. The SendAsync method is useful when you must enable a dataflow block to postpone messages. The ReceiveAsync method is useful when you want to act on data when that data becomes available. For more information about how messages propagate among message blocks, see the section Message Passing in Dataflow (Task Parallel Library).

      ' Post more messages to the block asynchronously. 
      For i As Integer = 0 To 2
         await bufferBlock.SendAsync(i)
      Next i

      ' Asynchronously receive the messages back from the block. 
      For i As Integer = 0 To 2
         Console.WriteLine(await bufferBlock.ReceiveAsync())
      Next i

'       Output: 
'         0 
'         1 
'         2 
'       

The following example shows the complete code for this document.

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block. 
Friend Class DataflowReadWrite
   ' Demonstrates asynchronous dataflow operations. 
   Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
      ' Post more messages to the block asynchronously. 
      For i As Integer = 0 To 2
         await bufferBlock.SendAsync(i)
      Next i

      ' Asynchronously receive the messages back from the block. 
      For i As Integer = 0 To 2
         Console.WriteLine(await bufferBlock.ReceiveAsync())
      Next i

'       Output: 
'         0 
'         1 
'         2 
'        
   End Function 

   Shared Sub Main(ByVal args() As String)
      ' Create a BufferBlock<int> object. 
      Dim bufferBlock = New BufferBlock(Of Integer)()

      ' Post several messages to the block. 
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block. 
      For i As Integer = 0 To 2
         Console.WriteLine(bufferBlock.Receive())
      Next i

'       Output: 
'         0 
'         1 
'         2 
'        

      ' Post more messages to the block. 
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block. 
      Dim value As Integer 
      Do While bufferBlock.TryReceive(value)
         Console.WriteLine(value)
      Loop 

'       Output: 
'         0 
'         1 
'         2 
'        

      ' Write to and read from the message block concurrently. 
      Dim post01 = Task.Run(Sub()
          bufferBlock.Post(0)
          bufferBlock.Post(1)
      End Sub)
      Dim receive = Task.Run(Sub()
            For i As Integer = 0 To 2
               Console.WriteLine(bufferBlock.Receive())
            Next i
      End Sub)
      Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
      Task.WaitAll(post01, receive, post2)

'       Sample output: 
'         2 
'         0 
'         1 
'        

      ' Demonstrate asynchronous dataflow operations.
      AsyncSendReceive(bufferBlock).Wait()
   End Sub 

End Class

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowReadWrite.cs (DataflowReadWrite.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.vb

This example shows how to read from and write to a message block directly. You can also connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. For an example that creates a basic dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline. For an example that creates a more complex dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.

Show:
© 2014 Microsoft