
Walkthrough: Creating a Dataflow Pipeline

.NET Framework 4.6 and 4.5

Although you can use the DataflowBlock.Receive, DataflowBlock.ReceiveAsync, and DataflowBlock.TryReceive<TOutput> methods to receive messages from source blocks, you can also connect message blocks to form a dataflow pipeline. A dataflow pipeline is a series of components, or dataflow blocks, each of which performs a specific task that contributes to a larger goal. Every dataflow block in a dataflow pipeline performs work when it receives a message from another dataflow block. An analogy to this is an assembly line for automobile manufacturing. As each vehicle passes through the assembly line, one station assembles the frame, the next one installs the engine, and so on. Because an assembly line enables multiple vehicles to be assembled at the same time, it provides better throughput than assembling complete vehicles one at a time.
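For comparison, the following sketch moves a message between two blocks by hand with DataflowBlock.Receive and checks for leftover output with DataflowBlock.TryReceive. Linking blocks automates this hand-off. The class and method names (ManualReceive, ShoutManually, HasLeftover) are hypothetical and only illustrate the receive methods.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical example: two stages connected by hand instead of with LinkTo.
static class ManualReceive
{
    public static string ShoutManually(string input)
    {
        var trim = new TransformBlock<string, string>(s => s.Trim());
        var upper = new TransformBlock<string, string>(s => s.ToUpperInvariant());

        trim.Post(input);
        // Receive blocks until trim has produced an output, which is then
        // forwarded manually to the next stage.
        upper.Post(trim.Receive());
        return upper.Receive();
    }

    public static bool HasLeftover()
    {
        var upper = new TransformBlock<string, string>(s => s.ToUpperInvariant());
        upper.Post("iliad");
        upper.Receive();

        // TryReceive returns immediately; false means no output is waiting.
        string leftover;
        return upper.TryReceive(out leftover);
    }

    static void Main()
    {
        Console.WriteLine(ShoutManually("  iliad  ")); // ILIAD
        Console.WriteLine(HasLeftover());             // False
    }
}
```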

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

This document demonstrates a dataflow pipeline that downloads the book The Iliad of Homer from a website and searches the text for words whose characters, read in reverse order, form another word that also appears in the text. Building the dataflow pipeline in this document involves the following steps:

  1. Create the dataflow blocks that participate in the pipeline.

  2. Connect each dataflow block to the next block in the pipeline. Each block receives as input the output of the previous block in the pipeline.

  3. For each dataflow block, create a continuation task that sets the next block to the completed state after that block finishes.

  4. Post data to the head of the pipeline.

  5. Mark the head of the pipeline as completed.

  6. Wait for the pipeline to complete all work.

Read Dataflow (Task Parallel Library) before you start this walkthrough.

In Visual Studio, create a Visual C# or Visual Basic Console Application project. Add a reference to System.Threading.Tasks.Dataflow.dll.

Alternatively, create a file and name it ReverseWords.cs (ReverseWords.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window to compile the project.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll ReverseWords.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll ReverseWords.vb

Add the following code to your project to create the basic application.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline. 
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all reversed words that appear in that book. 
class Program
{
   static void Main(string[] args)
   {
   }
}

Add the following code to the Main method to create the dataflow blocks that participate in the pipeline. The table that follows summarizes the role of each member of the pipeline.

// 
// Create the members of the pipeline. 
//  

// Downloads the requested resource as a string. 
var downloadString = new TransformBlock<string, string>(uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   // Dispose of the WebClient when the download finishes.
   using (var client = new WebClient())
   {
      return client.DownloadString(uri);
   }
});

// Separates the specified text into an array of words. 
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters 
   // with a space character. 
   char[] tokens = text.ToArray();
   for (int i = 0; i < tokens.Length; i++)
   {
      if (!char.IsLetter(tokens[i]))
         tokens[i] = ' ';
   }
   text = new string(tokens);

   // Separate the text into an array of words. 
   return text.Split(new char[] { ' ' },
      StringSplitOptions.RemoveEmptyEntries);
});

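// Removes short words and duplicates, and then orders the remaining words 
// alphabetically. The sort comes last because findReversedWords uses 
// Array.BinarySearch, which requires sorted input, and Distinct does not 
// guarantee that it preserves order. 
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words.Where(word => word.Length > 3).Distinct()
      .OrderBy(word => word).ToArray();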
});

// Finds all words in the specified collection whose reverse also  
// exists in the collection. 
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   // Holds reversed words. 
   var reversedWords = new ConcurrentQueue<string>();

   // Add each word in the original collection to the result whose  
   // reversed word also exists in the collection.
   Parallel.ForEach(words, word =>
   {
      // Reverse the word. 
      string reverse = new string(word.Reverse().ToArray());

      // Enqueue the word if the reversed version also exists 
      // in the collection. 
      if (Array.BinarySearch<string>(words, reverse) >= 0 &&
          word != reverse)
      {
         reversedWords.Enqueue(word);
      }
   });

   return reversedWords;
});

// Prints the provided reversed words to the console.     
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});

Member | Type | Description

downloadString | TransformBlock<TInput, TOutput> | Downloads the book text from the Web.

createWordList | TransformBlock<TInput, TOutput> | Separates the book text into an array of words.

filterWordList | TransformBlock<TInput, TOutput> | Removes short words and duplicates from the word array and orders the resulting words alphabetically.

findReversedWords | TransformManyBlock<TInput, TOutput> | Finds all words in the filtered word array whose reverse also occurs in the array.

printReversedWords | ActionBlock<TInput> | Displays each word and its reverse on the console.

Although you could combine several steps of this pipeline into one, the example illustrates the concept of composing multiple independent dataflow tasks to perform a larger task. The example uses TransformBlock<TInput, TOutput> for the members that perform an operation on their input data and send one result to the next step in the pipeline. The findReversedWords member of the pipeline is a TransformManyBlock<TInput, TOutput> object because it produces multiple independent outputs for each input. The tail of the pipeline, printReversedWords, is an ActionBlock<TInput> object because it performs an action on its input and does not produce a result.
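To make the difference between the three block types concrete, here is a small sketch that feeds simple string input to each one. The class and method names are hypothetical. OutputAvailableAsync drains the TransformManyBlock because a source block's Completion task does not finish while outputs remain unconsumed, so waiting on Completion first would hang.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical example contrasting the three block types used in the pipeline.
static class BlockShapes
{
    // TransformBlock: exactly one output message per input message.
    public static string OneToOne(string text)
    {
        var upper = new TransformBlock<string, string>(s => s.ToUpperInvariant());
        upper.Post(text);
        return upper.Receive();
    }

    // TransformManyBlock: zero or more output messages per input message.
    public static List<string> OneToMany(string text)
    {
        var split = new TransformManyBlock<string, string>(
            s => s.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries));
        split.Post(text);
        split.Complete();

        // OutputAvailableAsync returns false once the block has completed
        // and its output buffer is empty.
        var words = new List<string>();
        while (split.OutputAvailableAsync().Result)
            words.Add(split.Receive());
        return words;
    }

    // ActionBlock: consumes messages and produces no output.
    public static int CountActions(string[] items)
    {
        int calls = 0;
        // The default MaxDegreeOfParallelism is 1, so calls++ is not racy.
        var print = new ActionBlock<string>(s => { Console.WriteLine(s); calls++; });
        foreach (string item in items)
            print.Post(item);
        print.Complete();
        print.Completion.Wait();
        return calls;
    }

    static void Main()
    {
        Console.WriteLine(OneToOne("the iliad"));                    // THE ILIAD
        Console.WriteLine(string.Join("|", OneToMany("the iliad"))); // the|iliad
        Console.WriteLine(CountActions(new[] { "doom", "mood" }));   // writes doom and mood, then 2
    }
}
```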

Add the following code to connect each block to the next block in the pipeline.

When you call the LinkTo method to connect a source dataflow block to a target dataflow block, the source dataflow block propagates data to the target block as data becomes available.

// 
// Connect the dataflow blocks to form a pipeline. 
//

downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findReversedWords);
findReversedWords.LinkTo(printReversedWords);

Add the following code to enable each dataflow block to perform a final action after it processes all data.

// 
// For each completion task in the pipeline, create a continuation task 
// that marks the next block in the pipeline as completed. 
// A completed dataflow block processes any buffered elements, but does 
// not accept new elements. 
//

downloadString.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
   else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
   else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)findReversedWords).Fault(t.Exception);
   else findReversedWords.Complete();
});
findReversedWords.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)printReversedWords).Fault(t.Exception);
   else printReversedWords.Complete();
});

To propagate completion through the pipeline, each continuation task sets the next dataflow block to the completed state, or, if the previous block faulted, passes its exception to the next block by calling IDataflowBlock.Fault. For example, when the head of the pipeline is set to the completed state, it processes any remaining buffered messages and then runs its continuation task, which sets the second member of the pipeline to the completed state. The second member of the pipeline in turn processes any remaining buffered messages and then runs its continuation task, which sets the third member of the pipeline to the completed state. This process continues until all members of the pipeline finish. This example uses lambda expressions to define the continuation tasks.
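The TPL Dataflow Library can also propagate completion for you. If you pass LinkTo a DataflowLinkOptions object whose PropagateCompletion property is true, the link completes or faults the target when the source finishes, so no hand-written continuation tasks are needed. The following is a minimal sketch of that alternative on a hypothetical three-stage integer pipeline (the class and method names are illustrative). It is not the approach that this walkthrough's code uses.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical three-stage pipeline: parse -> double -> print.
static class PropagateCompletionSketch
{
    public static string Run(string[] inputs)
    {
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var twice = new TransformBlock<int, int>(n => n * 2);
        var print = new ActionBlock<int>(n => Console.WriteLine(n));

        // PropagateCompletion forwards both normal completion and faults
        // from each source to its target, so no ContinueWith calls are needed.
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        parse.LinkTo(twice, options);
        twice.LinkTo(print, options);

        foreach (string s in inputs)
            parse.Post(s);
        parse.Complete();

        try
        {
            print.Completion.Wait();
            return "completed";
        }
        catch (AggregateException ex)
        {
            // The exception from parse reaches the tail wrapped in nested
            // AggregateExceptions; Flatten unwraps them.
            return "faulted: " + ex.Flatten().InnerException.GetType().Name;
        }
    }

    static void Main()
    {
        Console.WriteLine(Run(new[] { "1", "2", "3" })); // writes 2, 4, 6, then "completed"
        Console.WriteLine(Run(new[] { "1", "x" }));      // ends with "faulted: FormatException"
    }
}
```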

Add the following code to post the URL of the book The Iliad of Homer to the head of the dataflow pipeline.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");

This example uses DataflowBlock.Post<TInput> to synchronously send data to the head of the pipeline. Use the DataflowBlock.SendAsync method when you must asynchronously send data to a dataflow node.
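The difference matters most when the target has a BoundedCapacity. In this hypothetical sketch, Post returns false as soon as the bounded block is full. Awaiting SendAsync instead waits until the block has room and then reports whether it accepted the message. The class and method names are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical example contrasting Post and SendAsync on a bounded block.
static class PostVersusSendAsync
{
    // Post declines immediately when a bounded block has no room.
    public static bool[] PostIntoFullBlock()
    {
        var gate = new ManualResetEventSlim(false);
        var blocked = new ActionBlock<int>(n => gate.Wait(),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool first = blocked.Post(0);  // accepted: the block had room for one message
        bool second = blocked.Post(1); // declined: message 0 still counts against the bound
        gate.Set();
        blocked.Complete();
        blocked.Completion.Wait();
        return new[] { first, second };
    }

    // SendAsync waits for room instead of declining.
    public static async Task<int> SendAllAsync(int count)
    {
        int processed = 0;
        var slow = new ActionBlock<int>(async n =>
        {
            await Task.Delay(10);
            processed++; // safe: the block processes one message at a time by default
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        for (int i = 0; i < count; i++)
        {
            bool accepted = await slow.SendAsync(i);
            if (!accepted)
                throw new InvalidOperationException("Message " + i + " was declined.");
        }
        slow.Complete();
        await slow.Completion;
        return processed;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", PostIntoFullBlock())); // True,False
        Console.WriteLine(SendAllAsync(5).Result);                 // 5
    }
}
```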

Add the following code to mark the head of the pipeline as completed. The head of the pipeline runs its continuation task after it processes all buffered messages. This continuation task propagates the completed state through the pipeline.

// Mark the head of the pipeline as complete. The continuation tasks  
// propagate completion through the pipeline as each part of the  
// pipeline finishes.
downloadString.Complete();

This example sends one URL through the dataflow pipeline to be processed. If you send more than one input through a pipeline, call the IDataflowBlock.Complete method after you submit all the input. You can omit this step if your application has no well-defined point at which data is no longer available or the application does not have to wait for the pipeline to finish.
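When several inputs go through a pipeline, Complete is called once, after the loop that posts them. The sketch below uses a hypothetical word-counting stage and links it with DataflowLinkOptions.PropagateCompletion so that completing the head also completes the tail. It also shows that a block that has been marked complete declines any further Post calls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical example: many inputs, one call to Complete.
static class ManyInputs
{
    public static List<int> CountWords(IEnumerable<string> texts)
    {
        var results = new List<int>();
        var count = new TransformBlock<string, int>(text =>
            text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length);
        var collect = new ActionBlock<int>(n => results.Add(n));
        count.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string text in texts)
            count.Post(text);

        // Signal the end of input only after every item has been posted.
        count.Complete();
        collect.Completion.Wait();
        return results;
    }

    public static bool PostAfterComplete()
    {
        var block = new BufferBlock<string>();
        block.Complete();
        return block.Post("too late"); // false: a completed block declines new messages
    }

    static void Main()
    {
        var counts = CountWords(new[] { "sing goddess", "wrath", "of peleus son" });
        Console.WriteLine(string.Join(",", counts)); // 2,1,3
        Console.WriteLine(PostAfterComplete());      // False
    }
}
```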

Add the following code to wait for the pipeline to finish. Because this example uses continuation tasks to propagate completion through the pipeline, the overall operation is finished when the tail of the pipeline finishes.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();

You can wait for dataflow completion from any thread or from multiple threads at the same time.
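Because Completion is an ordinary Task, a blocking Wait and an asynchronous await can observe it at the same time. In this illustrative sketch (the names are hypothetical), a thread-pool thread blocks on Completion while the calling method awaits it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical example: two waiters observing the same Completion task.
static class ManyWaiters
{
    public static async Task<TaskStatus> WaitFromTwoPlacesAsync()
    {
        var block = new ActionBlock<int>(ms => Thread.Sleep(ms));
        block.Post(50);
        block.Complete();

        // Blocking wait on a thread-pool thread.
        Task blockingWaiter = Task.Run(() => block.Completion.Wait());
        // Asynchronous wait in the calling method.
        await block.Completion;
        await blockingWaiter;

        return block.Completion.Status;
    }

    static void Main()
    {
        Console.WriteLine(WaitFromTwoPlacesAsync().Result); // RanToCompletion
    }
}
```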

The following example shows the complete code for this walkthrough.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline. 
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all reversed words that appear in that book. 
class DataflowReversedWords
{
   static void Main(string[] args)
   {
      // 
      // Create the members of the pipeline. 
      //  

      // Downloads the requested resource as a string. 
      var downloadString = new TransformBlock<string, string>(uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         // Dispose of the WebClient when the download finishes.
         using (var client = new WebClient())
         {
            return client.DownloadString(uri);
         }
      });

      // Separates the specified text into an array of words. 
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters 
         // with a space character. 
         char[] tokens = text.ToArray();
         for (int i = 0; i < tokens.Length; i++)
         {
            if (!char.IsLetter(tokens[i]))
               tokens[i] = ' ';
         }
         text = new string(tokens);

         // Separate the text into an array of words. 
         return text.Split(new char[] { ' ' },
            StringSplitOptions.RemoveEmptyEntries);
      });

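      // Removes short words and duplicates, and then orders the remaining words 
      // alphabetically. The sort comes last because findReversedWords uses 
      // Array.BinarySearch, which requires sorted input, and Distinct does not 
      // guarantee that it preserves order. 
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words.Where(word => word.Length > 3).Distinct()
            .OrderBy(word => word).ToArray();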
      });

      // Finds all words in the specified collection whose reverse also  
      // exists in the collection. 
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         // Holds reversed words. 
         var reversedWords = new ConcurrentQueue<string>();

         // Add each word in the original collection to the result whose  
         // reversed word also exists in the collection.
         Parallel.ForEach(words, word =>
         {
            // Reverse the word. 
            string reverse = new string(word.Reverse().ToArray());

            // Enqueue the word if the reversed version also exists 
            // in the collection. 
            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
               reversedWords.Enqueue(word);
            }
         });

         return reversedWords;
      });

      // Prints the provided reversed words to the console.     
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      // 
      // Connect the dataflow blocks to form a pipeline. 
      //

      downloadString.LinkTo(createWordList);
      createWordList.LinkTo(filterWordList);
      filterWordList.LinkTo(findReversedWords);
      findReversedWords.LinkTo(printReversedWords);

      // 
      // For each completion task in the pipeline, create a continuation task 
      // that marks the next block in the pipeline as completed. 
      // A completed dataflow block processes any buffered elements, but does 
      // not accept new elements. 
      //

      downloadString.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
         else createWordList.Complete();
      });
      createWordList.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
         else filterWordList.Complete();
      });
      filterWordList.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)findReversedWords).Fault(t.Exception);
         else findReversedWords.Complete();
      });
      findReversedWords.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)printReversedWords).Fault(t.Exception);
         else printReversedWords.Complete();
      });

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");

      // Mark the head of the pipeline as complete. The continuation tasks  
      // propagate completion through the pipeline as each part of the  
      // pipeline finishes.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/files/6130/6130-0.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/

This example sends one URL to process through the dataflow pipeline. If you send more than one input value through a pipeline, you can introduce a form of parallelism into your application that resembles how parts might move through an automobile factory. When the first member of the pipeline sends its result to the second member, it can process another item in parallel as the second member processes the first result.

The parallelism that is achieved by using dataflow pipelines is known as coarse-grained parallelism because it typically consists of fewer, larger tasks. You can also use a more fine-grained parallelism of smaller, short-running tasks in a dataflow pipeline. In this example, the findReversedWords member of the pipeline uses the Parallel.ForEach method to process multiple items in the work list in parallel. The use of fine-grained parallelism in a coarse-grained pipeline can improve overall throughput.
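Parallel.ForEach inside a block is one way to add fine-grained parallelism. Another way, shown in this hypothetical sketch, is to let the block itself process several messages concurrently by setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism. A TransformBlock still emits its outputs in input order. The class name and the Thread.Sleep stand-in for real work are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical example: one pipeline stage that processes several items at once.
static class ParallelStage
{
    public static int[] SquareInParallel(int[] inputs, int degree)
    {
        var square = new TransformBlock<int, int>(n =>
        {
            Thread.Sleep(10); // stand-in for real per-item work
            return n * n;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degree });

        foreach (int n in inputs)
            square.Post(n);

        // Outputs arrive in the same order as the inputs, even though up to
        // 'degree' items may be processed at the same time.
        var results = new int[inputs.Length];
        for (int i = 0; i < inputs.Length; i++)
            results[i] = square.Receive();
        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", SquareInParallel(new[] { 1, 2, 3, 4 }, 4))); // 1,4,9,16
    }
}
```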

You can also connect a source dataflow block to multiple target blocks to create a dataflow network. An overload of the LinkTo method takes a Predicate<T> object that defines whether the target block accepts each message based on its value. Most dataflow block types that act as sources offer messages to all connected target blocks, in the order in which they were connected, until one of the blocks accepts that message. By using this filtering mechanism, you can create systems of connected dataflow blocks that direct certain data through one path and other data through another path. For an example that uses filtering to create a dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.
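The filtering mechanism works roughly as follows. In this sketch, a BufferBlock offers each word first to a target linked with a predicate and then to a catch-all target. The routing rule (palindromes versus other words) and the names are hypothetical. The catch-all link matters: a message that no linked target accepts stays in the source's buffer, so the source, and any target that waits on its completion, never completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Hypothetical example: route messages to different targets with predicates.
static class PredicateRouting
{
    public static void Route(IEnumerable<string> words,
        List<string> palindromes, List<string> others)
    {
        var source = new BufferBlock<string>();
        var toPalindromes = new ActionBlock<string>(w => palindromes.Add(w));
        var toOthers = new ActionBlock<string>(w => others.Add(w));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        // Linked first, so each message is offered here first; accepts palindromes only.
        source.LinkTo(toPalindromes, options, w => w.SequenceEqual(w.Reverse()));
        // Linked second; accepts every message the first link declined.
        source.LinkTo(toOthers, options);

        foreach (string w in words)
            source.Post(w);
        source.Complete();

        toPalindromes.Completion.Wait();
        toOthers.Completion.Wait();
    }

    static void Main()
    {
        var palindromes = new List<string>();
        var others = new List<string>();
        Route(new[] { "level", "doom", "noon", "mood" }, palindromes, others);
        Console.WriteLine(string.Join(",", palindromes)); // level,noon
        Console.WriteLine(string.Join(",", others));      // doom,mood
    }
}
```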

© 2015 Microsoft