
Walkthrough: Creating a Dataflow Pipeline

.NET Framework 4.5

Although you can use the DataflowBlock.Receive, DataflowBlock.ReceiveAsync, and DataflowBlock.TryReceive(Of TOutput) methods to receive messages from source blocks, you can also connect message blocks to form a dataflow pipeline. A dataflow pipeline is a series of components, or dataflow blocks, each of which performs a specific task that contributes to a larger goal. Every dataflow block in a dataflow pipeline performs work when it receives a message from another dataflow block. An analogy to this is an assembly line for automobile manufacturing. As each vehicle passes through the assembly line, one station assembles the frame, the next one installs the engine, and so on. Because an assembly line enables multiple vehicles to be assembled at the same time, it provides better throughput than assembling complete vehicles one at a time.

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the library, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

This document demonstrates a dataflow pipeline that downloads the book The Iliad of Homer from a website and finds the words in that book whose reversal also appears in the book. This walkthrough calls such word pairs, for example doom and mood, palindromes. Creating the dataflow pipeline in this document consists of the following steps:

  1. Create the dataflow blocks that participate in the pipeline.

  2. Connect each dataflow block to the next block in the pipeline. Each block receives as input the output of the previous block in the pipeline.

  3. For each dataflow block, create a continuation task that sets the next block to the completed state after the previous block finishes.

  4. Post data to the head of the pipeline.

  5. Mark the head of the pipeline as completed.

  6. Wait for the pipeline to complete all work.
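The six steps above can be sketched in miniature before you build the full example. The following Visual Basic program is an illustration, not part of the walkthrough: the module name MiniPipeline and the blocks squareNumber and printNumber are hypothetical, and each step is marked with a comment. It should print 1, 4, and 9 on separate lines, because a TransformBlock keeps its outputs in input order and an ActionBlock processes one message at a time by default.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Hypothetical two-stage pipeline that mirrors the six steps of the walkthrough.
Module MiniPipeline
    Sub Main()
        ' Step 1: create the dataflow blocks.
        Dim squareNumber = New TransformBlock(Of Integer, Integer)(Function(n) n * n)
        Dim printNumber = New ActionBlock(Of Integer)(Sub(n) Console.WriteLine(n))

        ' Step 2: connect each block to the next block.
        squareNumber.LinkTo(printNumber)

        ' Step 3: when the first block finishes, complete (or fault) the next block.
        squareNumber.Completion.ContinueWith(Sub(t)
            If t.IsFaulted Then
                CType(printNumber, IDataflowBlock).Fault(t.Exception)
            Else
                printNumber.Complete()
            End If
        End Sub)

        ' Step 4: post data to the head of the pipeline.
        For i As Integer = 1 To 3
            squareNumber.Post(i)
        Next

        ' Step 5: mark the head of the pipeline as completed.
        squareNumber.Complete()

        ' Step 6: wait for the tail of the pipeline to finish.
        printNumber.Completion.Wait()
    End Sub
End Module
```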

Read Dataflow (Task Parallel Library) before you start this walkthrough.

In Visual Studio, create a Visual C# or Visual Basic Console Application project. Add a reference to System.Threading.Tasks.Dataflow.dll.

Alternatively, create a file and name it DataflowPalindromes.cs (DataflowPalindromes.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window to compile the project.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowPalindromes.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowPalindromes.vb

Add the following code to your project to create the basic application.

Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Net
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all palindromes that appear in that book.
Friend Class DataflowPalindromes
    Shared Sub Main(ByVal args() As String)
    End Sub
End Class

Add the following code to the Main method to create the dataflow blocks that participate in the pipeline. The table that follows summarizes the role of each member of the pipeline.


' Create the members of the pipeline.

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(Function(uri)
    Console.WriteLine("Downloading '{0}'...", uri)
    ' Dispose the WebClient after the download completes.
    Using client As New WebClient()
        Return client.DownloadString(uri)
    End Using
End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(Function(text)
    Console.WriteLine("Creating word list...")

    ' Remove common punctuation by replacing all non-letter characters
    ' with a space character.
    Dim tokens() As Char = text.ToCharArray()
    For i As Integer = 0 To tokens.Length - 1
        If Not Char.IsLetter(tokens(i)) Then
            tokens(i) = " "c
        End If
    Next i
    text = New String(tokens)

    ' Separate the text into an array of words.
    Return text.Split(New Char() { " "c }, StringSplitOptions.RemoveEmptyEntries)
End Function)

' Removes short words, orders the resulting words alphabetically,
' and then removes duplicates.
Dim filterWordList = New TransformBlock(Of String() , String())(Function(words)
    Console.WriteLine("Filtering word list...")
    Return words.Where(Function(word) word.Length > 3).OrderBy(Function(word) word).Distinct().ToArray()
End Function)

' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findPalindromes = New TransformManyBlock(Of String(), String)(Function(words)
    Console.WriteLine("Finding palindromes...")

    ' Holds palindromes.
    Dim palindromes = New ConcurrentQueue(Of String)()

    ' Add each word in the original collection to the result whose
    ' palindrome also exists in the collection.
    Parallel.ForEach(words, Sub(word)
        ' Reverse the word.
        Dim reverse As New String(word.Reverse().ToArray())

        ' Enqueue the word if the reversed version also exists
        ' in the collection. Array.BinarySearch requires a sorted
        ' array, which filterWordList provides.
        If Array.BinarySearch(Of String)(words, reverse) >= 0 AndAlso word <> reverse Then
            palindromes.Enqueue(word)
        End If
    End Sub)
    Return palindromes
End Function)

' Prints the provided palindrome to the console.     
Dim printPalindrome = New ActionBlock(Of String)(Sub(palindrome) Console.WriteLine("Found palindrome {0}/{1}", palindrome, New String(palindrome.Reverse().ToArray())))

Member | Type | Description
downloadString | TransformBlock(Of TInput, TOutput) | Downloads the book text from the Web.
createWordList | TransformBlock(Of TInput, TOutput) | Separates the book text into an array of words.
filterWordList | TransformBlock(Of TInput, TOutput) | Removes short words from the word array, orders the resulting words alphabetically, and removes duplicates.
findPalindromes | TransformManyBlock(Of TInput, TOutput) | Finds all words in the filtered word array whose reverse also occurs in the word array.
printPalindrome | ActionBlock(Of TInput) | Prints palindromes to the console.

Although you could combine multiple steps of this example's dataflow pipeline into one step, the example illustrates the concept of composing multiple independent dataflow tasks to perform a larger task. The example uses TransformBlock(Of TInput, TOutput) to enable each member of the pipeline to perform an operation on its input data and send the result to the next step in the pipeline. The findPalindromes member of the pipeline is a TransformManyBlock(Of TInput, TOutput) object because it produces multiple independent outputs for each input. The tail of the pipeline, printPalindrome, is an ActionBlock(Of TInput) object because it performs an action on its input and does not produce a result.
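To see what "multiple independent outputs for each input" means in practice, the following minimal sketch posts one sentence to a TransformManyBlock(Of String, String) and receives each word as a separate output message. It is an illustration rather than part of the walkthrough: the names TransformManyDemo and SplitIntoWords are hypothetical, and instead of linking to another block it reads the results with DataflowBlock.Receive, using DataflowBlock.OutputAvailableAsync to detect when the completed block has no more output.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks.Dataflow

' Hypothetical example: one input message produces several output messages.
Module TransformManyDemo
    Function SplitIntoWords(sentence As String) As List(Of String)
        ' Each input sentence is turned into one output message per word.
        Dim splitter = New TransformManyBlock(Of String, String)(
            Function(s) s.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries))

        splitter.Post(sentence)
        ' Signal that no more input will arrive; buffered output is still delivered.
        splitter.Complete()

        ' OutputAvailableAsync returns False once the block has completed
        ' and all of its output has been received.
        Dim words As New List(Of String)()
        While splitter.OutputAvailableAsync().Result
            words.Add(splitter.Receive())
        End While
        Return words
    End Function

    Sub Main()
        For Each word In SplitIntoWords("sing goddess the anger")
            Console.WriteLine(word)
        Next
    End Sub
End Module
```

A TransformBlock given the same delegate would instead emit the whole String() array as a single message, which is why findPalindromes uses TransformManyBlock.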

Add the following code to connect each block to the next block in the pipeline.

When you call the LinkTo method to connect a source dataflow block to a target dataflow block, the source dataflow block propagates data to the target block as data becomes available.


' Connect the dataflow blocks to form a pipeline. 
'

downloadString.LinkTo(createWordList)
createWordList.LinkTo(filterWordList)
filterWordList.LinkTo(findPalindromes)
findPalindromes.LinkTo(printPalindrome)

Add the following code to enable each dataflow block to perform a final action after it processes all data.

      
' For each completion task in the pipeline, create a continuation task
' that marks the next block in the pipeline as completed.
' A completed dataflow block processes any buffered elements, but does
' not accept new elements.

downloadString.Completion.ContinueWith(Sub(t)
    If t.IsFaulted Then
        CType(createWordList, IDataflowBlock).Fault(t.Exception)
    Else
        createWordList.Complete()
    End If
End Sub)
createWordList.Completion.ContinueWith(Sub(t)
    If t.IsFaulted Then
        CType(filterWordList, IDataflowBlock).Fault(t.Exception)
    Else
        filterWordList.Complete()
    End If
End Sub)
filterWordList.Completion.ContinueWith(Sub(t)
    If t.IsFaulted Then
        CType(findPalindromes, IDataflowBlock).Fault(t.Exception)
    Else
        findPalindromes.Complete()
    End If
End Sub)
findPalindromes.Completion.ContinueWith(Sub(t)
    If t.IsFaulted Then
        CType(printPalindrome, IDataflowBlock).Fault(t.Exception)
    Else
        printPalindrome.Complete()
    End If
End Sub)

To propagate completion through the pipeline, each continuation task sets the next dataflow block to the completed state, or to the faulted state if the previous block faulted. For example, when the head of the pipeline is set to the completed state, it processes any remaining buffered messages and its Completion task finishes; the continuation on that task then sets the second member of the pipeline to the completed state. The second member of the pipeline in turn processes any remaining buffered messages, and the continuation on its Completion task sets the third member of the pipeline to the completed state. This process continues until all members of the pipeline finish. This example uses lambda expressions (Sub lambdas in Visual Basic) to define the continuation tasks.
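Explicit continuation tasks are one way to propagate completion. The library also provides a LinkTo overload that takes a DataflowLinkOptions object; when its PropagateCompletion property is True, the link forwards completion (and faults) from the source to the target. The following minimal sketch shows that alternative on two hypothetical blocks, toUpper and printWord, in a hypothetical PropagateCompletionDemo module; the walkthrough itself keeps the explicit continuations. It should print ILIAD and then ODYSSEY.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks.Dataflow

Module PropagateCompletionDemo
    Sub Main()
        ' Hypothetical two-block pipeline.
        Dim toUpper = New TransformBlock(Of String, String)(Function(s) s.ToUpperInvariant())
        Dim printWord = New ActionBlock(Of String)(Sub(s) Console.WriteLine(s))

        ' With PropagateCompletion = True, printWord is completed (or faulted)
        ' automatically when toUpper completes (or faults).
        Dim linkOptions As New DataflowLinkOptions With {.PropagateCompletion = True}
        toUpper.LinkTo(printWord, linkOptions)

        toUpper.Post("iliad")
        toUpper.Post("odyssey")
        toUpper.Complete()

        ' No ContinueWith is needed; the link forwards completion to the tail.
        printWord.Completion.Wait()
    End Sub
End Module
```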

Add the following code to post the URL of the book The Iliad of Homer to the head of the dataflow pipeline.

' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt")

This example uses DataflowBlock.Post(Of TInput) to synchronously send data to the head of the pipeline. Use the DataflowBlock.SendAsync method when you must asynchronously send data to a dataflow node.
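The difference matters when a target cannot accept a message right away. Post returns False immediately if the target declines the message, for example when a block created with a BoundedCapacity is full. SendAsync instead returns a Task(Of Boolean) that completes when the target accepts (True) or permanently declines (False) the message, so the producer can await it instead of losing the message. The following sketch is an illustration, not part of the walkthrough: the module SendAsyncDemo, the ProduceAsync function, and the slowPrinter block (bounded to one message, with an artificial delay) are hypothetical. Because the producer and the consumer run concurrently, the order of the "Sent" and "Processed" lines can vary.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Module SendAsyncDemo
    ' Sends each number asynchronously; Await yields while the target's buffer is full.
    Async Function ProduceAsync(target As ITargetBlock(Of Integer)) As Task
        For i As Integer = 1 To 3
            ' SendAsync completes with True once the block accepts the message,
            ' or with False if the block declines it permanently.
            Dim accepted As Boolean = Await target.SendAsync(i)
            Console.WriteLine("Sent {0}: {1}", i, accepted)
        Next
        target.Complete()
    End Function

    Sub Main()
        ' Hypothetical slow consumer that holds at most one message at a time.
        Dim slowPrinter = New ActionBlock(Of Integer)(
            Sub(n)
                Thread.Sleep(100)
                Console.WriteLine("Processed {0}", n)
            End Sub,
            New ExecutionDataflowBlockOptions With {.BoundedCapacity = 1})

        ' A console application has no synchronization context, so blocking here is safe.
        ProduceAsync(slowPrinter).Wait()
        slowPrinter.Completion.Wait()
    End Sub
End Module
```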

Add the following code to mark the head of the pipeline as completed. The head of the pipeline runs its continuation task after it processes all buffered messages. This continuation task propagates the completed state through the pipeline.

' Mark the head of the pipeline as complete. The continuation tasks  
' propagate completion through the pipeline as each part of the  
' pipeline finishes.
downloadString.Complete()

This example sends one URL through the dataflow pipeline to be processed. If you send more than one input through a pipeline, call the IDataflowBlock.Complete method after you submit all the input. You can omit this step if your application has no well-defined point at which data is no longer available or the application does not have to wait for the pipeline to finish.

Add the following code to wait for the pipeline to finish. Because this example uses continuation tasks to propagate completion through the pipeline, the overall operation is finished when the tail of the pipeline finishes.

' Wait for the last block in the pipeline to process all messages.
printPalindrome.Completion.Wait()

You can wait for dataflow completion from any thread or from multiple threads at the same time.
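Because the continuation tasks forward a fault to the next block, waiting on the tail of the pipeline also reports errors from earlier blocks: Wait throws an AggregateException. The following minimal sketch shows one way to observe such an error. The module FaultDemo and the blocks parseNumber and printNumber are hypothetical; parseNumber faults on input that is not a number. Each hop can wrap the exception in another AggregateException, so the sketch calls Flatten to reach the original exception. Whether 42 is printed before the fault arrives is not guaranteed.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks.Dataflow

Module FaultDemo
    Sub Main()
        ' Hypothetical blocks: parseNumber throws on input that is not a number.
        Dim parseNumber = New TransformBlock(Of String, Integer)(Function(s) Integer.Parse(s))
        Dim printNumber = New ActionBlock(Of Integer)(Sub(n) Console.WriteLine(n))
        parseNumber.LinkTo(printNumber)

        ' Same completion pattern as the walkthrough: forward a fault or completion.
        parseNumber.Completion.ContinueWith(Sub(t)
            If t.IsFaulted Then
                CType(printNumber, IDataflowBlock).Fault(t.Exception)
            Else
                printNumber.Complete()
            End If
        End Sub)

        parseNumber.Post("42")
        parseNumber.Post("not a number")
        parseNumber.Complete()

        Try
            printNumber.Completion.Wait()
        Catch ex As AggregateException
            ' Flatten removes the AggregateException layers added as the fault
            ' travels through the pipeline, exposing the original exception.
            For Each inner In ex.Flatten().InnerExceptions
                Console.WriteLine("Pipeline failed: {0}", inner.Message)
            Next
        End Try
    End Sub
End Module
```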

The following example shows the complete code for this walkthrough.

Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Net
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline. 
' This program downloads the book "The Iliad of Homer" by Homer from the Web  
' and finds all palindromes that appear in that book. 
Friend Class DataflowPalindromes

   Shared Sub Main(ByVal args() As String)
      
      ' Create the members of the pipeline.

      ' Downloads the requested resource as a string.
      Dim downloadString = New TransformBlock(Of String, String)(Function(uri)
          Console.WriteLine("Downloading '{0}'...", uri)
          ' Dispose the WebClient after the download completes.
          Using client As New WebClient()
              Return client.DownloadString(uri)
          End Using
      End Function)

      ' Separates the specified text into an array of words.
      Dim createWordList = New TransformBlock(Of String, String())(Function(text)
          Console.WriteLine("Creating word list...")

          ' Remove common punctuation by replacing all non-letter characters
          ' with a space character.
          Dim tokens() As Char = text.ToCharArray()
          For i As Integer = 0 To tokens.Length - 1
              If Not Char.IsLetter(tokens(i)) Then
                  tokens(i) = " "c
              End If
          Next i
          text = New String(tokens)

          ' Separate the text into an array of words.
          Return text.Split(New Char() { " "c }, StringSplitOptions.RemoveEmptyEntries)
      End Function)

      ' Removes short words, orders the resulting words alphabetically,
      ' and then removes duplicates.
      Dim filterWordList = New TransformBlock(Of String() , String())(Function(words)
          Console.WriteLine("Filtering word list...")
          Return words.Where(Function(word) word.Length > 3).OrderBy(Function(word) word).Distinct().ToArray()
      End Function)

      ' Finds all words in the specified collection whose reverse also
      ' exists in the collection.
      Dim findPalindromes = New TransformManyBlock(Of String(), String)(Function(words)
          Console.WriteLine("Finding palindromes...")

          ' Holds palindromes.
          Dim palindromes = New ConcurrentQueue(Of String)()

          ' Add each word in the original collection to the result whose
          ' palindrome also exists in the collection.
          Parallel.ForEach(words, Sub(word)
              ' Reverse the word.
              Dim reverse As New String(word.Reverse().ToArray())

              ' Enqueue the word if the reversed version also exists
              ' in the collection. Array.BinarySearch requires a sorted
              ' array, which filterWordList provides.
              If Array.BinarySearch(Of String)(words, reverse) >= 0 AndAlso word <> reverse Then
                  palindromes.Enqueue(word)
              End If
          End Sub)
          Return palindromes
      End Function)

      ' Prints the provided palindrome to the console.     
      Dim printPalindrome = New ActionBlock(Of String)(Sub(palindrome) Console.WriteLine("Found palindrome {0}/{1}", palindrome, New String(palindrome.Reverse().ToArray())))

      
      ' Connect the dataflow blocks to form a pipeline. 
      '

      downloadString.LinkTo(createWordList)
      createWordList.LinkTo(filterWordList)
      filterWordList.LinkTo(findPalindromes)
      findPalindromes.LinkTo(printPalindrome)
      
      ' For each completion task in the pipeline, create a continuation task
      ' that marks the next block in the pipeline as completed.
      ' A completed dataflow block processes any buffered elements, but does
      ' not accept new elements.

      downloadString.Completion.ContinueWith(Sub(t)
          If t.IsFaulted Then
              CType(createWordList, IDataflowBlock).Fault(t.Exception)
          Else
              createWordList.Complete()
          End If
      End Sub)
      createWordList.Completion.ContinueWith(Sub(t)
          If t.IsFaulted Then
              CType(filterWordList, IDataflowBlock).Fault(t.Exception)
          Else
              filterWordList.Complete()
          End If
      End Sub)
      filterWordList.Completion.ContinueWith(Sub(t)
          If t.IsFaulted Then
              CType(findPalindromes, IDataflowBlock).Fault(t.Exception)
          Else
              findPalindromes.Complete()
          End If
      End Sub)
      findPalindromes.Completion.ContinueWith(Sub(t)
          If t.IsFaulted Then
              CType(printPalindrome, IDataflowBlock).Fault(t.Exception)
          Else
              printPalindrome.Complete()
          End If
      End Sub)


      ' Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt")

      ' Mark the head of the pipeline as complete. The continuation tasks  
      ' propagate completion through the pipeline as each part of the  
      ' pipeline finishes.
      downloadString.Complete()

      ' Wait for the last block in the pipeline to process all messages.
      printPalindrome.Completion.Wait()

   End Sub 

End Class 

' Sample output: 
'Downloading 'http://www.gutenberg.org/files/6130/6130-0.txt'... 
'Creating word list... 
'Filtering word list... 
'Finding palindromes... 
'Found palindrome doom/mood 
'Found palindrome draw/ward 
'Found palindrome live/evil 
'Found palindrome seat/taes 
'Found palindrome aera/area 
'Found palindrome mood/doom 
'Found palindrome moor/room 
'Found palindrome sleek/keels 
'Found palindrome area/aera 
'Found palindrome evil/live 
'Found palindrome speed/deeps 
'Found palindrome spot/tops 
'Found palindrome spots/stops 
'Found palindrome stops/spots 
'Found palindrome taes/seat 
'Found palindrome port/trop 
'Found palindrome tops/spot 
'Found palindrome trop/port 
'Found palindrome reed/deer 
'Found palindrome deeps/speed 
'Found palindrome deer/reed 
'Found palindrome ward/draw 
'Found palindrome room/moor 
'Found palindrome keels/sleek 
'

This example sends one URL to process through the dataflow pipeline. If you send more than one input value through a pipeline, you can introduce a form of parallelism into your application that resembles how parts might move through an automobile factory. When the first member of the pipeline sends its result to the second member, it can process another item in parallel as the second member processes the first result.

The parallelism that is achieved by using dataflow pipelines is known as coarse-grained parallelism because it typically consists of fewer, larger tasks. You can also use a more fine-grained parallelism of smaller, short-running tasks in a dataflow pipeline. In this example, the findPalindromes member of the pipeline uses the Parallel.ForEach method to process multiple items in the work list in parallel. The use of fine-grained parallelism in a coarse-grained pipeline can improve overall throughput.
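Besides calling Parallel.ForEach inside a block's delegate, you can let an execution block itself process several messages at the same time by setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1 (the default is 1). The following minimal sketch is an illustration, not part of the walkthrough; the module ParallelBlockDemo and the blocks reverseWord and printWord are hypothetical. Even with parallel processing, a TransformBlock emits its results in input order, so the sketch should print ward, evil, and spots in that order.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Linq
Imports System.Threading.Tasks.Dataflow

Module ParallelBlockDemo
    Sub Main()
        ' Let the block run its delegate on up to four messages at the same time.
        Dim options As New ExecutionDataflowBlockOptions With {
            .MaxDegreeOfParallelism = 4
        }

        ' Hypothetical per-message work: reverse each word.
        Dim reverseWord = New TransformBlock(Of String, String)(
            Function(text) New String(text.Reverse().ToArray()),
            options)
        Dim printWord = New ActionBlock(Of String)(Sub(w) Console.WriteLine(w))
        reverseWord.LinkTo(printWord, New DataflowLinkOptions With {.PropagateCompletion = True})

        For Each item In {"draw", "live", "stops"}
            reverseWord.Post(item)
        Next
        reverseWord.Complete()
        printWord.Completion.Wait()
    End Sub
End Module
```

Raising MaxDegreeOfParallelism helps only when the per-message work is independent; the printWord block keeps the default of 1 so that console output stays in order.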

You can also connect a source dataflow block to multiple target blocks to create a dataflow network. An overload of the LinkTo method takes a Predicate(Of T) object that determines, based on the value of each message, whether the target block accepts that message. Most dataflow block types that act as sources offer messages to all connected target blocks, in the order in which they were connected, until one of the blocks accepts that message. By using this filtering mechanism, you can create systems of connected dataflow blocks that direct certain data through one path and other data through another path. For an example that uses filtering to create a dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.
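A minimal sketch of this filtering, assuming a hypothetical FilterDemo module with a BufferBlock named numbers and two hypothetical targets, printEven and printOdd: each link passes a predicate, and DataflowLinkOptions.PropagateCompletion completes both targets when the source completes. The two predicates together accept every integer. If no linked target accepted a message, that message would stay in the source and hold up the messages behind it, so real networks often add a catch-all link last. Because the two ActionBlock objects run independently, the even and odd lines can interleave in any order.

```vb
Option Strict On
Option Infer On

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Module FilterDemo
    Sub Main()
        Dim numbers = New BufferBlock(Of Integer)()
        Dim printEven = New ActionBlock(Of Integer)(Sub(n) Console.WriteLine("even: {0}", n))
        Dim printOdd = New ActionBlock(Of Integer)(Sub(n) Console.WriteLine("odd: {0}", n))

        ' Each message is offered to the targets in link order; the predicate
        ' on each link decides whether that target receives the message.
        Dim linkOptions As New DataflowLinkOptions With {.PropagateCompletion = True}
        numbers.LinkTo(printEven, linkOptions, Function(n) n Mod 2 = 0)
        numbers.LinkTo(printOdd, linkOptions, Function(n) n Mod 2 <> 0)

        For i As Integer = 1 To 4
            numbers.Post(i)
        Next
        numbers.Complete()

        ' Wait for both branches of the network to finish.
        Task.WaitAll(printEven.Completion, printOdd.Completion)
    End Sub
End Module
```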

© 2014 Microsoft