
Best Practices in the Parallel Patterns Library

This document describes how best to make effective use of the Parallel Patterns Library (PPL). The PPL provides general-purpose containers, objects, and algorithms for performing fine-grained parallelism.

For more information about the PPL, see Parallel Patterns Library (PPL).

The parallelization of relatively small loop bodies can cause the associated scheduling overhead to outweigh the benefits of parallel processing. Consider the following example, which adds each pair of elements in two arrays.

// small-loops.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <vector>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create three arrays that each have the same size. The arrays are
   // heap-allocated vectors because three 100,000-element int arrays on
   // the stack would exceed the default 1 MB stack size.
   const size_t size = 100000;
   vector<int> a(size), b(size), c(size);

   // Initialize the arrays a and b. 
   for (size_t i = 0; i < size; ++i)
   {
      a[i] = static_cast<int>(i);
      b[i] = static_cast<int>(i * 2);
   }

   // Add each pair of elements in arrays a and b in parallel  
   // and store the result in array c.
   parallel_for<size_t>(0, size, [&a,&b,&c](size_t i) {
      c[i] = a[i] + b[i];
   });

   // TODO: Do something with array c.
}

The workload of each parallel loop iteration is too small to offset the overhead of parallel processing. You can improve the performance of this loop by performing more work in the loop body or by running the loop serially.
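One way to give each unit of parallel work more to do is to hand every worker a contiguous range of indices rather than a single element. The following sketch illustrates that idea with the C++ standard library (std::thread) instead of the PPL, so it does not reflect how parallel_for partitions work; the function chunked_add and its num_workers parameter are hypothetical names used only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: adds a and b element-wise into c. Each worker thread
// processes one contiguous chunk of indices, so the cost of starting and
// joining a thread is paid once per chunk rather than once per element.
void chunked_add(const std::vector<int>& a, const std::vector<int>& b,
                 std::vector<int>& c, unsigned num_workers)
{
   assert(num_workers > 0);
   assert(a.size() == b.size() && a.size() == c.size());

   const std::size_t size = a.size();
   // Round up so that every index belongs to exactly one chunk.
   const std::size_t chunk = (size + num_workers - 1) / num_workers;

   std::vector<std::thread> workers;
   for (std::size_t begin = 0; begin < size; begin += chunk)
   {
      const std::size_t end = std::min(begin + chunk, size);
      workers.emplace_back([&a, &b, &c, begin, end] {
         // A serial inner loop gives each thread enough work to amortize
         // its scheduling overhead.
         for (std::size_t i = begin; i < end; ++i)
            c[i] = a[i] + b[i];
      });
   }

   // Join: wait for every chunk to finish.
   for (auto& w : workers)
      w.join();
}
```

In practice you might pass std::thread::hardware_concurrency() as num_workers, keeping in mind that it can return 0 when the value is not computable.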


When you parallelize code only at the low level, you can introduce a fork-join construct that does not scale as the number of processors increases. A fork-join construct is a construct where one task divides its work into smaller parallel subtasks and waits for those subtasks to finish. Each subtask can recursively divide itself into additional subtasks.

Although the fork-join model can be useful for solving a variety of problems, there are situations where the synchronization overhead can decrease scalability. For example, consider the following serial code that processes image data.

// Calls the provided function for each pixel in a Bitmap object. 
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image. 
   for (int y = 0; y < height; ++y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   }

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

Because each loop iteration is independent, you can parallelize much of the work, as shown in the following example. This example uses the concurrency::parallel_for algorithm to parallelize the outer loop.

// Calls the provided function for each pixel in a Bitmap object. 
void ProcessImage(Bitmap* bmp, const function<void (DWORD&)>& f)
{
   int width = bmp->GetWidth();
   int height = bmp->GetHeight();

   // Lock the bitmap.
   BitmapData bitmapData;
   Rect rect(0, 0, bmp->GetWidth(), bmp->GetHeight());
   bmp->LockBits(&rect, ImageLockModeWrite, PixelFormat32bppRGB, &bitmapData);

   // Get a pointer to the bitmap data.
   DWORD* image_bits = (DWORD*)bitmapData.Scan0;

   // Call the function for each pixel in the image.
   parallel_for (0, height, [&, width](int y)
   {      
      for (int x = 0; x < width; ++x)
      {
         // Get the current pixel value.
         DWORD* curr_pixel = image_bits + (y * width) + x;

         // Call the function.
         f(*curr_pixel);
      }
   });

   // Unlock the bitmap.
   bmp->UnlockBits(&bitmapData);
}

The following example illustrates a fork-join construct by calling the ProcessImage function in a loop. Each call to ProcessImage does not return until each subtask finishes.

// Processes each bitmap in the provided vector. 
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

If each iteration of the parallel loop performs almost no work, or if the work is imbalanced (that is, some loop iterations take longer than others), the scheduling overhead that is required to fork and join work frequently can outweigh the benefit of parallel execution. This overhead increases as the number of processors increases.

To reduce the amount of scheduling overhead in this example, you can parallelize outer loops before you parallelize inner loops or use another parallel construct such as pipelining. The following example modifies the ProcessImages function to use the concurrency::parallel_for_each algorithm to parallelize the outer loop.

// Processes each bitmap in the provided vector. 
void ProcessImages(vector<Bitmap*> bitmaps, const function<void (DWORD&)>& f)
{
   parallel_for_each(begin(bitmaps), end(bitmaps), [&f](Bitmap* bmp) {
      ProcessImage(bmp, f);
   });
}

For a similar example that uses a pipeline to perform image processing in parallel, see Walkthrough: Creating an Image-Processing Network.


A divide-and-conquer problem is a form of the fork-join construct that uses recursion to break a task into subtasks. In addition to the concurrency::task_group and concurrency::structured_task_group classes, you can also use the concurrency::parallel_invoke algorithm to solve divide-and-conquer problems. The parallel_invoke algorithm has a more succinct syntax than task group objects, and is useful when you have a fixed number of parallel tasks.

The following example illustrates the use of the parallel_invoke algorithm to implement the bitonic sorting algorithm.

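// Sorts the given sequence in the specified order. The INCREASING and
// DECREASING constants and the parallel_bitonic_merge function are defined
// in the complete version of this example.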
template <class T>
void parallel_bitonic_sort(T* items, int lo, int n, bool dir)
{   
   if (n > 1)
   {
      // Divide the array into two partitions and then sort  
      // the partitions in different directions. 
      int m = n / 2;

      parallel_invoke(
         [&] { parallel_bitonic_sort(items, lo, m, INCREASING); },
         [&] { parallel_bitonic_sort(items, lo + m, m, DECREASING); }
      );

      // Merge the results.
      parallel_bitonic_merge(items, lo, n, dir);
   }
}

To reduce overhead, the parallel_invoke algorithm performs the last of the series of tasks on the calling context.

For the complete version of this example, see How to: Use parallel_invoke to Write a Parallel Sort Routine. For more information about the parallel_invoke algorithm, see Parallel Algorithms.


The PPL provides two ways to cancel the parallel work that is performed by a task group or parallel algorithm. One way is to use the cancellation mechanism that is provided by the concurrency::task_group and concurrency::structured_task_group classes. The other way is to throw an exception in the body of a task work function. The cancellation mechanism is more efficient than exception handling at canceling a tree of parallel work. A parallel work tree is a group of related task groups in which some task groups contain other task groups. The cancellation mechanism cancels a task group and its child task groups in a top-down manner. Conversely, exception handling works in a bottom-up manner and must cancel each child task group independently as the exception propagates upward.

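When you work directly with a task group object, use the concurrency::task_group::cancel or concurrency::structured_task_group::cancel methods to cancel the work that belongs to that task group. To cancel a parallel algorithm, for example, parallel_for, either create a parent task group and cancel that task group, or run the algorithm in the context of a cancellation token by calling concurrency::run_with_cancellation_token and then cancel the associated concurrency::cancellation_token_source. For example, consider the following function, parallel_find_any, which uses the second approach to search for a value in an array in parallel.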

// Returns the position in the provided array that contains the given value,  
// or -1 if the value is not in the array. 
template<typename T>
int parallel_find_any(const T a[], size_t count, const T& what)
{
   // The position of the element in the array.  
   // The default value, -1, indicates that the element is not in the array. 
   int position = -1;

   // Call parallel_for in the context of a cancellation token to search for the element.
   cancellation_token_source cts;
   run_with_cancellation_token([count, what, &a, &position, &cts]()
   {
      parallel_for(std::size_t(0), count, [what, &a, &position, &cts](std::size_t n) {
         if (a[n] == what)
         {
            // Set the return value and cancel the remaining tasks.
            position = static_cast<int>(n);
            cts.cancel();
         }
      });
   }, cts.get_token());

   return position;
}

Because parallel algorithms use task groups, when one of the parallel iterations cancels the parent task group (or, as in this example, the cancellation token under which the algorithm runs), the overall task is canceled. For the complete version of this example, see How to: Use Cancellation to Break from a Parallel Loop.
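Cancellation in this pattern is cooperative: canceling marks the work as no longer needed, and iterations stop when they observe that mark. The following standard-library sketch (std::thread and std::atomic, not the PPL) models the same search with an atomic flag in place of the cancellation token; the names find_any_cooperative and num_workers are hypothetical, and unlike the PPL it checks the flag only between elements. Storing the result in a std::atomic also avoids a data race when two workers find a match at the same time.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical standard-library analogue of parallel_find_any. Returns the
// index of some element equal to what, or -1 if there is none.
template <typename T>
long find_any_cooperative(const std::vector<T>& a, const T& what,
                          unsigned num_workers)
{
   assert(num_workers > 0);
   std::atomic<long> position{-1};
   std::atomic<bool> canceled{false};   // Plays the role of the cancellation token.

   const std::size_t n = a.size();
   const std::size_t chunk = (n + num_workers - 1) / num_workers;

   std::vector<std::thread> workers;
   for (std::size_t begin = 0; begin < n; begin += chunk)
   {
      const std::size_t end = std::min(begin + chunk, n);
      workers.emplace_back([&, begin, end] {
         for (std::size_t i = begin; i < end; ++i)
         {
            // Stop early once another worker has canceled the search.
            if (canceled.load(std::memory_order_relaxed))
               return;
            if (a[i] == what)
            {
               // Record the result (the first writer wins), then cancel the others.
               long expected = -1;
               position.compare_exchange_strong(expected, static_cast<long>(i));
               canceled.store(true, std::memory_order_relaxed);
               return;
            }
         }
      });
   }
   for (auto& w : workers)
      w.join();
   return position.load();
}
```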

Although exception handling is a less efficient way to cancel parallel work than the cancellation mechanism, there are cases where exception handling is appropriate. For example, the following method, for_all, recursively performs a work function on each node of a tree structure. In this example, the _children data member is a std::list that contains tree objects.

// Performs the given work function on the data element of the tree and 
// on each child. 
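// tree is a class template that declares this member as
// template<class Function> void for_all(const Function& action);
template<typename T>
template<class Function>
void tree<T>::for_all(const Function& action)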
{
   // Perform the action on each child.
   parallel_for_each(begin(_children), end(_children), [&](tree& child) {
      child.for_all(action);
   });

   // Perform the action on this node.
   action(*this);
}

The caller of the tree::for_all method can throw an exception from its work function if it does not require the work function to be called on each element of the tree. The following example shows the search_for_value function, which searches for a value in the provided tree object. The search_for_value function uses a work function that throws an exception when the current element of the tree matches the provided value. The search_for_value function uses a try-catch block to capture the exception and print the result to the console.

// Searches for a value in the provided tree object. 
template <typename T>
void search_for_value(tree<T>& t, int value)
{
   try
   {
      // Call the for_all method to search for a value. The work function 
      // throws an exception when it finds the value.
      t.for_all([value](const tree<T>& node) {
         if (node.get_data() == value)
         {
            throw &node;
         }
      });
   }
   catch (const tree<T>* node)
   {
      // A matching node was found. Print a message to the console.
      wstringstream ss;
      ss << L"Found a node with value " << value << L'.' << endl;
      wcout << ss.str();
      return;
   }

   // A matching node was not found. Print a message to the console.
   wstringstream ss;
   ss << L"Did not find node with value " << value << L'.' << endl;
   wcout << ss.str();   
}

For the complete version of this example, see How to: Use Exception Handling to Break from a Parallel Loop.
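The following sketch shows the same exception-based early exit with std::async in place of parallel_for_each, under the assumption of a hypothetical node type that stores its children in a std::vector. Each std::future::get call rethrows an exception from a child subtree, so the exception travels upward one level at a time, which is the bottom-up behavior described earlier.

```cpp
#include <future>
#include <vector>

// Hypothetical tree node used only for this sketch.
struct node
{
   int data;
   std::vector<node> children;
};

// Applies action to every child subtree (concurrently) and then to n itself.
template <class Function>
void for_all(const node& n, const Function& action)
{
   std::vector<std::future<void>> pending;
   for (const node& child : n.children)
      pending.push_back(std::async(std::launch::async,
                                   [&child, &action] { for_all(child, action); }));

   // get() rethrows an exception thrown in a child subtree. Futures that are
   // not waited on here still block in their destructors during unwinding,
   // so no child task outlives this call.
   for (auto& f : pending)
      f.get();

   action(n);
}

// Returns true if some node holds value. The work function throws to skip
// the work that remains above the matching node.
bool contains_value(const node& root, int value)
{
   try
   {
      for_all(root, [value](const node& n) {
         if (n.data == value)
            throw &n;
      });
   }
   catch (const node*)
   {
      return true;
   }
   return false;
}
```

Unlike the PPL version, this sketch does not cancel sibling subtrees that are already running; the exception only prevents the work function from running on the matching node's ancestors.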

For more general information about the cancellation and exception-handling mechanisms that are provided by the PPL, see Cancellation in the PPL and Exception Handling in the Concurrency Runtime.


In a tree of parallel work, a task that is canceled prevents child tasks from running. This can cause problems if one of the child tasks performs an operation that is important to your application, such as freeing a resource. In addition, task cancellation can cause an exception to propagate through an object destructor and cause undefined behavior in your application.

In the following example, the Resource class describes a resource and the Container class describes a container that holds resources. In its destructor, the Container class calls the cleanup method on two of its Resource members in parallel and then calls the cleanup method on its third Resource member.

// parallel-resource-destruction.h
#pragma once
#include <ppl.h>
#include <sstream>
#include <iostream>

// Represents a resource.
class Resource
{
public:
   Resource(const std::wstring& name)
      : _name(name)
   {
   }

   // Frees the resource.
   void cleanup()
   {
      // Print a message as a placeholder.
      std::wstringstream ss;
      ss << _name << L": Freeing..." << std::endl;
      std::wcout << ss.str();
   }
private:
   // The name of the resource.
   std::wstring _name;
};

// Represents a container that holds resources.
class Container
{
public:
   Container(const std::wstring& name)
      : _name(name)
      , _resource1(L"Resource 1")
      , _resource2(L"Resource 2")
      , _resource3(L"Resource 3")
   {
   }

   ~Container()
   {
      std::wstringstream ss;
      ss << _name << L": Freeing resources..." << std::endl;
      std::wcout << ss.str();

      // For illustration, assume that cleanup for _resource1
      // and _resource2 can happen concurrently, and that 
      // _resource3 must be freed after _resource1 and _resource2.

      concurrency::parallel_invoke(
         [this]() { _resource1.cleanup(); },
         [this]() { _resource2.cleanup(); }
      );

      _resource3.cleanup();
   }

private:
   // The name of the container.
   std::wstring _name;

   // Resources.
   Resource _resource1;
   Resource _resource2;
   Resource _resource3;
};

Although this pattern has no problems on its own, consider the following code that runs two tasks in parallel. The first task creates a Container object and the second task cancels the overall task. For illustration, the example uses two concurrency::event objects to make sure that the cancellation occurs after the Container object is created and that the Container object is destroyed after the cancellation operation occurs.

// parallel-resource-destruction.cpp 
// compile with: /EHsc
#include "parallel-resource-destruction.h" 

using namespace concurrency;
using namespace std;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{  
   // Create a task_group that will run two tasks.
   task_group tasks;

   // Used to synchronize the tasks. 
   event e1, e2;

   // Run two tasks. The first task creates a Container object. The second task 
   // cancels the overall task group. To illustrate the scenario where a child  
   // task is not run because its parent task is cancelled, the event objects  
   // ensure that the Container object is created before the overall task is  
   // cancelled and that the Container object is destroyed after the overall  
   // task is cancelled.

   tasks.run([&tasks,&e1,&e2] {
      // Create a Container object.
      Container c(L"Container 1");

      // Allow the second task to continue.
      e2.set();

      // Wait for the task to be cancelled.
      e1.wait();
   });

   tasks.run([&tasks,&e1,&e2] {
      // Wait for the first task to create the Container object.
      e2.wait();

      // Cancel the overall task.
      tasks.cancel();      

      // Allow the first task to continue.
      e1.set();
   });

   // Wait for the tasks to complete.
   tasks.wait();

   wcout << L"Exiting program..." << endl;
}

This example produces the following output:

Container 1: Freeing resources...
Exiting program...

This code example contains the following issues that may cause it to behave differently than you expect:

  • The cancellation of the parent task causes the child task, the call to concurrency::parallel_invoke, to also be canceled. Therefore, _resource1 and _resource2 are not freed.

  • The cancellation of the parent task causes the child task to throw an internal exception. Because the Container destructor does not handle this exception, the exception is propagated upward and the third resource is not freed.

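  • The exception that is thrown by the child task propagates through the Container destructor. Throwing from a destructor puts the application in an undefined state; with compilers that treat destructors as implicitly noexcept, as C++11 requires, it terminates the application instead.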

We recommend that you do not perform critical operations, such as the freeing of resources, in tasks unless you can guarantee that these tasks will not be canceled. We also recommend that you do not use runtime functionality that can throw in the destructor of your types.
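A minimal sketch of both recommendations, assuming that serial cleanup is fast enough for your application: the destructor frees every resource serially, outside any task group, so canceling an enclosing task cannot skip the cleanup, and the cleanup functions are noexcept so no exception can escape the destructor. SafeResource, SafeContainer, and the cleaned_up counter are hypothetical names; the counter exists only so that the behavior can be checked.

```cpp
#include <atomic>

// Counts cleanup calls so that the sketch can be verified (illustrative only).
std::atomic<int> cleaned_up{0};

struct SafeResource
{
   // noexcept: cleanup must never throw, because it runs from a destructor.
   void cleanup() noexcept { ++cleaned_up; }
};

struct SafeContainer
{
   SafeResource r1, r2, r3;

   // The cleanup runs serially on the destroying thread rather than in
   // child tasks, so there is no child task for a cancellation to skip and
   // no runtime call that could throw out of the destructor.
   ~SafeContainer()
   {
      r1.cleanup();
      r2.cleanup();
      r3.cleanup();   // Still runs after r1 and r2, as the original requires.
   }
};
```

The trade-off is that _resource1 and _resource2 are no longer freed concurrently.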


A parallel loop such as concurrency::parallel_for or concurrency::parallel_for_each that is dominated by blocking operations may cause the runtime to create many threads over a short time.

The Concurrency Runtime performs additional work when a task finishes or cooperatively blocks or yields. When one parallel loop iteration blocks, the runtime might begin another iteration. When there are no available idle threads, the runtime creates a new thread.

When the body of a parallel loop occasionally blocks, this mechanism helps maximize the overall task throughput. However, when many iterations block, the runtime may create many threads to run the additional work. This could lead to low-memory conditions or poor utilization of hardware resources.

Consider the following example that calls the concurrency::send function in each iteration of a parallel_for loop. Because send blocks cooperatively, the runtime might create a new thread to run additional work every time send is called.

// repeated-blocking.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <agents.h>

using namespace concurrency;

static_assert(false, "This example illustrates a non-recommended practice.");

int main()
{
   // Create a message buffer.
   overwrite_buffer<int> buffer;

   // Repeatedly send data to the buffer in a parallel loop.
   parallel_for(0, 1000, [&buffer](int i) {

      // The send function blocks cooperatively.  
      // We discourage the use of repeated blocking in a parallel 
      // loop because it can cause the runtime to create  
      // a large number of threads over a short period of time.
      send(buffer, i);
   });
}

We recommend that you refactor your code to avoid this pattern. In this example, you can avoid the creation of additional threads by calling send in a serial for loop.


When possible, do not perform blocking operations before you call the concurrency::task_group::cancel or concurrency::structured_task_group::cancel method to cancel parallel work.

When a task performs a cooperative blocking operation, the runtime can perform other work while the first task waits for data, and it reschedules the waiting task when that task unblocks. The runtime typically reschedules tasks that were more recently unblocked before tasks that were less recently unblocked, so a blocked task might not resume immediately. As a result, when you perform a blocking operation before you cancel parallel work, the blocking operation can delay the call to cancel, and other tasks continue to perform unnecessary work in the meantime, which decreases performance.

Consider the following example that defines the parallel_find_answer function, which searches for an element of the provided array that satisfies the provided predicate function. When the predicate function returns true, the parallel work function creates an Answer object and cancels the overall task.

// blocking-cancel.cpp 
// compile with: /c /EHsc
#include <windows.h>
#include <ppl.h>

using namespace concurrency;

// Encapsulates the result of a search operation. 
template<typename T>
class Answer
{
public:
   explicit Answer(const T& data)
      : _data(data)
   {
   }

   T get_data() const
   {
      return _data;
   }

   // TODO: Add other methods as needed. 

private:
   T _data;

   // TODO: Add other data members as needed.
};

// Searches for an element of the provided array that satisfies the provided 
// predicate function. 
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer. 
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // The loop index is a size_t, not a T.
      parallel_for<size_t>(0, count, [&](size_t n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);
            // Cancel the overall task.
            tasks.cancel();
         }
      });
   });

   return answer;
}

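The new operator performs a heap allocation, which might block. Because the allocation happens before the call to cancel, any delay that it causes also delays the cancellation, and the other loop iterations keep performing unnecessary work in the meantime. Note that the runtime performs other work only when a task performs a cooperative blocking call, such as a call to concurrency::critical_section::lock.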

The following example shows how to prevent unnecessary work, and thereby improve performance. This example cancels the task group before it allocates the storage for the Answer object.

// Searches for an element of the provided array that satisfies the provided 
// predicate function. 
template<typename T, class Predicate>
Answer<T>* parallel_find_answer(const T a[], size_t count, const Predicate& pred)
{
   // The result of the search.
   Answer<T>* answer = nullptr;
   // Ensures that only one task produces an answer. 
   volatile long first_result = 0;

   // Use parallel_for and a task group to search for the element.
   structured_task_group tasks;
   tasks.run_and_wait([&]
   {
      // The loop index is a size_t, not a T.
      parallel_for<size_t>(0, count, [&](size_t n) {
         if (pred(a[n]) && InterlockedExchange(&first_result, 1) == 0)
         {
            // Cancel the overall task.
            tasks.cancel();
            // Create an object that holds the answer.
            answer = new Answer<T>(a[n]);            
         }
      });
   });

   return answer;
}
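The ordering principle does not depend on the PPL. In the following standard-library sketch (std::thread and std::atomic), the winning worker publishes the stop request before it performs the potentially slow allocation, so that the other workers stop scanning as early as possible. The function find_answer and its parameters are hypothetical, and std::unique_ptr<T> stands in for the Answer<T> object.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical analogue of parallel_find_answer. Returns a copy of an element
// that satisfies pred, or nullptr if no element does.
template <typename T, class Predicate>
std::unique_ptr<T> find_answer(const std::vector<T>& a, const Predicate& pred,
                               unsigned num_workers)
{
   assert(num_workers > 0);
   std::atomic<bool> stop{false};      // Acts like the call to cancel.
   std::atomic<bool> claimed{false};   // Ensures that only one worker produces an answer.
   std::unique_ptr<T> answer;          // Written only by the worker that claims it.

   const std::size_t n = a.size();
   const std::size_t chunk = (n + num_workers - 1) / num_workers;
   std::vector<std::thread> workers;
   for (std::size_t begin = 0; begin < n; begin += chunk)
   {
      const std::size_t end = std::min(begin + chunk, n);
      workers.emplace_back([&, begin, end] {
         for (std::size_t i = begin; i < end && !stop.load(); ++i)
         {
            if (pred(a[i]) && !claimed.exchange(true))
            {
               stop.store(true);                     // 1. Cancel first...
               answer = std::make_unique<T>(a[i]);   // 2. ...then allocate.
               return;
            }
         }
      });
   }
   for (auto& w : workers)
      w.join();
   return answer;
}
```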


The Concurrency Runtime provides several data structures, for example, concurrency::critical_section, that synchronize concurrent access to shared data. These data structures are useful in many cases, for example, when multiple tasks infrequently require shared access to a resource.

Consider the following example that uses the concurrency::parallel_for_each algorithm and a critical_section object to compute the sum of the prime numbers in a std::array object. This example does not scale because each thread must wait to access the shared variable prime_sum.

critical_section cs;
prime_sum = 0;
parallel_for_each(begin(a), end(a), [&](int i) {
   cs.lock();
   prime_sum += (is_prime(i) ? i : 0);
   cs.unlock();
});

This example can also lead to poor performance because the frequent locking operation effectively serializes the loop. In addition, when a Concurrency Runtime object performs a blocking operation, the scheduler might create an additional thread to perform other work while the first thread waits for data. If the runtime creates many threads because many tasks are waiting for shared data, the application can perform poorly or enter a low-resource state.

The PPL defines the concurrency::combinable class, which helps you eliminate shared state by providing access to shared resources in a lock-free manner. The combinable class provides thread-local storage that lets you perform fine-grained computations and then merge those computations into a final result. You can think of a combinable object as a reduction variable.

The following example modifies the previous one by using a combinable object instead of a critical_section object to compute the sum. This example scales because each thread holds its own local copy of the sum. This example uses the concurrency::combinable::combine method to merge the local computations into the final result.

combinable<int> sum;
parallel_for_each(begin(a), end(a), [&](int i) {
   sum.local() += (is_prime(i) ? i : 0);
});
prime_sum = sum.combine(plus<int>());

For the complete version of this example, see How to: Use combinable to Improve Performance. For more information about the combinable class, see Parallel Containers and Objects.
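The reduction pattern that combinable provides can be sketched with the standard library: each worker accumulates into its own local variable without locks, and the partial results are combined after all workers finish. This sketch uses std::thread and the hypothetical names sum_of_primes and is_prime; it is not how combinable is implemented.

```cpp
#include <cstddef>
#include <functional>
#include <numeric>
#include <thread>
#include <vector>

// Returns true if n is prime (simple trial division).
bool is_prime(int n)
{
   if (n < 2)
      return false;
   for (int d = 2; d <= n / d; ++d)
      if (n % d == 0)
         return false;
   return true;
}

// Sums the prime elements of a by using num_workers threads.
long long sum_of_primes(const std::vector<int>& a, unsigned num_workers)
{
   // One slot per worker, written once at the end (like a combinable local).
   std::vector<long long> partial(num_workers, 0);
   std::vector<std::thread> workers;
   for (unsigned w = 0; w < num_workers; ++w)
   {
      workers.emplace_back([&a, &partial, w, num_workers] {
         // Accumulate in a local variable so that workers do not repeatedly
         // write to adjacent vector slots, which could cause false sharing.
         long long local = 0;
         // Strided partition: worker w handles indices w, w + num_workers, ...
         for (std::size_t i = w; i < a.size(); i += num_workers)
            local += is_prime(a[i]) ? a[i] : 0;
         partial[w] = local;
      });
   }
   for (auto& t : workers)
      t.join();
   // Combine the partial sums, much as combinable::combine(plus<int>()) does.
   return std::accumulate(partial.begin(), partial.end(), 0LL,
                          std::plus<long long>());
}
```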


False sharing occurs when multiple concurrent tasks that are running on separate processors write to variables that are located on the same cache line. When one task writes to one of the variables, the cache line for both variables is invalidated. Each processor must reload the cache line every time that the cache line is invalidated. Therefore, false sharing can cause decreased performance in your application.

The following basic example shows two concurrent tasks that each increment a shared counter variable.

volatile long count = 0L;
concurrency::parallel_invoke(
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   },
   [&count] {
      for(int i = 0; i < 100000000; ++i)
         InterlockedIncrement(&count);
   }
);

To eliminate the sharing of data between the two tasks, you can modify the example to use two counter variables. This example computes the final counter value after the tasks finish. However, this example illustrates false sharing because the variables count1 and count2 are likely to be located on the same cache line.

long count1 = 0L;
long count2 = 0L;
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

One way to eliminate false sharing is to make sure that the counter variables are on separate cache lines. The following example aligns the variables count1 and count2 on 64-byte boundaries.

__declspec(align(64)) long count1 = 0L;      
__declspec(align(64)) long count2 = 0L;      
concurrency::parallel_invoke(
   [&count1] {
      for(int i = 0; i < 100000000; ++i)
         ++count1;
   },
   [&count2] {
      for(int i = 0; i < 100000000; ++i)
         ++count2;
   }
);
long count = count1 + count2;

This example assumes that the cache line size is 64 bytes or less.
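Standard C++11 offers the alignas specifier as a portable alternative to the Microsoft-specific __declspec(align(64)). The following sketch wraps each counter in a struct that is aligned, and therefore padded, to 64 bytes, under the same 64-byte cache-line assumption; padded_counter and count_in_parallel are hypothetical names, and std::thread replaces parallel_invoke so that the sketch is self-contained.

```cpp
#include <thread>

// Each counter occupies its own 64-byte block (the assumed cache-line size),
// so two adjacent padded_counter objects never share a cache line.
struct alignas(64) padded_counter
{
   long value = 0;
};

static_assert(sizeof(padded_counter) == 64, "one counter per 64-byte block");

// Increments two counters from two threads and returns their sum.
long count_in_parallel(int iterations)
{
   padded_counter counters[2];
   std::thread t1([&] { for (int i = 0; i < iterations; ++i) ++counters[0].value; });
   std::thread t2([&] { for (int i = 0; i < iterations; ++i) ++counters[1].value; });
   t1.join();
   t2.join();
   return counters[0].value + counters[1].value;
}
```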

We recommend that you use the concurrency::combinable class when you must share data among tasks. The combinable class creates thread-local variables in such a way that false sharing is less likely. For more information about the combinable class, see Parallel Containers and Objects.


When you provide a lambda expression to a task group or parallel algorithm, the capture clause specifies whether the body of the lambda expression accesses variables in the enclosing scope by value or by reference. When you pass variables to a lambda expression by reference, you must guarantee that the lifetime of that variable persists until the task finishes.

Consider the following example that defines the object class and the perform_action function. The perform_action function creates an object variable and performs some action on that variable asynchronously. Because the task is not guaranteed to finish before the perform_action function returns, the program has undefined behavior, and might crash, if the object variable is destroyed while the task is running.

// lambda-lifetime.cpp 
// compile with: /c /EHsc
#include <ppl.h>

using namespace concurrency;

// A type that performs an action. 
class object
{
public:
   void action() const
   {
      // TODO: Details omitted for brevity.
   }
};

// Performs an action asynchronously. 
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on  
   // that variable asynchronously.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // NOTE: The object variable is destroyed here. The program 
   // has undefined behavior (it might crash) if the task 
   // is still running when this function returns.
}

Depending on the requirements of your application, you can use one of the following techniques to guarantee that variables remain valid throughout the lifetime of every task.

The following example passes the object variable by value to the task. Therefore, the task operates on its own copy of the variable.

// Performs an action asynchronously. 
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on  
   // that variable asynchronously.
   object obj;
   tasks.run([obj] {
      obj.action();
   });
}

Because the object variable is passed by value, any state changes that occur to this variable do not appear in the original copy.

The following example uses the concurrency::task_group::wait method to make sure that the task finishes before the perform_action function returns.

// Performs an action. 
void perform_action(task_group& tasks)
{
   // Create an object variable and perform some action on  
   // that variable.
   object obj;
   tasks.run([&obj] {
      obj.action();
   });

   // Wait for the task to finish. 
   tasks.wait();
}

Because the task now finishes before the function returns, the perform_action function no longer behaves asynchronously.

The following example modifies the perform_action function to take a reference to the object variable. The caller must guarantee that the lifetime of the object variable is valid until the task finishes.

// Performs an action asynchronously. 
void perform_action(object& obj, task_group& tasks)
{
   // Perform some action on the object variable.
   tasks.run([&obj] {
      obj.action();
   });
}

You can also use a pointer to control the lifetime of an object that you pass to a task group or parallel algorithm.
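For example, capturing a std::shared_ptr by value gives the task its own owning reference, so the object stays alive for as long as the task needs it. The following sketch uses std::async in place of task_group so that it is self-contained; tracked_object and its count member are hypothetical and exist only to make the effect observable.

```cpp
#include <future>
#include <memory>

// Hypothetical type that records how many times its action ran.
class tracked_object
{
public:
   void action() { ++count; }
   int count = 0;
};

// Performs an action asynchronously. The lambda captures the shared_ptr by
// value, so the object outlives this function even if the task is still
// running when perform_action returns.
std::future<std::shared_ptr<tracked_object>> perform_action()
{
   auto obj = std::make_shared<tracked_object>();
   return std::async(std::launch::async, [obj] {
      obj->action();
      return obj;   // Hand ownership back so that the caller can inspect the result.
   });
}
```

The object is destroyed only when the last shared_ptr copy, whether held by the task or by the caller, goes away.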

For more information about lambda expressions, see Lambda Expressions in C++.


© 2014 Microsoft