Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency

.NET Framework 4.5

The TPL Dataflow Library provides the System.Threading.Tasks.Dataflow.BatchBlock<T> and System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1, T2> classes so that you can receive and buffer data from one or more sources and then propagate out that buffered data as one collection. This batching mechanism is useful when you collect data from one or more sources and then process multiple data elements as a batch. For example, consider an application that uses dataflow to insert records into a database. This operation can be more efficient if multiple items are inserted at the same time instead of one at a time sequentially. This document describes how to use the BatchBlock<T> class to improve the efficiency of such database insert operations. It also describes how to use the BatchedJoinBlock<T1, T2> class to capture both the results and any exceptions that occur when the program reads from a database.

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

  1. Read the Join Blocks section in the Dataflow (Task Parallel Library) document before you start this walkthrough.

  2. Ensure that you have a copy of the Northwind database, Northwind.sdf, available on your computer. This file is typically located in the folder %ProgramFiles%\Microsoft SQL Server Compact Edition\v3.5\Samples\.

    Important

    In some versions of Windows, you cannot connect to Northwind.sdf unless Visual Studio is running as an administrator. To connect to Northwind.sdf, start Visual Studio or a Visual Studio command prompt by using the Run as administrator option.

To create the console application:

  1. In Visual Studio, create a Visual C# or Visual Basic Console Application project. In this document, the project is named DataflowBatchDatabase.

  2. In your project, add a reference to System.Data.SqlServerCe.dll and a reference to System.Threading.Tasks.Dataflow.dll.

  3. Ensure that Program.cs (Module1.vb for Visual Basic) contains the following using (Imports in Visual Basic) statements.

    using System;
    using System.Collections.Generic;
    using System.Data.SqlServerCe;
    using System.Diagnostics;
    using System.IO;
    using System.Threading.Tasks.Dataflow;
    
  4. Add the following data members to the Program class.

    // The number of employees to add to the database. 
    // TODO: Change this value to experiment with different numbers of  
    // employees to insert into the database. 
    static readonly int insertCount = 256;
    
    // The size of a single batch of employees to add to the database. 
    // TODO: Change this value to experiment with different batch sizes. 
    static readonly int insertBatchSize = 96;
    
    // The source database file. 
    // TODO: Change this value if Northwind.sdf is at a different location 
    // on your computer. 
    static readonly string sourceDatabase =
       @"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
    
    // TODO: Change this value if you require a different temporary location. 
    static readonly string scratchDatabase =
       @"C:\Temp\Northwind.sdf";
    

Add to the Program class the Employee class.

// Describes an employee. Each property maps to a  
// column in the Employees table in the Northwind database. 
// For brevity, the Employee class does not contain 
// all columns from the Employees table. 
class Employee
{
   public int EmployeeID { get; set; }
   public string LastName { get; set; }
   public string FirstName { get; set; }

   // A random number generator that helps to generate
   // Employee property values. 
   static Random rand = new Random(42);

   // Possible random first names. 
   static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
   // Possible random last names. 
   static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

   // Creates an Employee object that contains random  
   // property values. 
   public static Employee Random()
   {
      return new Employee
      {
         EmployeeID = -1,
         LastName = lastNames[rand.Next() % lastNames.Length],
         FirstName = firstNames[rand.Next() % firstNames.Length]
      };
   }
}

The Employee class contains three properties, EmployeeID, LastName, and FirstName. These properties correspond to the Employee ID, Last Name, and First Name columns in the Employees table in the Northwind database. For this demonstration, the Employee class also defines the Random method, which creates an Employee object that has random values for its properties.

Add to the Program class the InsertEmployees, GetEmployeeCount, and GetEmployeeID methods.

// Adds new employee records to the database. 
static void InsertEmployees(Employee[] employees, string connectionString)
{
   using (SqlCeConnection connection =
      new SqlCeConnection(connectionString))
   {
      try
      {
         // Create the SQL command.
         SqlCeCommand command = new SqlCeCommand(
            "INSERT INTO Employees ([Last Name], [First Name])" +
            "VALUES (@lastName, @firstName)",
            connection);

         connection.Open();
         for (int i = 0; i < employees.Length; i++)
         {
            // Set parameters.
            command.Parameters.Clear();
            command.Parameters.Add("@lastName", employees[i].LastName);
            command.Parameters.Add("@firstName", employees[i].FirstName);

            // Execute the command.
            command.ExecuteNonQuery();
         }
      }
      finally
      {
         connection.Close();
      }
   }
}

// Retrieves the number of entries in the Employees table in  
// the Northwind database. 
static int GetEmployeeCount(string connectionString)
{
   int result = 0;
   using (SqlCeConnection sqlConnection =
      new SqlCeConnection(connectionString))
   {
      SqlCeCommand sqlCommand = new SqlCeCommand(
         "SELECT COUNT(*) FROM Employees", sqlConnection);

      sqlConnection.Open();
      try
      {
         result = (int)sqlCommand.ExecuteScalar();
      }
      finally
      {
         sqlConnection.Close();
      }
   }
   return result;
}

// Retrieves the ID of the first employee that has the provided name. 
static int GetEmployeeID(string lastName, string firstName,
   string connectionString)
{
   using (SqlCeConnection connection =
      new SqlCeConnection(connectionString))
   {
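      SqlCeCommand command = new SqlCeCommand(
         "SELECT [Employee ID] FROM Employees " +
         "WHERE [Last Name] = @lastName AND [First Name] = @firstName",
         connection);

      // Pass the names as parameters, as InsertEmployees does, instead of
      // formatting them into the SQL text.
      command.Parameters.Add("@lastName", lastName);
      command.Parameters.Add("@firstName", firstName);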

      connection.Open();
      try
      {
         return (int)command.ExecuteScalar();
      }
      finally
      {
         connection.Close();
      }
   }
}

The InsertEmployees method adds new employee records to the database. The GetEmployeeCount method retrieves the number of entries in the Employees table. The GetEmployeeID method retrieves the identifier of the first employee that has the provided name. Each of these methods takes a connection string to the Northwind database and uses functionality in the System.Data.SqlServerCe namespace to communicate with the database.

Add to the Program class the AddEmployees and PostRandomEmployees methods.

// Posts random Employee data to the provided target block. 
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
   Console.WriteLine("Adding {0} entries to Employee table...", count);

   for (int i = 0; i < count; i++)
   {
      target.Post(Employee.Random());
   }
}

// Adds random employee data to the database by using dataflow. 
static void AddEmployees(string connectionString, int count)
{
   // Create an ActionBlock<Employee> object that adds a single 
   // employee entry to the database. 
   var insertEmployee = new ActionBlock<Employee>(e =>
      InsertEmployees(new Employee[] { e }, connectionString));

   // Post several random Employee objects to the dataflow block.
   PostRandomEmployees(insertEmployee, count);

   // Set the dataflow block to the completed state and wait for  
   // all insert operations to complete.
   insertEmployee.Complete();
   insertEmployee.Completion.Wait();
}

The AddEmployees method adds random employee data to the database by using dataflow. It creates an ActionBlock<TInput> object that calls the InsertEmployees method to add an employee entry to the database. The AddEmployees method then calls the PostRandomEmployees method to post multiple Employee objects to the ActionBlock<TInput> object. The AddEmployees method then waits for all insert operations to finish.
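
The post, complete, and wait sequence that AddEmployees follows can be tried without a database. The following is a minimal sketch under stated assumptions: the ActionBlockSketch class and its ProcessAll method are hypothetical names introduced for illustration, and an in-memory list stands in for the database insert.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for AddEmployees: names are recorded in memory
// instead of being inserted into a database.
public static class ActionBlockSketch
{
   public static List<string> ProcessAll(IEnumerable<string> names)
   {
      var processed = new List<string>();

      // With the default options, an ActionBlock processes one message
      // at a time, so the list needs no additional locking here.
      var record = new ActionBlock<string>(name => processed.Add(name));

      foreach (string name in names)
      {
         record.Post(name);
      }

      // Signal that no more input will arrive, and then wait for the
      // messages that are already queued to be processed.
      record.Complete();
      record.Completion.Wait();

      return processed;
   }

   public static void Main()
   {
      List<string> result = ProcessAll(new[] { "Jones", "Smith", "Walker" });
      Console.WriteLine(string.Join(", ", result));
   }
}
```

Calling Complete before waiting matters: the Completion task does not finish until the block has been told that no more input will arrive.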

Add to the Program class the AddEmployeesBatched method.

// Adds random employee data to the database by using dataflow. 
// This method is similar to AddEmployees except that it uses batching 
// to add multiple employees to the database at a time. 
static void AddEmployeesBatched(string connectionString, int batchSize,
   int count)
{
   // Create a BatchBlock<Employee> that holds several Employee objects and 
   // then propagates them out as an array. 
   var batchEmployees = new BatchBlock<Employee>(batchSize);

   // Create an ActionBlock<Employee[]> object that adds multiple 
   // employee entries to the database. 
   var insertEmployees = new ActionBlock<Employee[]>(a =>
      InsertEmployees(a, connectionString));

   // Link the batch block to the action block.
   batchEmployees.LinkTo(insertEmployees);

   // When the batch block completes, set the action block also to complete.
   batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

   // Post several random Employee objects to the batch block.
   PostRandomEmployees(batchEmployees, count);

   // Set the batch block to the completed state and wait for  
   // all insert operations to complete.
   batchEmployees.Complete();
   insertEmployees.Completion.Wait();
}

This method resembles AddEmployees, except that it also uses the BatchBlock<T> class to buffer multiple Employee objects before it sends those objects to the ActionBlock<TInput> object. Because the BatchBlock<T> class propagates out multiple elements as a collection, the ActionBlock<TInput> object is modified to act on an array of Employee objects. As in the AddEmployees method, AddEmployeesBatched calls the PostRandomEmployees method to post multiple Employee objects; however, AddEmployeesBatched posts these objects to the BatchBlock<T> object. The AddEmployeesBatched method also waits for all insert operations to finish.
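
To see how a BatchBlock<T> object groups its input when the item count is not a multiple of the batch size, consider the following sketch. It is an illustration only: the BatchBlockSketch class and the CollectBatchSizes method are hypothetical names, and integers stand in for Employee objects. The sketch also links the blocks with the PropagateCompletion option, which is one possible alternative to the ContinueWith call used in AddEmployeesBatched.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockSketch
{
   // Posts the integers 0..count-1 through a BatchBlock<int> and returns
   // the size of each batch that reaches the action block.
   public static List<int> CollectBatchSizes(int count, int batchSize)
   {
      var sizes = new List<int>();

      var batch = new BatchBlock<int>(batchSize);
      var record = new ActionBlock<int[]>(items => sizes.Add(items.Length));

      // PropagateCompletion completes the action block automatically
      // after the batch block completes and its output is consumed.
      batch.LinkTo(record,
         new DataflowLinkOptions { PropagateCompletion = true });

      for (int i = 0; i < count; i++)
      {
         batch.Post(i);
      }

      // Completing the batch block also flushes any remaining items
      // as a final batch that is smaller than batchSize.
      batch.Complete();
      record.Completion.Wait();

      return sizes;
   }

   public static void Main()
   {
      // The same values as insertCount and insertBatchSize in this walkthrough.
      Console.WriteLine(string.Join(", ", CollectBatchSizes(256, 96)));
   }
}
```

With 256 items and a batch size of 96, the action block receives two full batches and then a final batch of 64 items when Complete is called.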

Add to the Program class the GetRandomEmployees method.

// Prints information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
   int count)
{
   // Create a BatchedJoinBlock<Employee, Exception> object that holds
   // both employee and exception data.
   var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

   // Holds the total number of exceptions that occurred.
   int totalErrors = 0;

   // Create an action block that prints employee and error information
   // to the console.
   var printEmployees =
      new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
      {
         // Print information about the employees in this batch.
         Console.WriteLine("Received a batch...");
         foreach (Employee e in data.Item1)
         {
            Console.WriteLine("Last={0} First={1} ID={2}",
               e.LastName, e.FirstName, e.EmployeeID);
         }

         // Print the error count for this batch.
         Console.WriteLine("There were {0} errors in this batch...",
            data.Item2.Count);

         // Update total error count.
         totalErrors += data.Item2.Count;
      });

   // Link the batched join block to the action block.
   selectEmployees.LinkTo(printEmployees);

   // When the batched join block completes, set the action block also to complete.
   selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

   // Try to retrieve the ID for several random employees.
   Console.WriteLine("Selecting random entries from Employees table...");
   for (int i = 0; i < count; i++)
   {
      try
      {
         // Create a random employee.
         Employee e = Employee.Random();

         // Try to retrieve the ID for the employee from the database.
         e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

         // Post the Employee object to the Employee target of
         // the batched join block.
         selectEmployees.Target1.Post(e);
      }
      catch (NullReferenceException e)
      {
         // GetEmployeeID throws NullReferenceException when there is
         // no such employee with the given name. When this happens,
         // post the Exception object to the Exception target of
         // the batched join block.
         selectEmployees.Target2.Post(e);
      }
   }

   // Set the batched join block to the completed state and wait for
   // all retrieval operations to complete.
   selectEmployees.Complete();
   printEmployees.Completion.Wait();

   // Print the total error count.
   Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}

This method prints information about random employees to the console. It creates several random Employee objects and calls the GetEmployeeID method to retrieve the unique identifier for each object. Because the GetEmployeeID method throws an exception if there is no matching employee with the given first and last names, the GetRandomEmployees method uses the BatchedJoinBlock<T1, T2> class to store Employee objects for successful calls to GetEmployeeID and System.Exception objects for calls that fail. The ActionBlock<TInput> object in this example acts on a Tuple<T1, T2> object that holds a list of Employee objects and a list of Exception objects. The BatchedJoinBlock<T1, T2> object propagates out this data when the sum of the received Employee and Exception object counts equals the batch size.
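
The way a BatchedJoinBlock<T1, T2> object counts items across both of its targets can be observed without a database. The following sketch is an illustration under stated assumptions: the BatchedJoinSketch class and the ParseInBatches method are hypothetical names, and parsing strings with int.Parse stands in for the call to GetEmployeeID.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BatchedJoinSketch
{
   // Parses each input string. Successful results go to Target1 and
   // failures go to Target2. Returns one "successes/failures" entry
   // for each batch that the join block propagates.
   public static List<string> ParseInBatches(string[] inputs, int batchSize)
   {
      var summary = new List<string>();

      var join = new BatchedJoinBlock<int, Exception>(batchSize);
      var report = new ActionBlock<Tuple<IList<int>, IList<Exception>>>(data =>
         summary.Add(data.Item1.Count + "/" + data.Item2.Count));

      join.LinkTo(report,
         new DataflowLinkOptions { PropagateCompletion = true });

      foreach (string input in inputs)
      {
         try
         {
            join.Target1.Post(int.Parse(input));
         }
         catch (FormatException ex)
         {
            // Stand-in for the NullReferenceException case in
            // GetRandomEmployees.
            join.Target2.Post(ex);
         }
      }

      // Completing the block flushes any items that remain in either target.
      join.Complete();
      report.Completion.Wait();

      return summary;
   }

   public static void Main()
   {
      string[] inputs = { "1", "x", "2", "3", "y", "4" };
      foreach (string line in ParseInBatches(inputs, 4))
      {
         Console.WriteLine(line);
      }
   }
}
```

With a batch size of 4, the first batch is released as soon as the two targets together hold four items (three parsed values and one exception), and the remaining two items are released when Complete is called.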

The following example shows the complete code. The Main method compares the time required to perform batched database insertions with the time required to perform non-batched insertions. It also demonstrates how to use a batched join to read employee data from the database and report errors.

using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use batched dataflow blocks to improve 
// the performance of database operations. 
namespace DataflowBatchDatabase
{
   class Program
   {
      // The number of employees to add to the database. 
      // TODO: Change this value to experiment with different numbers of  
      // employees to insert into the database. 
      static readonly int insertCount = 256;

      // The size of a single batch of employees to add to the database. 
      // TODO: Change this value to experiment with different batch sizes. 
      static readonly int insertBatchSize = 96;

      // The source database file. 
      // TODO: Change this value if Northwind.sdf is at a different location 
      // on your computer. 
      static readonly string sourceDatabase =
         @"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";

      // TODO: Change this value if you require a different temporary location. 
      static readonly string scratchDatabase =
         @"C:\Temp\Northwind.sdf";

      // Describes an employee. Each property maps to a  
      // column in the Employees table in the Northwind database. 
      // For brevity, the Employee class does not contain 
      // all columns from the Employees table. 
      class Employee
      {
         public int EmployeeID { get; set; }
         public string LastName { get; set; }
         public string FirstName { get; set; }

         // A random number generator that helps to generate
         // Employee property values. 
         static Random rand = new Random(42);

         // Possible random first names. 
         static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
         // Possible random last names. 
         static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };

         // Creates an Employee object that contains random  
         // property values. 
         public static Employee Random()
         {
            return new Employee
            {
               EmployeeID = -1,
               LastName = lastNames[rand.Next() % lastNames.Length],
               FirstName = firstNames[rand.Next() % firstNames.Length]
            };
         }
      }

      // Adds new employee records to the database. 
      static void InsertEmployees(Employee[] employees, string connectionString)
      {
         using (SqlCeConnection connection =
            new SqlCeConnection(connectionString))
         {
            try
            {
               // Create the SQL command.
               SqlCeCommand command = new SqlCeCommand(
                  "INSERT INTO Employees ([Last Name], [First Name])" +
                  "VALUES (@lastName, @firstName)",
                  connection);

               connection.Open();
               for (int i = 0; i < employees.Length; i++)
               {
                  // Set parameters.
                  command.Parameters.Clear();
                  command.Parameters.Add("@lastName", employees[i].LastName);
                  command.Parameters.Add("@firstName", employees[i].FirstName);

                  // Execute the command.
                  command.ExecuteNonQuery();
               }
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Retrieves the number of entries in the Employees table in  
      // the Northwind database. 
      static int GetEmployeeCount(string connectionString)
      {
         int result = 0;
         using (SqlCeConnection sqlConnection =
            new SqlCeConnection(connectionString))
         {
            SqlCeCommand sqlCommand = new SqlCeCommand(
               "SELECT COUNT(*) FROM Employees", sqlConnection);

            sqlConnection.Open();
            try
            {
               result = (int)sqlCommand.ExecuteScalar();
            }
            finally
            {
               sqlConnection.Close();
            }
         }
         return result;
      }

      // Retrieves the ID of the first employee that has the provided name. 
      static int GetEmployeeID(string lastName, string firstName,
         string connectionString)
      {
         using (SqlCeConnection connection =
            new SqlCeConnection(connectionString))
         {
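            SqlCeCommand command = new SqlCeCommand(
               "SELECT [Employee ID] FROM Employees " +
               "WHERE [Last Name] = @lastName AND [First Name] = @firstName",
               connection);

            // Pass the names as parameters, as InsertEmployees does, instead of
            // formatting them into the SQL text.
            command.Parameters.Add("@lastName", lastName);
            command.Parameters.Add("@firstName", firstName);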

            connection.Open();
            try
            {
               return (int)command.ExecuteScalar();
            }
            finally
            {
               connection.Close();
            }
         }
      }

      // Posts random Employee data to the provided target block. 
      static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
      {
         Console.WriteLine("Adding {0} entries to Employee table...", count);

         for (int i = 0; i < count; i++)
         {
            target.Post(Employee.Random());
         }
      }

      // Adds random employee data to the database by using dataflow. 
      static void AddEmployees(string connectionString, int count)
      {
         // Create an ActionBlock<Employee> object that adds a single 
         // employee entry to the database. 
         var insertEmployee = new ActionBlock<Employee>(e =>
            InsertEmployees(new Employee[] { e }, connectionString));

         // Post several random Employee objects to the dataflow block.
         PostRandomEmployees(insertEmployee, count);

         // Set the dataflow block to the completed state and wait for  
         // all insert operations to complete.
         insertEmployee.Complete();
         insertEmployee.Completion.Wait();
      }

      // Adds random employee data to the database by using dataflow. 
      // This method is similar to AddEmployees except that it uses batching 
      // to add multiple employees to the database at a time. 
      static void AddEmployeesBatched(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchBlock<Employee> that holds several Employee objects and 
         // then propagates them out as an array. 
         var batchEmployees = new BatchBlock<Employee>(batchSize);

         // Create an ActionBlock<Employee[]> object that adds multiple 
         // employee entries to the database. 
         var insertEmployees = new ActionBlock<Employee[]>(a =>
            InsertEmployees(a, connectionString));

         // Link the batch block to the action block.
         batchEmployees.LinkTo(insertEmployees);

         // When the batch block completes, set the action block also to complete.
         batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });

         // Post several random Employee objects to the batch block.
         PostRandomEmployees(batchEmployees, count);

         // Set the batch block to the completed state and wait for  
         // all insert operations to complete.
         batchEmployees.Complete();
         insertEmployees.Completion.Wait();
      }

      // Prints information about several random employees to the console. 
      static void GetRandomEmployees(string connectionString, int batchSize,
         int count)
      {
         // Create a BatchedJoinBlock<Employee, Exception> object that holds 
         // both employee and exception data. 
         var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);

         // Holds the total number of exceptions that occurred. 
         int totalErrors = 0;

         // Create an action block that prints employee and error information 
         // to the console. 
         var printEmployees =
            new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
            {
               // Print information about the employees in this batch.
               Console.WriteLine("Received a batch...");
               foreach (Employee e in data.Item1)
               {
                  Console.WriteLine("Last={0} First={1} ID={2}",
                     e.LastName, e.FirstName, e.EmployeeID);
               }

               // Print the error count for this batch.
               Console.WriteLine("There were {0} errors in this batch...",
                  data.Item2.Count);

               // Update total error count.
               totalErrors += data.Item2.Count;
            });

         // Link the batched join block to the action block.
         selectEmployees.LinkTo(printEmployees);

         // When the batched join block completes, set the action block also to complete.
         selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });

         // Try to retrieve the ID for several random employees.
         Console.WriteLine("Selecting random entries from Employees table...");
         for (int i = 0; i < count; i++)
         {
            try
            {
               // Create a random employee.
               Employee e = Employee.Random();

               // Try to retrieve the ID for the employee from the database.
               e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);

               // Post the Employee object to the Employee target of  
               // the batched join block.
               selectEmployees.Target1.Post(e);
            }
            catch (NullReferenceException e)
            {
               // GetEmployeeID throws NullReferenceException when there is  
               // no such employee with the given name. When this happens, 
               // post the Exception object to the Exception target of 
               // the batched join block.
               selectEmployees.Target2.Post(e);
            }
         }

         // Set the batched join block to the completed state and wait for  
         // all retrieval operations to complete.
         selectEmployees.Complete();
         printEmployees.Completion.Wait();

         // Print the total error count.
         Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
      }

      static void Main(string[] args)
      {
         // Create a connection string for accessing the database. 
         // The connection string refers to the temporary database location. 
         string connectionString = string.Format(@"Data Source={0}",
            scratchDatabase);

         // Create a Stopwatch object to time database insert operations.
         Stopwatch stopwatch = new Stopwatch();

         // Start with a clean database file by copying the source database to  
         // the temporary location.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations without batching.
         Console.WriteLine("Demonstrating non-batched database insert operations...");
         Console.WriteLine("Original size of Employee table: {0}.",
            GetEmployeeCount(connectionString));
         stopwatch.Start();
         AddEmployees(connectionString, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple insert operations, this time with batching.
         Console.WriteLine("Demonstrating batched database insert operations...");
         Console.WriteLine("Original size of Employee table: {0}.",
            GetEmployeeCount(connectionString));
         stopwatch.Restart();
         AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
         stopwatch.Stop();
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
            GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);

         Console.WriteLine();

         // Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, true);

         // Demonstrate multiple retrieval operations with error reporting.
         Console.WriteLine("Demonstrating batched join database select operations...");
         // Add a small number of employees to the database.
         AddEmployeesBatched(connectionString, insertBatchSize, 16);
         // Query for random employees.
         GetRandomEmployees(connectionString, insertBatchSize, 10);
      }
   }
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.

Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.

Demonstrating batched join database select operations...
Adding 16 entries to Employee table...
Selecting random entries from Employees table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
© 2014 Microsoft