
Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency

.NET Framework 4.5

The TPL Dataflow Library provides the System.Threading.Tasks.Dataflow.BatchBlock(Of T) and System.Threading.Tasks.Dataflow.BatchedJoinBlock(Of T1, T2) classes so that you can receive and buffer data from one or more sources and then propagate out that buffered data as one collection. This batching mechanism is useful when you collect data from one or more sources and then process multiple data elements as a batch. For example, consider an application that uses dataflow to insert records into a database. This operation can be more efficient if multiple items are inserted at the same time instead of one at a time sequentially. This document describes how to use the BatchBlock(Of T) class to improve the efficiency of such database insert operations. It also describes how to use the BatchedJoinBlock(Of T1, T2) class to capture both the results and any exceptions that occur when the program reads from a database.

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

  1. Read the Join Blocks section in the Dataflow (Task Parallel Library) document before you start this walkthrough.

  2. Ensure that you have a copy of the Northwind database, Northwind.sdf, available on your computer. This file is typically located in the folder %Program Files%\Microsoft SQL Server Compact Edition\v3.5\Samples\.

    Important

    In some versions of Windows, you cannot connect to Northwind.sdf if Visual Studio is running in a non-administrator mode. To connect to Northwind.sdf, start Visual Studio or a Visual Studio command prompt in the Run as administrator mode.

To create the console application:

  1. In Visual Studio, create a Visual C# or Visual Basic Console Application project. In this document, the project is named DataflowBatchDatabase.

  2. In your project, add a reference to System.Data.SqlServerCe.dll and a reference to System.Threading.Tasks.Dataflow.dll.

  3. Ensure that Program.cs (Module1.vb for Visual Basic) contains the following using (Imports in Visual Basic) statements.

    Imports System
    Imports System.Collections.Generic
    Imports System.Data.SqlServerCe
    Imports System.Diagnostics
    Imports System.IO
    Imports System.Threading.Tasks.Dataflow
    
  4. Add the following data members to the Program class.

    ' The number of employees to add to the database. 
    ' TODO: Change this value to experiment with different numbers of  
    ' employees to insert into the database. 
    Private Shared ReadOnly insertCount As Integer = 256
    
    ' The size of a single batch of employees to add to the database. 
    ' TODO: Change this value to experiment with different batch sizes. 
    Private Shared ReadOnly insertBatchSize As Integer = 96
    
    ' The source database file. 
    ' TODO: Change this value if Northwind.sdf is at a different location 
    ' on your computer. 
    Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf" 
    
    ' TODO: Change this value if you require a different temporary location. 
    Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
    

Add to the Program class the Employee class.

' Describes an employee. Each property maps to a  
' column in the Employees table in the Northwind database. 
' For brevity, the Employee class does not contain 
' all columns from the Employees table. 
Private Class Employee
   Public Property EmployeeID() As Integer 
   Public Property LastName() As String 
   Public Property FirstName() As String 

   ' A random number generator that helps to generate 
   ' Employee property values. 
   Private Shared rand As New Random(42)

   ' Possible random first names. 
   Private Shared ReadOnly firstNames() As String = { "Tom", "Mike", "Ruth", "Bob", "John" }
   ' Possible random last names. 
   Private Shared ReadOnly lastNames() As String = { "Jones", "Smith", "Johnson", "Walker" }

   ' Creates an Employee object that contains random  
   ' property values. 
   Public Shared Function Random() As Employee
      Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
   End Function 
End Class

The Employee class contains three properties, EmployeeID, LastName, and FirstName. These properties correspond to the Employee ID, Last Name, and First Name columns in the Employees table in the Northwind database. For this demonstration, the Employee class also defines the Random method, which creates an Employee object that has random values for its properties.

Add to the Program class the InsertEmployees, GetEmployeeCount, and GetEmployeeID methods.

' Adds new employee records to the database. 
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
   Using connection As New SqlCeConnection(connectionString)
      Try 
         ' Create the SQL command. 
         Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

         connection.Open()
         For i As Integer = 0 To employees.Length - 1
            ' Set parameters.
            command.Parameters.Clear()
            command.Parameters.Add("@lastName", employees(i).LastName)
            command.Parameters.Add("@firstName", employees(i).FirstName)

            ' Execute the command.
            command.ExecuteNonQuery()
         Next i
      Finally
         connection.Close()
      End Try 
   End Using 
End Sub 

' Retrieves the number of entries in the Employees table in  
' the Northwind database. 
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer 
   Dim result As Integer = 0
   Using sqlConnection As New SqlCeConnection(connectionString)
      Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

      sqlConnection.Open()
      Try
         result = CInt(Fix(sqlCommand.ExecuteScalar()))
      Finally
         sqlConnection.Close()
      End Try 
   End Using 
   Return result
End Function 

' Retrieves the ID of the first employee that has the provided name. 
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer 
   Using connection As New SqlCeConnection(connectionString)
      Dim command As New SqlCeCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)

      connection.Open()
      Try 
         Return CInt(Fix(command.ExecuteScalar()))
      Finally
         connection.Close()
      End Try 
   End Using 
End Function

The InsertEmployees method adds new employee records to the database. The GetEmployeeCount method retrieves the number of entries in the Employees table. The GetEmployeeID method retrieves the identifier of the first employee that has the provided name. Each of these methods takes a connection string to the Northwind database and uses functionality in the System.Data.SqlServerCe namespace to communicate with the database.

Add to the Program class the AddEmployees and PostRandomEmployees methods.

' Posts random Employee data to the provided target block. 
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
   Console.WriteLine("Adding {0} entries to Employee table...", count)

   For i As Integer = 0 To count - 1
      target.Post(Employee.Random())
   Next i
End Sub 

' Adds random employee data to the database by using dataflow. 
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
   ' Create an ActionBlock<Employee> object that adds a single 
   ' employee entry to the database. 
   Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() { e }, connectionString))

   ' Post several random Employee objects to the dataflow block.
   PostRandomEmployees(insertEmployee, count)

   ' Set the dataflow block to the completed state and wait for  
   ' all insert operations to complete.
   insertEmployee.Complete()
   insertEmployee.Completion.Wait()
End Sub

The AddEmployees method adds random employee data to the database by using dataflow. It creates an ActionBlock(Of TInput) object that calls the InsertEmployees method to add an employee entry to the database. The AddEmployees method then calls the PostRandomEmployees method to post multiple Employee objects to the ActionBlock(Of TInput) object. The AddEmployees method then waits for all insert operations to finish.
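The post, complete, and wait pattern that AddEmployees uses can be tried without a database. The following is a minimal sketch, not part of the original walkthrough. The ActionBlockDemo module and the SumWithActionBlock function are hypothetical names, and the block adds integers instead of inserting records.

```vb
Imports System
Imports System.Threading.Tasks.Dataflow

' Hypothetical demo module; not part of the walkthrough's Program class.
Module ActionBlockDemo
   ' Posts the integers 1 through count to an ActionBlock(Of Integer)
   ' and returns their sum after the block finishes.
   Public Function SumWithActionBlock(ByVal count As Integer) As Integer
      Dim total As Integer = 0

      ' By default, an ActionBlock processes one message at a time,
      ' so the delegate can update total without additional locking.
      Dim adder As New ActionBlock(Of Integer)(Sub(n) total += n)

      For i As Integer = 1 To count
         adder.Post(i)
      Next i

      ' Signal that no more input will arrive, and then wait for the
      ' block to process everything that was posted.
      adder.Complete()
      adder.Completion.Wait()
      Return total
   End Function

   Sub Main()
      Console.WriteLine("Sum of 1..100: {0}", SumWithActionBlock(100))
   End Sub
End Module
```

Calling Complete before waiting matters: without it, the Completion task never finishes, because the block cannot know that no more input will arrive.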

Add to the Program class the AddEmployeesBatched method.

' Adds random employee data to the database by using dataflow. 
' This method is similar to AddEmployees except that it uses batching 
' to add multiple employees to the database at a time. 
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
   ' Create a BatchBlock<Employee> that holds several Employee objects and 
   ' then propagates them out as an array. 
   Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

   ' Create an ActionBlock<Employee[]> object that adds multiple 
   ' employee entries to the database. 
   Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

   ' Link the batch block to the action block.
   batchEmployees.LinkTo(insertEmployees)

   ' When the batch block completes, set the action block also to complete.
   batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

   ' Post several random Employee objects to the batch block.
   PostRandomEmployees(batchEmployees, count)

   ' Set the batch block to the completed state and wait for  
   ' all insert operations to complete.
   batchEmployees.Complete()
   insertEmployees.Completion.Wait()
End Sub

This method resembles AddEmployees, except that it also uses the BatchBlock(Of T) class to buffer multiple Employee objects before it sends those objects to the ActionBlock(Of TInput) object. Because the BatchBlock(Of T) class propagates out multiple elements as a collection, the ActionBlock(Of TInput) object is modified to act on an array of Employee objects. As in the AddEmployees method, AddEmployeesBatched calls the PostRandomEmployees method to post multiple Employee objects; however, AddEmployeesBatched posts these objects to the BatchBlock(Of T) object. The AddEmployeesBatched method also waits for all insert operations to finish.
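When the number of posted items is not a multiple of the batch size, the BatchBlock(Of T) object propagates the remaining items as a final, smaller batch once it is set to the completed state. With the values used in this walkthrough (256 items, batch size 96), that means batches of 96, 96, and 64 items. The following sketch illustrates this with integers instead of Employee objects. The BatchRemainderDemo module and the CollectBatchSizes function are hypothetical names. The sketch also links the blocks by using the PropagateCompletion link option instead of the ContinueWith call shown above.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks.Dataflow

' Hypothetical demo module; not part of the walkthrough's Program class.
Module BatchRemainderDemo
   ' Posts count integers to a BatchBlock(Of Integer) and returns
   ' the size of each batch that the block propagates.
   Public Function CollectBatchSizes(ByVal count As Integer, ByVal batchSize As Integer) As List(Of Integer)
      Dim sizes As New List(Of Integer)()
      Dim batcher As New BatchBlock(Of Integer)(batchSize)

      ' Record the length of every array that the batch block emits.
      Dim recorder As New ActionBlock(Of Integer())(Sub(batch) sizes.Add(batch.Length))

      ' PropagateCompletion completes the action block automatically
      ' when the batch block completes.
      batcher.LinkTo(recorder, New DataflowLinkOptions With {.PropagateCompletion = True})

      For i As Integer = 0 To count - 1
         batcher.Post(i)
      Next i

      ' Completing the batch block flushes any partial batch.
      batcher.Complete()
      recorder.Completion.Wait()
      Return sizes
   End Function

   Sub Main()
      For Each size As Integer In CollectBatchSizes(256, 96)
         Console.WriteLine("Batch of {0} items", size)
      Next size
   End Sub
End Module
```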

Add to the Program class the GetRandomEmployees method.

The GetRandomEmployees method, which is listed in the complete example that follows, prints information about random employees to the console. It creates several random Employee objects and calls the GetEmployeeID method to retrieve the unique identifier for each object. Because the GetEmployeeID method throws an exception if there is no matching employee with the given first and last names, the GetRandomEmployees method uses the BatchedJoinBlock(Of T1, T2) class to store Employee objects for successful calls to GetEmployeeID and System.Exception objects for calls that fail. The ActionBlock(Of TInput) object in this example acts on a Tuple(Of T1, T2) object that holds a list of Employee objects and a list of Exception objects. The BatchedJoinBlock(Of T1, T2) object propagates out this data when the sum of the received Employee and Exception object counts equals the batch size.
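The batch-size rule for BatchedJoinBlock(Of T1, T2) can be seen in isolation with a small sketch that needs no database. It is an illustration under stated assumptions, not the walkthrough's code: the BatchedJoinDemo module and the CollectJoinedCounts function are hypothetical names, non-negative integers stand in for successful lookups, and negative integers stand in for failures.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks.Dataflow

' Hypothetical demo module; not part of the walkthrough's Program class.
Module BatchedJoinDemo
   ' Posts each non-negative value to Target1 and an exception for each
   ' negative value to Target2. Returns, for every batch, the number of
   ' values and the number of exceptions that the batch contained.
   Public Function CollectJoinedCounts(ByVal values() As Integer, ByVal batchSize As Integer) As List(Of Tuple(Of Integer, Integer))
      Dim counts As New List(Of Tuple(Of Integer, Integer))()
      Dim joiner As New BatchedJoinBlock(Of Integer, Exception)(batchSize)

      ' Each batch arrives as a tuple of two lists.
      Dim recorder As New ActionBlock(Of Tuple(Of IList(Of Integer), IList(Of Exception)))(
         Sub(data) counts.Add(Tuple.Create(data.Item1.Count, data.Item2.Count)))

      joiner.LinkTo(recorder, New DataflowLinkOptions With {.PropagateCompletion = True})

      For Each value As Integer In values
         If value >= 0 Then
            joiner.Target1.Post(value)
         Else
            joiner.Target2.Post(New ArgumentOutOfRangeException("value"))
         End If
      Next value

      ' Completing the block flushes any partial batch.
      joiner.Complete()
      recorder.Completion.Wait()
      Return counts
   End Function

   Sub Main()
      Dim input() As Integer = {1, -1, 2, 3, -2, 4}
      For Each pair As Tuple(Of Integer, Integer) In CollectJoinedCounts(input, 4)
         Console.WriteLine("{0} values, {1} errors", pair.Item1, pair.Item2)
      Next pair
   End Sub
End Module
```

With a batch size of 4, the first batch is released as soon as the two targets together hold four items, regardless of how those items are split between them.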

The following example shows the complete code. The Main method compares the time required to perform batched database insertions with the time required to perform non-batched insertions. It also demonstrates how to use a batched join (the BatchedJoinBlock(Of T1, T2) class) to read employee data from the database and report errors.

Imports System
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use batched dataflow blocks to improve 
' the performance of database operations. 
Namespace DataflowBatchDatabase
   Friend Class Program
      ' The number of employees to add to the database. 
      ' TODO: Change this value to experiment with different numbers of  
      ' employees to insert into the database. 
      Private Shared ReadOnly insertCount As Integer = 256

      ' The size of a single batch of employees to add to the database. 
      ' TODO: Change this value to experiment with different batch sizes. 
      Private Shared ReadOnly insertBatchSize As Integer = 96

      ' The source database file. 
      ' TODO: Change this value if Northwind.sdf is at a different location 
      ' on your computer. 
      Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf" 

      ' TODO: Change this value if you require a different temporary location. 
      Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf" 

      ' Describes an employee. Each property maps to a  
      ' column in the Employees table in the Northwind database. 
      ' For brevity, the Employee class does not contain 
      ' all columns from the Employees table. 
      Private Class Employee
         Public Property EmployeeID() As Integer 
         Public Property LastName() As String 
         Public Property FirstName() As String 

         ' A random number generator that helps to generate 
         ' Employee property values. 
         Private Shared rand As New Random(42)

         ' Possible random first names. 
         Private Shared ReadOnly firstNames() As String = { "Tom", "Mike", "Ruth", "Bob", "John" }
         ' Possible random last names. 
         Private Shared ReadOnly lastNames() As String = { "Jones", "Smith", "Johnson", "Walker" }

         ' Creates an Employee object that contains random  
         ' property values. 
         Public Shared Function Random() As Employee
            Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
         End Function 
      End Class 

      ' Adds new employee records to the database. 
      Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
         Using connection As New SqlCeConnection(connectionString)
            Try 
               ' Create the SQL command. 
               Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)

               connection.Open()
               For i As Integer = 0 To employees.Length - 1
                  ' Set parameters.
                  command.Parameters.Clear()
                  command.Parameters.Add("@lastName", employees(i).LastName)
                  command.Parameters.Add("@firstName", employees(i).FirstName)

                  ' Execute the command.
                  command.ExecuteNonQuery()
               Next i
            Finally
               connection.Close()
            End Try 
         End Using 
      End Sub 

      ' Retrieves the number of entries in the Employees table in  
      ' the Northwind database. 
      Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer 
         Dim result As Integer = 0
         Using sqlConnection As New SqlCeConnection(connectionString)
            Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)

            sqlConnection.Open()
            Try
               result = CInt(Fix(sqlCommand.ExecuteScalar()))
            Finally
               sqlConnection.Close()
            End Try 
         End Using 
         Return result
      End Function 

      ' Retrieves the ID of the first employee that has the provided name. 
      Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer 
         Using connection As New SqlCeConnection(connectionString)
            Dim command As New SqlCeCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)

            connection.Open()
            Try 
               Return CInt(Fix(command.ExecuteScalar()))
            Finally
               connection.Close()
            End Try 
         End Using 
      End Function 

      ' Posts random Employee data to the provided target block. 
      Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
         Console.WriteLine("Adding {0} entries to Employee table...", count)

         For i As Integer = 0 To count - 1
            target.Post(Employee.Random())
         Next i
      End Sub 

      ' Adds random employee data to the database by using dataflow. 
      Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
         ' Create an ActionBlock<Employee> object that adds a single 
         ' employee entry to the database. 
         Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() { e }, connectionString))

         ' Post several random Employee objects to the dataflow block.
         PostRandomEmployees(insertEmployee, count)

         ' Set the dataflow block to the completed state and wait for  
         ' all insert operations to complete.
         insertEmployee.Complete()
         insertEmployee.Completion.Wait()
      End Sub 

      ' Adds random employee data to the database by using dataflow. 
      ' This method is similar to AddEmployees except that it uses batching 
      ' to add multiple employees to the database at a time. 
      Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
         ' Create a BatchBlock<Employee> that holds several Employee objects and 
         ' then propagates them out as an array. 
         Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)

         ' Create an ActionBlock<Employee[]> object that adds multiple 
         ' employee entries to the database. 
         Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))

         ' Link the batch block to the action block.
         batchEmployees.LinkTo(insertEmployees)

         ' When the batch block completes, set the action block also to complete.
         batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())

         ' Post several random Employee objects to the batch block.
         PostRandomEmployees(batchEmployees, count)

         ' Set the batch block to the completed state and wait for  
         ' all insert operations to complete.
         batchEmployees.Complete()
         insertEmployees.Completion.Wait()
      End Sub 

      ' Prints information about several random employees to the console. 
      Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
         ' Create a BatchedJoinBlock<Employee, Exception> object that holds 
         ' both employee and exception data. 
         Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)

         ' Holds the total number of exceptions that occurred. 
         Dim totalErrors As Integer = 0

         ' Create an action block that prints employee and error information 
         ' to the console. 
         Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
               ' Print information about the employees in this batch. 
               ' Print the error count for this batch. 
               ' Update total error count.
             Console.WriteLine("Received a batch...")
             For Each e As Employee In data.Item1
                 Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
             Next e
             Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
             totalErrors += data.Item2.Count
         End Sub)

         ' Link the batched join block to the action block.
         selectEmployees.LinkTo(printEmployees)

         ' When the batched join block completes, set the action block also to complete.
         selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())

         ' Try to retrieve the ID for several random employees.
         Console.WriteLine("Selecting random entries from Employees table...")
         For i As Integer = 0 To count - 1
            Try 
               ' Create a random employee. 
               Dim e As Employee = Employee.Random()

               ' Try to retrieve the ID for the employee from the database.
               e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)

               ' Post the Employee object to the Employee target of  
               ' the batched join block.
               selectEmployees.Target1.Post(e)
            Catch e As Exception
               ' GetEmployeeID throws an exception when there is no 
               ' employee with the given name (the exact exception type 
               ' depends on how the empty query result is converted). 
               ' When this happens, post the Exception object to the 
               ' Exception target of the batched join block.
               selectEmployees.Target2.Post(e)
            End Try 
         Next i

         ' Set the batched join block to the completed state and wait for  
         ' all retrieval operations to complete.
         selectEmployees.Complete()
         printEmployees.Completion.Wait()

         ' Print the total error count.
         Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
      End Sub 

      Shared Sub Main(ByVal args() As String)
         ' Create a connection string for accessing the database. 
         ' The connection string refers to the temporary database location. 
         Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)

         ' Create a Stopwatch object to time database insert operations. 
         Dim stopwatch As New Stopwatch()

         ' Start with a clean database file by copying the source database to  
         ' the temporary location.
         File.Copy(sourceDatabase, scratchDatabase, True)

         ' Demonstrate multiple insert operations without batching.
         Console.WriteLine("Demonstrating non-batched database insert operations...")
         Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
         stopwatch.Start()
         AddEmployees(connectionString, insertCount)
         stopwatch.Stop()
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

         Console.WriteLine()

         ' Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, True)

         ' Demonstrate multiple insert operations, this time with batching.
         Console.WriteLine("Demonstrating batched database insert operations...")
         Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
         stopwatch.Restart()
         AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
         stopwatch.Stop()
         Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)

         Console.WriteLine()

         ' Start again with a clean database file.
         File.Copy(sourceDatabase, scratchDatabase, True)

         ' Demonstrate multiple retrieval operations with error reporting.
         Console.WriteLine("Demonstrating batched join database select operations...")
         ' Add a small number of employees to the database.
         AddEmployeesBatched(connectionString, insertBatchSize, 16)
         ' Query for random employees.
         GetRandomEmployees(connectionString, insertBatchSize, 10)
      End Sub 
   End Class 
End Namespace 
' Sample output: 
'Demonstrating non-batched database insert operations... 
'Original size of Employee table: 15. 
'Adding 256 entries to Employee table... 
'New size of Employee table: 271; elapsed insert time: 11035 ms. 

'Demonstrating batched database insert operations... 
'Original size of Employee table: 15. 
'Adding 256 entries to Employee table... 
'New size of Employee table: 271; elapsed insert time: 197 ms. 

'Demonstrating batched join database select operations... 
'Adding 16 entries to Employee table... 
'Selecting random entries from Employees table... 
'Received a batch... 
'Last=Jones First=Tom ID=21 
'Last=Jones First=John ID=24 
'Last=Smith First=Tom ID=26 
'Last=Jones First=Tom ID=21 
'There were 4 errors in this batch... 
'Received a batch... 
'Last=Smith First=Tom ID=26 
'Last=Jones First=Mike ID=28 
'There were 0 errors in this batch... 
'Finished. There were 4 total errors. 