
6: Dynamic Task Parallelism

The Dynamic Task Parallelism pattern is applicable to problems that are solved by first solving smaller, related problems. For example, when you count the number of nodes in a data structure that represents a binary tree, you can count the nodes in the left and right subtrees and then add the results. A sequential algorithm that uses recursion can easily be transformed into a computation that uses dynamic task parallelism.

Dynamic task parallelism is also known as recursive decomposition or "divide and conquer."

Dynamic task parallelism is similar to recursion. Tasks create subtasks on the fly to solve subproblems as needed.

Applications that use data structures such as trees and graphs are typical examples of where you can use dynamic task parallelism. It's also used for applications that have geographic or geometric aspects, where the problem can be partitioned spatially. Dynamic task parallelism differs from the techniques that have been presented so far in this book. It is distinguished by the fact that tasks are added to the work queue as the computation proceeds.
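The node-counting example mentioned at the start of this chapter can be sketched as follows. This is a minimal illustration in standard C++, not code from the chapter's samples: it assumes std::async as a stand-in for a PPL task, and the CountNode type, the function names, and the depthRemaining cutoff are hypothetical names introduced only for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <memory>

// Hypothetical minimal node type for this sketch (not the chapter's TreeNode).
struct CountNode
{
  std::shared_ptr<CountNode> left;
  std::shared_ptr<CountNode> right;
};

// Sequential version: count both subtrees, then add the results.
std::size_t CountSequential(const std::shared_ptr<CountNode>& node)
{
  if (nullptr == node) return 0;
  return 1 + CountSequential(node->left) + CountSequential(node->right);
}

// Parallel version: the left subtree is counted in a new task while the
// current thread counts the right subtree. depthRemaining limits how many
// levels create tasks; below that, the sequential version is used.
std::size_t CountParallel(const std::shared_ptr<CountNode>& node,
                          int depthRemaining)
{
  if (nullptr == node) return 0;
  if (depthRemaining <= 0) return CountSequential(node);

  std::future<std::size_t> leftCount = std::async(
    std::launch::async,
    [&node, depthRemaining] {
      return CountParallel(node->left, depthRemaining - 1);
    });
  std::size_t rightCount = CountParallel(node->right, depthRemaining - 1);
  return 1 + leftCount.get() + rightCount;
}

// Helper for the sketch: builds a complete binary tree with the given
// number of levels (0 levels is an empty tree).
std::shared_ptr<CountNode> MakeCompleteTree(int levels)
{
  assert(levels >= 0);
  if (levels == 0) return nullptr;
  auto node = std::make_shared<CountNode>();
  node->left = MakeCompleteTree(levels - 1);
  node->right = MakeCompleteTree(levels - 1);
  return node;
}
```

The parallel function has the same shape as the sequential one. The only structural change is that one of the two recursive calls runs as a task whose result is collected before the counts are added.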

The Basics

The following code shows a binary tree.

template<typename T>
struct TreeNode
{
private:
  T m_data;
  shared_ptr<TreeNode<T>> m_left;
  shared_ptr<TreeNode<T>> m_right;
  // ... (public accessors Data(), Left(), and Right() omitted)
};

If you want to perform an action on each data value in the tree, you need to visit each node. This is known as walking the tree, which is a naturally recursive operation. Here's an example that uses sequential code.

template<typename Func>
static void SequentialWalk(shared_ptr<TreeNode<T>> node,
                           Func action)
{
  if (nullptr == node) return;

  action(node->Data());
  SequentialWalk(node->Left(), action);
  SequentialWalk(node->Right(), action);
}

The SequentialWalk method applies the function action to each node in the tree in depth-first order. You can also use parallel tasks to walk the tree. This is shown in the following code.

template<typename Func>
static void ParallelWalk(shared_ptr<TreeNode<T>> node, 
                         Func action)
{
  if (nullptr == node) return;

  parallel_invoke(
    [&node, &action] { action(node->Data()); },
    [&node, &action] 
    { 
      Tree<T>::ParallelWalk(node->Left(), action); 
    },
    [&node, &action] 
    { 
      Tree<T>::ParallelWalk(node->Right(), action); 
    }
  );
}

When you use dynamic task parallelism to perform a tree walk, you no longer visit nodes in a predictable order. If you need to visit nodes in a sequential order, such as with a preorder, inorder, or postorder traversal, you may want to consider the Pipeline pattern that's described in Chapter 7, "Pipelines."

Note:
Dynamic task parallelism results in a less predictable order of execution than sequential recursion.

In this example, the number of tasks is three times the number of nodes in the tree. In an actual scenario, the number of nodes could be very large. The Parallel Pattern Library (PPL) is designed to handle this situation, but you may want to read the section, "Design Notes," later in this chapter for some performance tips.

An Example

Sorting is a typical application that can benefit from dynamic task parallelism.

One example is sorting a list with an algorithm such as QuickSort. This algorithm first divides an unsorted array of values into sublists, and then it orders and recombines the pieces. Here's a sequential implementation.

static void SequentialQuickSort(VectorIter begin,
                                VectorIter end, 
                                long threshold)
{
  if (distance(begin, end) <= threshold) 
  {
    InsertionSort(begin, end);
  }
  else
  {
    VectorIter pivot = partition(begin + 1, 
                                 end,
                                 bind2nd(less<int>(), *begin));
    iter_swap(begin, pivot-1);
    SequentialQuickSort(begin, pivot - 1, threshold);
    SequentialQuickSort(pivot, end, threshold);
  }
}

In this example, the VectorIter typedef expands to the vector<int>::iterator type. The method sorts a vector<int> instance in place, instead of returning a sorted array. The begin and end arguments identify the segment that will be sorted. The code includes an optimization. It's not efficient to use the recursive algorithm on short segments, so the method calls the non-recursive InsertionSort method on segments whose length is less than or equal to threshold, which is passed as an argument. This optimization applies equally to the sequential and parallel versions of the QuickSort algorithm.

Note:
This example uses iterator conventions from the Standard Template Library (STL).
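The InsertionSort method is not shown in this chapter. The following is a minimal sketch of what such a method could look like under the same iterator conventions, where [begin, end) is a half-open range. It is a hypothetical stand-in, not the sample's implementation, and it uses a binary search to find each insertion point.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

typedef std::vector<int>::iterator VectorIter;

// Sorts the half-open range [begin, end) in place.
// Hypothetical stand-in for the sample's InsertionSort, which is not shown.
void InsertionSort(VectorIter begin, VectorIter end)
{
  assert(std::distance(begin, end) >= 0);
  for (VectorIter current = begin; current != end; ++current)
  {
    // Find where *current belongs within the sorted prefix [begin, current)
    // and rotate it into place.
    VectorIter position = std::upper_bound(begin, current, *current);
    std::rotate(position, current, current + 1);
  }
}
```

Because the function takes a pair of iterators, it can sort any segment of a vector without copying it, which is what the QuickSort code relies on when it reaches a short segment.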

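If a segment is longer than threshold, the recursive algorithm is used. The first element of the segment, *begin, serves as the pivot value. The std::partition function moves all the remaining elements that are less than the pivot value to the front of the segment and leaves the elements that are not less than the pivot value at the back. It returns an iterator, pivot, to the first element of the second group. The call to iter_swap then places the pivot value at pivot - 1, between the two groups. Finally, the method recursively calls SequentialQuickSort on both segments, excluding the pivot value itself.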
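The partition step can be examined in isolation. The following sketch extracts it into a helper, PartitionStep, which is a hypothetical name introduced here. It also assumes a lambda in place of bind2nd, because bind2nd was deprecated in C++11 and removed in C++17. The behavior is intended to match the two lines used in the QuickSort code.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

typedef std::vector<int>::iterator VectorIter;

// Performs the partition step on [begin, end) and returns the iterator that
// the chapter's code calls pivot. Requires a non-empty range.
VectorIter PartitionStep(VectorIter begin, VectorIter end)
{
  assert(begin != end);
  const int pivotValue = *begin;

  // Elements less than pivotValue are moved to the front of [begin + 1, end).
  VectorIter pivot = std::partition(begin + 1, end,
    [pivotValue](int x) { return x < pivotValue; });

  // Move the pivot value between the two groups, at position pivot - 1.
  std::iter_swap(begin, pivot - 1);
  return pivot;
}
```

After the call, every element in [begin, pivot - 1) is less than the pivot value, the element at pivot - 1 is the pivot value, and every element in [pivot, end) is not less than it. The order of elements within each group is unspecified.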

The following code shows a parallel implementation of the QuickSort algorithm.

static void ParallelQuickSort(VectorIter begin, VectorIter end,
                              int depthRemaining, long threshold)
{
  if (distance(begin, end) <= threshold)
  {
    InsertionSort(begin, end);
  }
  else
  {
    VectorIter pivot = partition(begin + 1, 
                                 end, 
                                 bind2nd(less<int>(), *begin));
    iter_swap(begin, pivot-1);
    if (depthRemaining > 0)
    {
      parallel_invoke(
        [begin, end, pivot, depthRemaining, threshold] { 
          Sort::ParallelQuickSort(begin, pivot - 1,
                                  depthRemaining - 1, threshold);
        },
        [&pivot, begin, end, depthRemaining, threshold] { 
          Sort::ParallelQuickSort(pivot, end,
                                  depthRemaining - 1, threshold);
        }
      );
    }
    else
    {
      SequentialQuickSort(begin, pivot - 1, threshold);
      SequentialQuickSort(pivot, end, threshold);
    }
  }
}

The parallel version uses parallel_invoke to execute the recursive calls in tasks that can run in parallel. Tasks are created dynamically with each recursive call; if the array is large, many tasks might be created.

The parallel version includes an additional optimization besides using insertion sort for subsequences of small size. It's generally not useful to create many more tasks than there are processors to run them. So, the ParallelQuickSort method includes an additional argument to limit task creation. The depthRemaining argument is decremented on each recursive call, and tasks are created only when this argument exceeds zero. The following code shows how to calculate an appropriate depth (that is, the depthRemaining argument) from the number of processors.

static void ParallelQuickSort(vector<int>& a, long threshold)
{
  const int maxTasks = 
    CurrentScheduler::Get()->GetNumberOfVirtualProcessors();

  ParallelQuickSort(a.begin(), a.end(),
    (int)LogN(float(maxTasks), 2.0f) + 4, threshold);
}

One relevant factor in selecting the number of tasks is how similar the predicted run times of the tasks will be. In the case of QuickSort, the duration of the tasks may vary a great deal because the pivot points depend on the unsorted data. Using arbitrary, unsorted pivots produces segments of unequal size (in fact, the sizes can vary widely). The processing time required to sort each segment depends on the segment's size; therefore, you can expect tasks that are created by using pivots to divide segments to be of uneven duration. To compensate for the uneven durations of the tasks, the formula that calculates the depthRemaining argument produces a starting value that will allow more tasks to be created than the number of cores. The formula limits the number of tasks to approximately sixteen times the number of cores. This is because the number of tasks can be no larger than 2 ^ depthRemaining. If you substitute depthRemaining = log2(NCores) + 4 and simplify the expression, you see that the number of tasks is 16 x NCores. (Recall that for any value a, 2 ^ (a + 4) is the same as 16 times 2^a and that if a = log2(b), 2^a = b.)

For other algorithms you might want to add 2 or 3 to log2(NCores) instead of 4, which would limit the number of tasks to 4 x NCores and 8 x NCores, respectively. The number of tasks you choose depends on how unequal in duration you expect your tasks to be. The more variability in task durations, the more tasks you will probably want.
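The arithmetic behind the depth formula can be checked with a few lines of code. The helpers below are hypothetical names introduced for this sketch. FloorLog2 assumes the truncating behavior of casting the logarithm to int, as the cast in the ParallelQuickSort wrapper does.

```cpp
#include <cassert>

// Integer floor of log2(n), for n >= 1. Hypothetical stand-in for the
// sample's LogN helper combined with the (int) cast.
int FloorLog2(int n)
{
  assert(n >= 1);
  int result = 0;
  while (n > 1)
  {
    n /= 2;
    ++result;
  }
  return result;
}

// Starting value for depthRemaining: log2(cores) plus extra levels.
int InitialDepth(int cores, int extraLevels)
{
  return FloorLog2(cores) + extraLevels;
}

// The bound used in the text: at most 2 ^ depthRemaining tasks.
long MaxTasks(int depthRemaining)
{
  return 1L << depthRemaining;
}
```

For 8 cores, InitialDepth(8, 4) is 7 and MaxTasks(7) is 128, which is 16 x 8. When the number of cores is not a power of two, truncation makes the bound smaller than 16 x NCores. For example, 6 cores gives a depth of 6 and a bound of 64 tasks.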

Note:
Limiting the number of subtasks by measuring the recursion depth is an extremely important technique for ensuring that an appropriate amount of potential parallelism will be introduced. Too many tasks could introduce task-related overhead; too few would result in underutilization of available cores.


Note:
The QuickSort example that is shown in this section was selected to illustrate the principles of dynamic task parallelism. As a sorting algorithm it may or may not be what you want. There are other examples of parallel sorting algorithms in the ConcRT Extras sample pack that may be better suited to your application.

Variations

Dynamic task parallelism has several variations.

Parallel While-Not-Empty

The examples shown so far in this chapter use techniques that are the parallel analogs of sequential depth-first traversal. There are also parallel algorithms for other types of traversals. These techniques rely on concurrent collections to keep track of the remaining work to be done. Here's an example.

template<typename T, typename Func>
void ParallelWhileNotEmpty1(
  vector<shared_ptr<TreeNode<T>>> initialValues, 
  Func body)
{
  concurrent_vector<shared_ptr<TreeNode<T>>>
    from(initialValues.size());
  for (size_t i = 0; i < initialValues.size(); i++)
    from[i] = initialValues[i];

  while(!from.empty())
  {
    concurrent_vector<shared_ptr<TreeNode<T>>> to;
    function<void (shared_ptr<TreeNode<T>>)> addMethod = 
        [&to](shared_ptr<TreeNode<T>> n) { to.push_back(n); };
    parallel_for_each(from.cbegin(), from.cend(), 
      [&body, &addMethod](shared_ptr<TreeNode<T>> item) 
      { 
        body(item, addMethod); 
      }
    );
    from = to;
  }
}

The ParallelWhileNotEmpty1 method shows how you can use parallel_for_each to process a collection of values that may grow over time. While the ParallelWhileNotEmpty1 method processes the initial values, additional values to process may be discovered. The additional values are placed in the to vector. After the first batch of values is processed, the method starts processing the additional values, which may again result in more values to process. This processing repeats until no additional values are produced.

The concurrent_vector class is provided by the Concurrency Runtime as a concurrency-safe implementation of a vector type.

The ParallelWalkWithWhileNotEmpty1 method uses the ParallelWhileNotEmpty1 method to walk a binary tree. This is shown in the following code example.

template<typename T, typename Func>
void ParallelWalkWithWhileNotEmpty1(shared_ptr<TreeNode<T>> node,
                                    Func action)
{
  if (nullptr == node)
    return;
  vector<shared_ptr<TreeNode<T>>> nodes;
  nodes.push_back(node);

  ParallelWhileNotEmpty1(nodes, 
    /* Func body */ [&action](shared_ptr<TreeNode<T>> item,
    function<void (shared_ptr<TreeNode<T>>)> addMethod)
    {
      if (nullptr != item->Left()) addMethod(item->Left());
      if (nullptr != item->Right()) addMethod(item->Right());
      action(item->Data());
    });
} 

A website tool that checks links is an example of an appropriate place to use the ParallelWhileNotEmpty1 method. The tool loads the initial page and searches it for links. Each link is checked and removed from the list, and additional links to unchecked pages from the same site are added to the list. Eventually, there are no more unchecked links and the application stops.
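The link-checking scenario can be sketched with the same batch-by-batch structure. The following code is an illustration in standard C++ under several assumptions: std::async stands in for parallel_for_each, a mutex-protected std::vector stands in for concurrent_vector, and the "site" is a hypothetical in-memory map rather than real pages. The names Site and CheckLinks are introduced only for this sketch.

```cpp
#include <cassert>
#include <future>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>

// Hypothetical in-memory "site": each page maps to the links found on it.
typedef std::map<std::string, std::vector<std::string>> Site;

// Checks every page reachable from start, one batch at a time, and returns
// the set of links that were checked (including links to missing pages).
std::set<std::string> CheckLinks(const Site& site, const std::string& start)
{
  assert(!start.empty());
  std::set<std::string> seen;   // links already queued or checked
  std::mutex guard;             // protects seen and to
  seen.insert(start);
  std::vector<std::string> from(1, start);

  while (!from.empty())
  {
    std::vector<std::string> to;
    std::vector<std::future<void>> tasks;
    for (const std::string& page : from)
    {
      tasks.push_back(std::async(std::launch::async,
        [&site, &seen, &guard, &to, page]
        {
          // "Check" the page by looking up its outgoing links.
          Site::const_iterator entry = site.find(page);
          if (entry == site.end()) return;  // missing page: nothing to follow
          for (const std::string& link : entry->second)
          {
            std::lock_guard<std::mutex> lock(guard);
            // insert(...).second is true only the first time a link is seen.
            if (seen.insert(link).second) to.push_back(link);
          }
        }));
    }
    for (std::future<void>& task : tasks) task.get();  // finish this batch
    from.swap(to);
  }
  return seen;
}
```

The seen set is what guarantees termination on a site whose pages link back to each other. Without it, the same links would be added to the next batch indefinitely.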

Adding Tasks to a Pending Wait Context

In most cases you invoke the wait method of a task group only after all of the tasks have been created. In some cases, it is useful to create new tasks after the wait method has been invoked but before the previously created tasks have finished. A typical example arises when you are traversing nodes of a graph.

Here is an example of a function that uses parallel tasks of a single task group to traverse a tree.

template<typename T, typename Func>
void ParallelSubtreeHandler(task_group& tg, 
                            shared_ptr<TreeNode<T>> node, 
                            Func action) 
{
  while (nullptr != node)
  {
    // Start up processing the left subtree in a new task
    if (nullptr != node->Left())
    {
      tg.run([&tg, node, action]() { 
        ParallelSubtreeHandler(tg, node->Left(), action); 
      });
    }

    // Process this node
    tg.run([node, action]() { action(node->Data()); });

    // Walk down the right side of the tree
    node = node->Right();
  }
}

The ParallelSubtreeHandler function is called from the top-level function that is shown below.

template<typename T, typename Func>
void ParallelTreeUnwinding(shared_ptr<TreeNode<T>> node, 
                           Func action)
{
  if (nullptr == node)
    return;

  task_group tg; 

  ParallelSubtreeHandler(tg, node, action);

  tg.wait();
}

The ParallelTreeUnwinding function creates a single task group that is used by the code that handles the subtrees. Here is a code example that shows how the function is called. The lambda expression simply records all the nodes that are visited.

  const Tree<int>& tree = ...
  concurrent_vector<int> result;
    
  ParallelTreeUnwinding(tree.Root(),
    [&result](int itemData)
    {
      DoCpuIntensiveOperation(Time);
      result.push_back(itemData);
    });

Dynamically adding new tasks to the task group allows you to use the task_group object to track unprocessed nodes instead of using a concurrent_vector, as was done in the ParallelWhileNotEmpty1 code example. The task_group also makes the code easier to read because it eliminates the need for a separate data structure to hold unprocessed nodes. Rather than completing when all unprocessed nodes have been removed from the concurrent_vector, this example completes when the task_group contains no more incomplete tasks.
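To make the idea of adding tasks during a pending wait concrete, the following sketch implements a very small stand-in for a task group in standard C++. It is not how task_group is implemented and it is not suitable for production use, because it creates one thread per task. TinyTaskGroup, ValueNode, and the other names are hypothetical and exist only for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for task_group: one thread per task.
class TinyTaskGroup
{
public:
  // Starts a task. May be called while another thread is inside wait().
  template <typename Func>
  void run(Func task)
  {
    std::lock_guard<std::mutex> lock(m_guard);
    m_pending.emplace_back(std::move(task));
  }

  // Returns when every task, including tasks added during the wait, is done.
  // Must be called before the group is destroyed.
  void wait()
  {
    for (;;)
    {
      std::thread next;
      {
        std::lock_guard<std::mutex> lock(m_guard);
        if (m_pending.empty()) return;
        next = std::move(m_pending.back());
        m_pending.pop_back();
      }
      next.join();  // tasks started by this one are already in m_pending
    }
  }

private:
  std::mutex m_guard;
  std::vector<std::thread> m_pending;
};

struct ValueNode
{
  int value;
  std::shared_ptr<ValueNode> left;
  std::shared_ptr<ValueNode> right;
};

// Same shape as ParallelSubtreeHandler: left subtrees go to new tasks,
// and the current task walks down the right side.
void SumSubtree(TinyTaskGroup& tg, std::shared_ptr<ValueNode> node,
                std::atomic<int>& sum)
{
  while (nullptr != node)
  {
    if (nullptr != node->left)
    {
      std::shared_ptr<ValueNode> left = node->left;
      tg.run([&tg, left, &sum] { SumSubtree(tg, left, sum); });
    }
    sum += node->value;
    node = node->right;
  }
}

int SumTree(std::shared_ptr<ValueNode> root)
{
  std::atomic<int> sum(0);
  TinyTaskGroup tg;
  SumSubtree(tg, root, sum);
  tg.wait();
  return sum;
}

// Helper for the sketch: complete tree in which every node has value 1.
std::shared_ptr<ValueNode> MakeValueTree(int levels)
{
  assert(levels >= 0);
  if (levels == 0) return nullptr;
  auto node = std::make_shared<ValueNode>();
  node->value = 1;
  node->left = MakeValueTree(levels - 1);
  node->right = MakeValueTree(levels - 1);
  return node;
}
```

The wait loop finishes only when the pending list is empty after the last join. A task that calls run adds its subtask to the list before the task itself completes, so work that is discovered late is still waited for.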

Exercises

  1. The sample code on CodePlex assigns a particular default value for the threshold segment length. At this point, the QuickSort methods switch to the non-recursive InsertionSort algorithm. Use the command line argument to assign different values for the threshold value, and then observe the execution times for the sequential version to sort different array sizes. What do you expect to see? What's the best value for threshold on your system?
  2. Use the command line argument to vary the array size, and then observe the execution time as a function of array size for the sequential and parallel versions. What do you expect? Can you explain your observations?
  3. Suggest other measures, besides the number of cores, to limit the number of tasks.

Further Reading

Heineman et al. discuss additional variations on QuickSort and other sorting algorithms.

Heineman, George T., Gary Pollice, and Stanley Selkow. Algorithms in a Nutshell. O'Reilly Media, 2008.


Last built: March 9, 2012

© 2014 Microsoft