
Walkthrough: Creating a Dataflow Pipeline

.NET Framework 4.5

Although you can use the DataflowBlock.Receive, DataflowBlock.ReceiveAsync, and DataflowBlock.TryReceive<TOutput> methods to receive messages from source blocks, you can also connect message blocks to form a dataflow pipeline. A dataflow pipeline is a series of components, or dataflow blocks, each of which performs a specific task that contributes to a larger goal. Every dataflow block in a dataflow pipeline performs work when it receives a message from another dataflow block. An analogy to this is an assembly line for automobile manufacturing. As each vehicle passes through the assembly line, one station assembles the frame, the next one installs the engine, and so on. Because an assembly line enables multiple vehicles to be assembled at the same time, it provides better throughput than assembling complete vehicles one at a time.
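For comparison with the linked pipeline that this walkthrough builds, the following sketch pulls results out of a single block by calling DataflowBlock.Receive in a loop. ManualReceiveSketch and SquareAll are hypothetical names, and the sketch assumes that the TPL Dataflow package is referenced. Receive throws an InvalidOperationException when the source completes and has no more data, so the loop checks OutputAvailableAsync before each call.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: pulls every result out of a block manually
// instead of linking the block to a downstream target.
public static class ManualReceiveSketch
{
    public static List<int> SquareAll(IEnumerable<int> inputs)
    {
        var square = new TransformBlock<int, int>(n => n * n);
        foreach (int n in inputs) square.Post(n);

        // No more input will arrive; the block completes after its
        // output buffer has been drained.
        square.Complete();

        var results = new List<int>();
        // OutputAvailableAsync completes with false once the block has
        // completed and holds no more output.
        while (square.OutputAvailableAsync().Result)
        {
            // Receive blocks until an item is available.
            results.Add(square.Receive());
        }
        return results;
    }
}
```

Calling ManualReceiveSketch.SquareAll(new[] { 1, 2, 3 }) returns 1, 4, and 9 in input order. When blocks are linked into a pipeline, the source pushes each result to its target, so this loop is not needed.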

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the library, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package. Later releases of the library are published on NuGet as the System.Threading.Tasks.Dataflow package.

This document demonstrates a dataflow pipeline that downloads the book The Iliad of Homer from a website. It then searches the words of that book for pairs in which one word is the other spelled backward, such as doom and mood. The example refers to these words as palindromes. Building the dataflow pipeline consists of the following steps:

  1. Create the dataflow blocks that participate in the pipeline.

  2. Connect each dataflow block to the next block in the pipeline. Each block receives as input the output of the previous block in the pipeline.

  3. For each dataflow block, create a continuation task that sets the next block to the completed state after the current block finishes.

  4. Post data to the head of the pipeline.

  5. Mark the head of the pipeline as completed.

  6. Wait for the pipeline to complete all work.
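The following sketch shows the same six steps with two blocks. It uses in-memory text instead of a download. PipelineSkeleton and Run are hypothetical names. The completion wiring follows the continuation-task pattern that this walkthrough uses for the full pipeline.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

// Hypothetical two-stage pipeline that follows the six steps in order.
public static class PipelineSkeleton
{
    public static string[] Run(params string[] lines)
    {
        var collected = new ConcurrentQueue<string>();

        // Step 1: create the blocks.
        var split = new TransformManyBlock<string, string>(
            line => line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries));
        var collect = new ActionBlock<string>(
            word => collected.Enqueue(word.ToUpperInvariant()));

        // Step 2: connect each block to the next one.
        split.LinkTo(collect);

        // Step 3: forward completion, or a fault, to the next block.
        split.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)collect).Fault(t.Exception);
            else collect.Complete();
        });

        // Step 4: post data to the head of the pipeline.
        foreach (string line in lines) split.Post(line);

        // Step 5: mark the head of the pipeline as completed.
        split.Complete();

        // Step 6: wait for the tail of the pipeline to finish.
        collect.Completion.Wait();
        return collected.ToArray();
    }
}
```

For example, PipelineSkeleton.Run("dataflow blocks", "form pipelines") returns the four words in upper case, in the order in which they were posted.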

Read Dataflow (Task Parallel Library) before you start this walkthrough.

In Visual Studio, create a Visual C# or Visual Basic Console Application project. Add a reference to System.Threading.Tasks.Dataflow.dll.

Alternatively, create a file and name it DataflowPalindromes.cs (DataflowPalindromes.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window to compile the project.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowPalindromes.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowPalindromes.vb

Add the following code to your project to create the basic application.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline. 
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all palindromes that appear in that book. 
class DataflowPalindromes
{
   static void Main(string[] args)
   {
   }
}

Add the following code to the Main method to create the dataflow blocks that participate in the pipeline. The table that follows summarizes the role of each member of the pipeline.

// 
// Create the members of the pipeline. 
//  

// Downloads the requested resource as a string. 
var downloadString = new TransformBlock<string, string>(uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   // Dispose the WebClient after the download finishes.
   using (var client = new WebClient())
   {
      return client.DownloadString(uri);
   }
});

// Separates the specified text into an array of words. 
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters 
   // with a space character.
   char[] tokens = text.ToCharArray();
   for (int i = 0; i < tokens.Length; i++)
   {
      if (!char.IsLetter(tokens[i]))
         tokens[i] = ' ';
   }
   text = new string(tokens);

   // Separate the text into an array of words. 
   return text.Split(new char[] { ' ' },
      StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates, and then orders the remaining 
// words alphabetically. Sorting happens last because Distinct does not 
// guarantee order, and findPalindromes relies on sorted input. 
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words.Where(word => word.Length > 3).Distinct()
      .OrderBy(word => word).ToArray();
});

// Finds all words in the specified collection whose reverse also 
// exists in the collection. 
var findPalindromes = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding palindromes...");

   // Holds palindromes. 
   var palindromes = new ConcurrentQueue<string>();

   // Add each word in the original collection to the result whose 
   // reverse also exists in the collection. Array.BinarySearch requires 
   // the sorted array that filterWordList produces.
   Parallel.ForEach(words, word =>
   {
      // Reverse the word. 
      string reverse = new string(word.Reverse().ToArray());

      // Enqueue the word if its reversed version is a different word 
      // that also exists in the collection. 
      if (Array.BinarySearch<string>(words, reverse) >= 0 &&
          word != reverse)
      {
         palindromes.Enqueue(word);
      }
   });

   return palindromes;
});

// Prints the provided palindrome to the console. 
var printPalindrome = new ActionBlock<string>(palindrome =>
{
   Console.WriteLine("Found palindrome {0}/{1}",
      palindrome, new string(palindrome.Reverse().ToArray()));
});

| Member | Type | Description |
|---|---|---|
| downloadString | TransformBlock<TInput, TOutput> | Downloads the book text from the Web. |
| createWordList | TransformBlock<TInput, TOutput> | Separates the book text into an array of words. |
| filterWordList | TransformBlock<TInput, TOutput> | Removes short words and duplicates from the word array and orders the remaining words alphabetically. |
| findPalindromes | TransformManyBlock<TInput, TOutput> | Finds all words in the filtered word array whose reverse also occurs in the word array. |
| printPalindrome | ActionBlock<TInput> | Prints palindromes to the console. |
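findPalindromes depends on filterWordList to sort its output, because Array.BinarySearch returns meaningful results only for sorted arrays. The following sketch performs the same check with a HashSet<string>, so the check does not depend on input order. This is an alternative technique, not the approach the walkthrough uses, and ReversedWordFinder is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: finds words whose reversal is a different word
// in the same collection, without requiring sorted input.
public static class ReversedWordFinder
{
    public static List<string> Find(IEnumerable<string> words)
    {
        // The set removes duplicates and provides fast membership checks.
        var set = new HashSet<string>(words);

        return set
            .Where(word =>
            {
                string reverse = new string(word.Reverse().ToArray());
                // Skip words that read the same backward, such as "level",
                // as the walkthrough's word != reverse check does.
                return reverse != word && set.Contains(reverse);
            })
            // Sort only so that the output is stable and readable.
            .OrderBy(word => word, StringComparer.Ordinal)
            .ToList();
    }
}
```

A hash lookup trades the sort-then-search requirement for extra memory. Inside findPalindromes, either approach works as long as its precondition holds.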

Although you could combine multiple steps of this pipeline into one step, the example illustrates the concept of composing multiple independent dataflow tasks to perform a larger task. The example uses TransformBlock<TInput, TOutput> to enable each member of the pipeline to perform an operation on its input data and send the result to the next step in the pipeline. The findPalindromes member of the pipeline is a TransformManyBlock<TInput, TOutput> object because it produces multiple independent outputs for each input. The tail of the pipeline, printPalindrome, is an ActionBlock<TInput> object because it performs an action on its input and does not produce a result.
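The difference between the two transform block types shows up in the number of outputs. The following sketch posts the same inputs to a TransformBlock, which produces exactly one output per input, and to a TransformManyBlock, which produces zero or more outputs per input. BlockTypeComparison and its methods are hypothetical names. ActionBlock<TInput> is not included because it implements only ITargetBlock<TInput> and has no output to receive.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper comparing how many outputs each block type produces.
public static class BlockTypeComparison
{
    // TransformBlock: exactly one output per input.
    public static List<int> LengthsOf(string[] inputs)
    {
        var lengths = new TransformBlock<string, int>(s => s.Length);
        foreach (string s in inputs) lengths.Post(s);
        lengths.Complete();
        return Drain(lengths);
    }

    // TransformManyBlock: zero or more outputs per input.
    public static List<char> CharactersOf(string[] inputs)
    {
        var chars = new TransformManyBlock<string, char>(s => s.ToCharArray());
        foreach (string s in inputs) chars.Post(s);
        chars.Complete();
        return Drain(chars);
    }

    // Receives every output until the source completes.
    static List<T> Drain<T>(ISourceBlock<T> source)
    {
        var items = new List<T>();
        while (source.OutputAvailableAsync().Result)
            items.Add(source.Receive());
        return items;
    }
}
```

With the inputs "ab", "" (an empty string), and "xyz", LengthsOf returns three values, one of them 0, while CharactersOf returns five characters because the empty string produces no output.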

Add the following code to connect each block to the next block in the pipeline.

When you call the LinkTo method to connect a source dataflow block to a target dataflow block, the source dataflow block propagates data to the target block as data becomes available.

// 
// Connect the dataflow blocks to form a pipeline. 
//

downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
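Because a source propagates data as it becomes available, the order in which you link and post is flexible. The following sketch shows that a message posted to a BufferBlock before any link exists waits in the buffer and is offered to the target once the link is created. It also shows that LinkTo returns an IDisposable; disposing it removes the link. LinkTimingSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper showing when linked targets receive messages.
public static class LinkTimingSketch
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var source = new BufferBlock<string>();
        var target = new ActionBlock<string>(s => received.Add(s));

        // No link exists yet, so this message waits in the source's buffer.
        source.Post("posted before link");

        // Linking offers buffered messages, and any later ones, to the target.
        IDisposable link = source.LinkTo(target,
            new DataflowLinkOptions { PropagateCompletion = true });
        source.Post("posted after link");

        source.Complete();
        target.Completion.Wait();

        // Disposing the returned object unlinks the blocks.
        link.Dispose();
        return received;
    }
}
```

Both messages arrive at the target in the order in which they were posted.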

Add the following code to enable each dataflow block to perform a final action after it processes all data.

// 
// For each completion task in the pipeline, create a continuation task 
// that marks the next block in the pipeline as completed. 
// A completed dataflow block processes any buffered elements, but does 
// not accept new elements. 
//

downloadString.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
   else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
   else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)findPalindromes).Fault(t.Exception);
   else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)printPalindrome).Fault(t.Exception);
   else printPalindrome.Complete();
});

To propagate completion through the pipeline, each continuation task sets the next dataflow block to the completed state, or to the faulted state if the current block faulted. For example, when the head of the pipeline is set to the completed state, it processes any remaining buffered messages and then runs its continuation task, which sets the second member of the pipeline to the completed state. The second member in turn processes any remaining buffered messages and then runs its continuation task, which sets the third member to the completed state. This process continues until all members of the pipeline finish. This example uses lambda expressions to define the continuation tasks.
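The library also provides DataflowLinkOptions.PropagateCompletion, which makes a link forward completion and faults from a source to its target. The following sketch uses that option in place of explicit continuation tasks. PropagateCompletionSketch and the arithmetic stages are hypothetical; the walkthrough's own code keeps the ContinueWith approach.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical three-stage pipeline that relies on link options
// instead of ContinueWith to propagate completion.
public static class PropagateCompletionSketch
{
    public static List<int> Run(int count)
    {
        var results = new List<int>();
        var doubler = new TransformBlock<int, int>(n => n * 2);
        var addOne = new TransformBlock<int, int>(n => n + 1);
        // ActionBlock runs one message at a time by default, so List.Add is safe.
        var collect = new ActionBlock<int>(n => results.Add(n));

        // Each link forwards completion, or a fault, to its target.
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        doubler.LinkTo(addOne, options);
        addOne.LinkTo(collect, options);

        for (int i = 0; i < count; i++) doubler.Post(i);

        // Completing the head is enough; completion flows to the tail.
        doubler.Complete();
        collect.Completion.Wait();
        return results;
    }
}
```

PropagateCompletionSketch.Run(3) returns 1, 3, and 5. The explicit continuation tasks in this walkthrough remain useful when you need to do extra work between stages as each one finishes.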

Add the following code to post the URL of the book The Iliad of Homer to the head of the dataflow pipeline.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");

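This example uses DataflowBlock.Post<TInput> to synchronously send data to the head of the pipeline. Post returns false if the target block declines the message, for example because the block has already been marked as completed or has reached its bounded capacity. Use the DataflowBlock.SendAsync method when you must send data to a dataflow node asynchronously, for example to wait for space in a block whose capacity is bounded.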
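The following sketch contrasts the two methods. Post on a completed block returns false immediately. SendAsync on a block with BoundedCapacity set to 1 waits until the block has room, which throttles the producer to the consumer's pace. PostVersusSendAsync, its methods, and the 10-millisecond delay that simulates work are all hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper contrasting Post and SendAsync.
public static class PostVersusSendAsync
{
    // A completed block declines all further messages, so Post returns false.
    public static bool PostAfterComplete()
    {
        var block = new BufferBlock<int>();
        block.Complete();
        return block.Post(42);
    }

    // SendAsync waits for room in a bounded block instead of failing.
    public static async Task<List<int>> SendToBoundedBlockAsync(int count)
    {
        var received = new List<int>();
        var slowConsumer = new ActionBlock<int>(async n =>
        {
            await Task.Delay(10); // simulated work
            received.Add(n);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        for (int i = 0; i < count; i++)
        {
            // Completes with true when the block accepts the item,
            // or with false if the block declines it permanently.
            bool accepted = await slowConsumer.SendAsync(i);
            if (!accepted) break;
        }

        slowConsumer.Complete();
        await slowConsumer.Completion;
        return received;
    }
}
```

With a bounded block, calling Post in the same loop could return false for items that arrive while the consumer is busy. SendAsync delivers every item, at the cost of making the producer wait.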

Add the following code to mark the head of the pipeline as completed. The head of the pipeline runs its continuation task after it processes all buffered messages. This continuation task propagates the completed state through the pipeline.

// Mark the head of the pipeline as complete. The continuation tasks  
// propagate completion through the pipeline as each part of the  
// pipeline finishes.
downloadString.Complete();

This example sends one URL through the dataflow pipeline to be processed. If you send more than one input through a pipeline, call the IDataflowBlock.Complete method after you submit all the input. You can omit this step if your application has no well-defined point at which data is no longer available or the application does not have to wait for the pipeline to finish.

Add the following code to wait for the pipeline to finish. Because this example uses continuation tasks to propagate completion through the pipeline, the overall operation is finished when the tail of the pipeline finishes.

// Wait for the last block in the pipeline to process all messages.
printPalindrome.Completion.Wait();

You can wait for dataflow completion from any thread or from multiple threads at the same time.
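The Completion property is an ordinary Task, so several threads can wait on it at once. In asynchronous code, you can await it instead of calling Wait. The following sketch waits on the same Completion task from two thread-pool threads, one that blocks and one that awaits. CompletionWaitSketch is a hypothetical name.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: several observers wait on one Completion task.
public static class CompletionWaitSketch
{
    public static async Task<int> RunAsync()
    {
        int processed = 0;
        var worker = new ActionBlock<int>(n => Interlocked.Increment(ref processed));

        // One observer blocks with Wait; the other awaits the same task.
        Task blockingObserver = Task.Run(() => worker.Completion.Wait());
        Task asyncObserver = Task.Run(async () => await worker.Completion);

        for (int i = 0; i < 3; i++) worker.Post(i);
        worker.Complete();

        // Both observers finish once the block has processed every message.
        await Task.WhenAll(blockingObserver, asyncObserver);
        return Volatile.Read(ref processed);
    }
}
```

CompletionWaitSketch.RunAsync() completes with the value 3 after both observers have seen the block finish.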

The following example shows the complete code for this walkthrough.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline. 
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all palindromes that appear in that book. 
class DataflowPalindromes
{
   static void Main(string[] args)
   {
      // 
      // Create the members of the pipeline. 
      //  

      // Downloads the requested resource as a string. 
      var downloadString = new TransformBlock<string, string>(uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         // Dispose the WebClient after the download finishes.
         using (var client = new WebClient())
         {
            return client.DownloadString(uri);
         }
      });

      // Separates the specified text into an array of words. 
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters 
         // with a space character.
         char[] tokens = text.ToCharArray();
         for (int i = 0; i < tokens.Length; i++)
         {
            if (!char.IsLetter(tokens[i]))
               tokens[i] = ' ';
         }
         text = new string(tokens);

         // Separate the text into an array of words. 
         return text.Split(new char[] { ' ' },
            StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates, and then orders the remaining 
      // words alphabetically. Sorting happens last because Distinct does not 
      // guarantee order, and findPalindromes relies on sorted input. 
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words.Where(word => word.Length > 3).Distinct()
            .OrderBy(word => word).ToArray();
      });

      // Finds all words in the specified collection whose reverse also 
      // exists in the collection. 
      var findPalindromes = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding palindromes...");

         // Holds palindromes. 
         var palindromes = new ConcurrentQueue<string>();

         // Add each word in the original collection to the result whose 
         // reverse also exists in the collection. Array.BinarySearch requires 
         // the sorted array that filterWordList produces.
         Parallel.ForEach(words, word =>
         {
            // Reverse the word. 
            string reverse = new string(word.Reverse().ToArray());

            // Enqueue the word if its reversed version is a different word 
            // that also exists in the collection. 
            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
               palindromes.Enqueue(word);
            }
         });

         return palindromes;
      });

      // Prints the provided palindrome to the console. 
      var printPalindrome = new ActionBlock<string>(palindrome =>
      {
         Console.WriteLine("Found palindrome {0}/{1}",
            palindrome, new string(palindrome.Reverse().ToArray()));
      });

      // 
      // Connect the dataflow blocks to form a pipeline. 
      //

      downloadString.LinkTo(createWordList);
      createWordList.LinkTo(filterWordList);
      filterWordList.LinkTo(findPalindromes);
      findPalindromes.LinkTo(printPalindrome);

      // 
      // For each completion task in the pipeline, create a continuation task 
      // that marks the next block in the pipeline as completed. 
      // A completed dataflow block processes any buffered elements, but does 
      // not accept new elements. 
      //

      downloadString.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
         else createWordList.Complete();
      });
      createWordList.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
         else filterWordList.Complete();
      });
      filterWordList.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)findPalindromes).Fault(t.Exception);
         else findPalindromes.Complete();
      });
      findPalindromes.Completion.ContinueWith(t =>
      {
         if (t.IsFaulted) ((IDataflowBlock)printPalindrome).Fault(t.Exception);
         else printPalindrome.Complete();
      });

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");

      // Mark the head of the pipeline as complete. The continuation tasks  
      // propagate completion through the pipeline as each part of the  
      // pipeline finishes.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printPalindrome.Completion.Wait();
   }
}

/* Sample output:
Downloading 'http://www.gutenberg.org/files/6130/6130-0.txt'...
Creating word list...
Filtering word list...
Finding palindromes...
Found palindrome doom/mood
Found palindrome draw/ward
Found palindrome live/evil
Found palindrome seat/taes
Found palindrome aera/area
Found palindrome mood/doom
Found palindrome moor/room
Found palindrome sleek/keels
Found palindrome area/aera
Found palindrome evil/live
Found palindrome speed/deeps
Found palindrome spot/tops
Found palindrome spots/stops
Found palindrome stops/spots
Found palindrome taes/seat
Found palindrome port/trop
Found palindrome tops/spot
Found palindrome trop/port
Found palindrome reed/deer
Found palindrome deeps/speed
Found palindrome deer/reed
Found palindrome ward/draw
Found palindrome room/moor
Found palindrome keels/sleek
*/

This example sends only one URL through the dataflow pipeline. If you send more than one input value through a pipeline, you introduce a form of parallelism into your application that resembles how parts move through an automobile factory. After the first member of the pipeline sends its result to the second member, the first member can process another item while the second member processes the first result.

The parallelism that is achieved by using dataflow pipelines is known as coarse-grained parallelism because it typically consists of fewer, larger tasks. You can also use a more fine-grained parallelism of smaller, short-running tasks in a dataflow pipeline. In this example, the findPalindromes member of the pipeline uses the Parallel.ForEach method to process multiple items in the work list in parallel. The use of fine-grained parallelism in a coarse-grained pipeline can improve overall throughput.
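One way to add fine-grained parallelism to a single stage is ExecutionDataflowBlockOptions.MaxDegreeOfParallelism. This option is a different mechanism from the Parallel.ForEach call inside findPalindromes. In the following sketch, the first stage may run up to maxParallelism delegates at once while the downstream stage stays sequential. TransformBlock still emits results in the order in which inputs arrived. ParallelStageSketch and the 20-millisecond sleep that stands in for real work are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical pipeline with one parallel stage and one sequential stage.
public static class ParallelStageSketch
{
    public static List<int> Run(int count, int maxParallelism)
    {
        var results = new List<int>();

        // Up to maxParallelism inputs are processed concurrently.
        var slowSquare = new TransformBlock<int, int>(n =>
        {
            Thread.Sleep(20); // stand-in for CPU or I/O work
            return n * n;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

        // The default degree of parallelism is 1, so List.Add is safe here.
        var collect = new ActionBlock<int>(n => results.Add(n));
        slowSquare.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++) slowSquare.Post(i);
        slowSquare.Complete();
        collect.Completion.Wait();
        return results;
    }
}
```

ParallelStageSketch.Run(8, 4) returns the squares of 0 through 7 in input order, even though up to four squares may be computed at the same time.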

You can also connect a source dataflow block to multiple target blocks to create a dataflow network. An overload of the LinkTo method takes a Predicate<T> object that determines, based on each message's value, whether the target block accepts that message. Most dataflow block types that act as sources offer each message to their connected target blocks, in the order in which the targets were connected, until one of the targets accepts the message. By using this filtering mechanism, you can create systems of connected dataflow blocks that direct certain data through one path and other data through another path. For an example that uses filtering to create a dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.
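The following sketch routes even numbers to one target and all other numbers to a second target. A message that no linked target accepts stays in the source block and keeps that block from completing. For that reason, the sketch adds a final link with no predicate as a catch-all. FilteredNetworkSketch and its routing rule are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical network that splits one source into two paths.
public static class FilteredNetworkSketch
{
    public static (List<int> Evens, List<int> Others) Route(IEnumerable<int> numbers)
    {
        var evens = new List<int>();
        var others = new List<int>();

        var source = new BufferBlock<int>();
        var evenTarget = new ActionBlock<int>(n => evens.Add(n));
        var otherTarget = new ActionBlock<int>(n => others.Add(n));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        // Offered first: this link accepts only even numbers.
        source.LinkTo(evenTarget, options, n => n % 2 == 0);
        // Catch-all link, so that no message is left stuck in the source.
        source.LinkTo(otherTarget, options);

        foreach (int n in numbers) source.Post(n);
        source.Complete();

        // Completion propagates from the source to both targets.
        Task.WaitAll(evenTarget.Completion, otherTarget.Completion);
        return (evens, others);
    }
}
```

Routing the numbers 1 through 5 sends 2 and 4 to the first target and 1, 3, and 5 to the second.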

© 2014 Microsoft