# 4: Parallel Aggregation

Chapter 2, "Parallel Loops," shows how to use parallel techniques that apply the same independent operation to many input values. However, not all parallel loops have loop bodies that execute independently. For example, a sequential loop that calculates a sum does not have independent steps. All the steps accumulate their results in a single variable that represents the sum calculated up to that point. This accumulated value is an aggregation. If you were to convert the sequential loop to a parallel loop without making any other changes, your code would fail to produce the expected result. Parallel reads and writes of the single variable would corrupt its state.

Nonetheless, there is a way for an aggregation operation to use a parallel loop. This is the Parallel Aggregation pattern.

The Parallel Aggregation pattern lets you use multiple cores to calculate sums and other types of accumulations that are based on associative operators.

Although calculating a sum is an example of aggregation, the pattern is more general than that. It works for any binary operation that is associative. However, some implementations of the Parallel Aggregation pattern also expect the operations to be commutative.

The Parallel Aggregation pattern uses unshared, local variables that are merged at the end of the computation to give the final result. Using unshared, local variables for partial, locally calculated results makes the steps of a loop independent of each other. Parallel aggregation demonstrates the principle that it's usually better to make changes to your algorithm than to add synchronization primitives to an existing algorithm. For more information about the algorithmic aspects of this pattern, see the "Design Notes" section later in this chapter.
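
The two-phase idea can be sketched in portable standard C++, using std::thread in place of the PPL mechanisms shown later in this chapter; the function name, chunking policy, and worker count here are illustrative assumptions, not part of the original sample.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Two-phase aggregation sketch: each worker accumulates into its own
// unshared slot; the partial results are merged sequentially at the end.
long parallel_sum(const std::vector<int>& data, unsigned workers)
{
    if (workers == 0) workers = 1;
    std::vector<long> partial(workers, 0);   // one unshared slot per worker
    std::vector<std::thread> pool;
    std::size_t chunk = (data.size() + workers - 1) / workers;
    for (unsigned w = 0; w < workers; ++w)
    {
        pool.emplace_back([&, w]
        {
            std::size_t begin = w * chunk;
            std::size_t end = std::min(begin + chunk, data.size());
            for (std::size_t i = begin; i < end; ++i)
                partial[w] += data[i];       // no lock needed: slot is private
        });
    }
    for (auto& t : pool) t.join();
    // Phase two: sequential merge of the partial results.
    return std::accumulate(partial.begin(), partial.end(), 0L);
}
```

Because no worker touches another worker's slot, the loop bodies are independent and need no synchronization beyond the final join.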

The Parallel Aggregation pattern is also known as the Parallel Reduction pattern because it combines multiple inputs into a single output.

# The Basics

The most familiar application of aggregation is calculating a sum. Here's a sequential version.

```cpp
vector<int> sequence = ...
int count = 0;
for (size_t i = 0; i < sequence.size(); i++)
    count += IsPrime(sequence[i]) ? 1 : 0;
return count;
```

This is a typical sequential for loop. In this example and the ones that follow, IsPrime is a user-provided function that determines if its argument is a prime number. The result is a count of how many prime numbers are contained in the input sequence. (Of course, you could also have used the Standard Template Library (STL) count_if operation in this particular example.)
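
Because IsPrime is user-provided, the chapter never shows it; a minimal trial-division version, together with the count_if formulation the text mentions, might look like this (the names here are illustrative assumptions):

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// A minimal trial-division IsPrime, standing in for the user-provided
// function the chapter's examples assume.
bool IsPrime(int n)
{
    if (n < 2) return false;
    for (int d = 2; d * d <= n; ++d)
        if (n % d == 0) return false;
    return true;
}

// The same count expressed with the STL count_if algorithm mentioned
// in the text.
int CountPrimesSequential(const std::vector<int>& sequence)
{
    return static_cast<int>(
        std::count_if(sequence.begin(), sequence.end(), IsPrime));
}
```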

How can sequential accumulation be adapted for parallel processing? As was explained in Chapter 2, simply swapping the for operator with parallel_for won’t work because the count variable is shared by all iterations. You might also be tempted to wrap a critical section around the operation that increments the count variable. The critical section would prevent parallel iterations from performing conflicting reads and writes, but the performance of that approach would be much, much worse than the sequential version you are trying to optimize. The cost of synchronization would be prohibitive. (In fact, programmers often underestimate the performance cost of synchronization operations.)

Typical of many situations in parallel programming, the answer is not to apply synchronization operations to the existing sequential algorithm in order to make it "safe" for parallel execution. Instead, redesign the algorithm to use a two-phase approach. First, subdivide the problem into as many tasks as you have cores and calculate partial results locally on a per-core basis. Then, once all of the per-task partial results are ready, sequentially merge them into one final accumulated value. The process of combining partial results is illustrated by the cartoon on this chapter's facing page.

PPL provides a special data structure that makes it easy to create per-task local results in parallel and merge them as a final sequential step. This data structure is the combinable class. The following code examples show how to use the combinable class to implement the Parallel Aggregation pattern.


```cpp
vector<int> sequence = ...

combinable<int> count([]() { return 0; });

parallel_for_each(sequence.cbegin(), sequence.cend(),
    [&count](int i)
    {
        count.local() += IsPrime(i) ? 1 : 0;
    });
return count.combine(plus<int>());
```

The count variable is a combinable object that provides thread-private values. The constructor of the combinable class takes a function argument that computes the initial value of each thread-local variable.

Note:
The combinable class assumes that the operation provided as an argument to the combine method is commutative.

Next, a parallel_for_each loop creates multiple tasks (typically, equal to some multiple of the number of cores on your computer) and runs the loop body function in parallel. The tasks collect the partial, per-core results into per-task variables that are provided by the combinable object’s local method.

The number of tasks depends on the level of concurrency available in the current context. See Appendix A, "The Task Scheduler and Resource Manager" for more information about runtime policy settings for concurrency. Also, the parallel_for_each loop uses dynamic range stealing to equalize the amount of work among its internal worker threads.

After the parallel_for_each loop completes, the combinable object’s combine method applies a user-specified binary operation to aggregate the values of each of the per-task partial results. In this example the combination function is integer addition. The return value of the combine method is the final aggregated value.
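
To see what combinable provides, here is a deliberately simplified, portable analogue (an illustration only, not PPL's implementation; the real class avoids the per-access lock that this sketch takes):

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <thread>

template <typename T>
class CombinableSketch
{
    std::map<std::thread::id, T> slots_;
    std::mutex m_;
public:
    // Returns this thread's private slot, creating it (value-initialized)
    // on first access. References into a std::map stay valid across inserts.
    T& local()
    {
        std::lock_guard<std::mutex> g(m_);
        return slots_[std::this_thread::get_id()];
    }
    // Sequentially folds the per-thread slots; call after all workers join.
    template <typename BinOp>
    T combine(BinOp op)
    {
        T result = T();
        for (auto& kv : slots_) result = op(result, kv.second);
        return result;
    }
};
```

The essential property is the same as combinable's: local() never hands two threads the same slot, so the loop body needs no locking around the accumulation itself.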

The Concurrency Runtime sample pack provides several STL-style parallel aggregation functions. The easiest way to understand how these functions work is to compare them with their corresponding sequential operations in STL.

STL provides a very simple way to express sequential aggregation with iterators. Here is an example.

```cpp
vector<int> sequence = ...
return accumulate(sequence.cbegin(), sequence.cend(), 0,
                  IncrementIfPrime());
```

The STL accumulate function applies a binary function to an internal accumulation variable and to each element of a sequence, updating the accumulation variable with each iteration. The first and second arguments to the accumulate function give the iteration bounds. The third argument is the initial value of the accumulation variable, and the fourth argument is a binary reduction function that will be successively applied to the accumulation variable and to each iterated value. The job of the reduction function is to combine two input values. Here is the implementation of the reduction function, IncrementIfPrime.
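
The left-to-right folding behavior of accumulate can be made visible with a reduction function whose result depends on the order of application (an illustrative demo, not part of the sample):

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// accumulate folds left to right: op(op(op(init, x0), x1), x2), ...
// This reduction concatenates decimal digits, which makes the order visible.
int fold_demo(const std::vector<int>& v)
{
    return std::accumulate(v.begin(), v.end(), 0,
        [](int acc, int x) { return acc * 10 + x; });
}
```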

```cpp
struct IncrementIfPrime
{
    int operator()(int total, int element) const
    {
        return total + (IsPrime(element) ? 1 : 0);
    }
};
```
Note:
If the conventions of STL algorithms are unfamiliar to you, you should brush up on them before reading this chapter. See the "Further Reading" section for more information.

The STL accumulate function is a sequential operation whose performance is comparable to the sequential for loop shown in the earlier example. To convert an STL accumulate expression into a parallel aggregation you can use the parallel_reduce function of the Concurrency Runtime sample pack. The following code gives an example.

```cpp
using namespace ::Concurrency::samples;
vector<int> sequence = ...
return parallel_reduce(sequence.cbegin(), sequence.cend(), 0,
                       CountPrimes(), plus<int>());
```

The parallel_reduce function takes five arguments. The first two arguments give the iteration bounds. The third argument gives the value of the reduction’s identity element. If the reduction is based on addition, this element will be 0. For multiplicative reduction, the identity element is 1. For reductions such as aggregate set union, the identity element is the empty set.

The fourth argument is a function object that is applied to a subrange of the input to produce a local partial aggregation. This example uses a functor created by instantiating the CountPrimes class. The return value of the function object is the local partial result from the first phase of the Parallel Aggregation pattern.

The fifth argument is a reduction function that will combine the partial results that have been calculated for each of the subranges.

Here is the implementation of the CountPrimes class-type functor.

```cpp
struct CountPrimes
{
    int operator()(vector<int>::const_iterator begin,
                   vector<int>::const_iterator end,
                   int right) const
    {
        return right + accumulate(begin, end, 0, IncrementIfPrime());
    }
};
```

The parallel_reduce function divides the input into ranges. There will be enough ranges to compensate for the effects of uneven workloads, but not so many that the overhead of managing them dominates the computation. The runtime determines how many ranges to create.

In this example, the CountPrimes function object will be invoked one time for each of the ranges. It executes a sequential accumulation operation on the subrange and collects the result.
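
The contract of the five arguments can be sketched with a sequential stand-in that subdivides the input, applies the range function to each subrange, and merges the partial results (the name, chunking policy, and fixed range count are assumptions; the real parallel_reduce processes the subranges in parallel and chooses the range count itself):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <iterator>
#include <numeric>
#include <vector>

// Sequential sketch of the parallel_reduce contract.
template <typename It, typename T, typename RangeFn, typename Combine>
T reduce_sketch(It first, It last, T identity,
                RangeFn range_fn, Combine combine, std::size_t ranges)
{
    if (ranges == 0) ranges = 1;
    std::size_t n = static_cast<std::size_t>(std::distance(first, last));
    std::size_t chunk = (n + ranges - 1) / ranges;
    T result = identity;
    for (std::size_t r = 0; r < ranges; ++r)
    {
        It b = first + static_cast<std::ptrdiff_t>(std::min(r * chunk, n));
        It e = first + static_cast<std::ptrdiff_t>(
                           std::min(r * chunk + chunk, n));
        T partial = range_fn(b, e, identity);   // phase one: local result
        result = combine(result, partial);      // phase two: merge
    }
    return result;
}
```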

The parallel_reduce function is usually the recommended approach whenever you need to apply the Parallel Aggregation pattern within applications that use PPL. Its declarative nature makes it less prone to error than other approaches, and its performance on multicore computers is competitive with them. Implementing parallel aggregation with parallel_reduce doesn't require adding locks in your code. Instead, all the synchronization occurs internally. Of course, if parallel_reduce doesn't meet your needs or if you prefer a less declarative style of coding, you can also use the combinable class with parallel_for or parallel_for_each to implement the parallel aggregation.

You should be aware that parallel_for and parallel_for_each add overhead due to their support of features such as cancellation and dynamic range stealing. Also, a call to the combinable::local() method inside of a parallel loop adds the cost of a hash table lookup to each iteration of the loop. In general, use parallel aggregation to increase performance when iterations perform complex computations.

# An Example

Aggregation doesn't only apply to numeric values. It arises in many other application contexts. The following example shows how to use a variation of parallel aggregation known as map/reduce to aggregate nonscalar data types.

The example is of a social network service, where subscribers can designate other subscribers as friends. The site recommends new friends to each subscriber by identifying other subscribers who are friends of friends. To limit the number of recommendations, the service only recommends the candidates who have the largest number of mutual friends. Candidates can be identified in independent parallel operations, and then candidates are ranked and selected in an aggregation operation.

Here's how the data structures and algorithms that are used by the recommendation service work. Subscribers are identified by integer ID numbers. A subscriber's friends are represented by the collection of their IDs. The collection is a set because each element (a friend's ID number) occurs only once and the order of the elements doesn't matter. For example, the subscriber whose ID is 0 has two friends whose IDs are 1 and 2. This can be written as:

0 -> { 1, 2 }

The social network repository stores an entry like this for every subscriber. In order to recommend friends to a subscriber, the recommendation service must consider a subscriber's entry, as well as the entries for all of that subscriber's friends. For example, to recommend friends for subscriber 0, the pertinent entries in the repository are:

0 -> { 1, 2 }

1 -> { 0, 2, 3 }

2 -> { 0, 1, 3, 4 }

You can see that the service should recommend subscribers 3 and 4 to subscriber 0 because they appear among the friends of subscribers 1 and 2, who are already friends of 0. In addition, the recommendation service should rank subscriber 3 higher than 4, because 3 is a friend of both of 0's friends, while 4 is a friend of only one of them. You can write the results like this:

{ 3(2), 4(1) }

This means that subscriber 3 shares two mutual friends with subscriber 0, and subscriber 4 shares one. This is an example of a type of collection known as a multiset. In a multiset, each element (3 and 4 in this example) is associated with a multiplicity, which is the number of times it occurs in the collection (2 and 1, respectively). In other words, a multiset stores each distinct element just once, paired with a multiplicity that can represent any number of occurrences. The order of elements in a multiset doesn't matter.
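
A minimal version of such a multiset and its union, using a std::map from element to multiplicity, can be sketched as follows (an illustration only; the chapter's FriendMultiSet has a different implementation):

```cpp
#include <cassert>
#include <map>

typedef std::map<int, int> Multiset;  // element -> multiplicity

// Multiset union: collect the elements and add their multiplicities.
Multiset multiset_union(const Multiset& a, const Multiset& b)
{
    Multiset result = a;
    for (const auto& kv : b)
        result[kv.first] += kv.second;
    return result;
}
```

Applied to the example above, the union of { 3(1) } and { 3(1), 4(1) } yields { 3(2), 4(1) }.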

Note:
The example in this section uses a multiset implementation that differs from STL.

The recommendation service uses map/reduce and has three phases.

In the first phase, which is the map phase, the service creates collections of friend candidates. The collections of potential friends are calculated by iterating through the subscriber's friends and searching their friends for people who are not currently friends of the subscriber.

In the second phase, which is the reduce phase, the service aggregates the sets of potential friends into a multiset where each candidate's ID is associated with its multiplicity (the number of mutual friends). For each set of possible friends, the reduce phase merges the sets of potential friends and maintains a count of the occurrences. It uses a hash_map<FriendID, int> instance for this purpose.

The final phase performs postprocessing. The service ranks candidates by sorting them according to their multiplicity and selects only the candidates with the largest multiplicities.

An important feature of map/reduce is that the result of the map stage is a collection of items that is compatible with the reduce stage. The reduce stage uses multisets; therefore, the map stage does not produce a plain list of candidate IDs. Instead, it produces a vector of multisets, where every candidate in each multiset has a multiplicity of one. In this example, the output of the map stage is a collection of two multisets, and the candidates they contain are subscribers 3 and 4.

{ 3(1) }, { 3(1) , 4(1) }

Here, the first multiset contains the candidates found among the friends of subscriber 1, and the second multiset contains the candidates found among the friends of subscriber 2.

Another important feature of map/reduce is that the aggregation in the reduce phase is performed by applying a binary operation to pairs of elements from the collection that is produced by the map phase. In this example, the operation is a multiset union, which combines two multisets by collecting the elements and adding their multiplicities. The result of applying the multiset union operation to the two multisets in the preceding collection is:

{ 3(2), 4(1) }

Now that there is only one multiset, the reduce phase is complete. By repeatedly applying the multiset union operation, the reduce phase can aggregate any collection of multisets, no matter how large, into one multiset.

This is the code that defines the main data types that are used in the sample.

```cpp
typedef int SubscriberID;
typedef int FriendID;
typedef set<FriendID> FriendsSet;
typedef shared_ptr<FriendsSet> FriendsSetPtr;
typedef hash_map<SubscriberID, FriendsSetPtr> SubscriberMap;

class FriendMultiSet : public hash_map<FriendID, int>
{
    // Multiset of potential friends.
    // ...
};

typedef shared_ptr<FriendMultiSet> FriendMultiSetPtr;
```

The FriendsSet type is implemented by an STL set. The FriendMultiSet type has a custom implementation. In addition to these data types, the sample also uses an ordered list of potential friends that is sorted by multiplicity in decreasing order. Here is the code.

```cpp
struct LessMultisetItem
{
    bool operator()(const pair<FriendID, int> value1,
                    const pair<FriendID, int> value2) const
    {
        return (value1.second == value2.second) ?
               (value1.first > value2.first) :
               (value1.second > value2.second);
    }
};

typedef set<pair<FriendID, int>, LessMultisetItem>
    FriendOrderedMultiSet;
```

Note that STL also provides a std::multiset type, but it stores sorted collections that can contain multiple elements with equal values, rather than the key/multiplicity pairs used here.
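
For comparison, std::multiset stores each duplicate as a separate element and recovers a multiplicity by counting, rather than storing it explicitly as the sample's FriendMultiSet does (illustrative sketch):

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// std::multiset keeps repeated values; count() recovers the multiplicity
// that the chapter's hash_map-based FriendMultiSet stores explicitly.
std::size_t multiplicity(const std::multiset<int>& m, int value)
{
    return m.count(value);
}
```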

Finally, here is the code for the sequential version of the algorithm that suggests potential friends.

```cpp
FriendOrderedMultiSet
PotentialFriendsSequentialTransform(
    const SubscriberMap& subscribers,
    SubscriberID id,
    int maxCandidates)
{
    // Map:

    FriendsSetPtr friends = subscribers.find(id)->second;
    vector<FriendMultiSetPtr> friendsOfFriends(friends->size());

    transform(friends->cbegin(), friends->cend(),
        friendsOfFriends.begin(),
        [&subscribers, &friends, &id](int friendID)->FriendMultiSetPtr
        {
            FriendsSetPtr theirFriends =
                subscribers.find(friendID)->second;
            FriendsSetPtr friendsOfFriend = make_shared<FriendsSet>();

            set_difference(theirFriends->cbegin(),
                theirFriends->cend(),
                friends->cbegin(), friends->cend(),
                inserter(*friendsOfFriend, friendsOfFriend->end()));
            friendsOfFriend->erase(id);

            return FriendMultiSetPtr(
                new FriendMultiSet(friendsOfFriend));
        });

    // Reduce:

    // The reduction does not use std::accumulate because that
    // would copy the intermediate FriendMultiSet on every step.
    FriendMultiSet candidates;
    for_each(friendsOfFriends.cbegin(), friendsOfFriends.cend(),
        [&candidates](FriendMultiSetPtr set)
        {
            candidates.Union(set);
        });

    // Postprocess:

    return candidates.MostNumerous(maxCandidates);
}
```

In the map phase, this code loops sequentially over the subscriber's friends and builds a collection of multisets of candidates. In the reduce phase, the code loops sequentially over those multisets and aggregates them with the multiset union operation, which is implemented by the Union method. If this code executes with the few subscribers in the example, the id argument is 0 and subscribers.find(id)->second returns { 1, 2 }. When the map phase completes, the friendsOfFriends variable contains { 3(1) }, { 3(1), 4(1) }. When the reduce phase completes, candidates contains { 3(2), 4(1) }.

Multiset union is associative; if you aggregate several multisets into one by successively forming unions in a pair-wise manner, the final result does not depend on the order of the union operations. Multiset union is also commutative; the result does not depend on the order of its arguments. If the aggregation function is not associative, it can't be done in parallel without potentially getting different results. If it's not commutative, the potential for parallelism is greatly reduced.

Note:
Strictly speaking, floating-point addition is commutative but not associative. From run to run, parallel computations over floats or doubles may produce slightly different results, because regrouping the operations changes how rounding errors accumulate.
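
A small demonstration of the non-associativity (the constants are chosen so that a single 1.0 falls below the rounding granularity at 1e16, while 2.0 does not):

```cpp
#include <cassert>

// Floating-point addition is commutative but not associative: regrouping
// changes which low-order bits are lost to rounding.
bool grouping_matters()
{
    double big = 1e16;
    double left = (big + 1.0) + 1.0;   // each 1.0 is absorbed: 1e16 + 1 == 1e16
    double right = big + (1.0 + 1.0);  // 2.0 is large enough to register
    return left != right;
}
```

This is why a parallel reduction over doubles, which regroups the additions, can give slightly different answers from run to run.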

Here's how to use the parallel_transform and parallel_reduce functions of the Concurrency Runtime sample pack to apply map/reduce to the social networking example.

```cpp
FriendOrderedMultiSet
PotentialFriendsParallel(const SubscriberMap& subscribers,
                         SubscriberID id,
                         int maxCandidates)
{
    // Map:

    FriendsSetPtr friends = subscribers.find(id)->second;
    vector<FriendMultiSetPtr> friendsOfFriends(friends->size());

    parallel_transform(friends->cbegin(), friends->cend(),
        friendsOfFriends.begin(),
        [&subscribers, &friends, &id](int friendID)->FriendMultiSetPtr
        {
            FriendsSetPtr theirFriends =
                subscribers.find(friendID)->second;
            FriendsSetPtr friendsOfFriend = make_shared<FriendsSet>();

            set_difference(
                theirFriends->cbegin(), theirFriends->cend(),
                friends->cbegin(), friends->cend(),
                inserter(*friendsOfFriend, friendsOfFriend->end()));
            friendsOfFriend->erase(id);

            return FriendMultiSetPtr(
                new FriendMultiSet(friendsOfFriend));
        });

    // Reduce:

    FriendMultiSet candidates;
    candidates =
        parallel_reduce(friendsOfFriends.cbegin(),
            friendsOfFriends.cend(),
            FriendMultiSet(),
            [](vector<FriendMultiSetPtr>::const_iterator cbegin,
               vector<FriendMultiSetPtr>::const_iterator cend,
               const FriendMultiSet& right)
            {
                return right.Union(cbegin, cend);
            },
            [](const FriendMultiSet& left, const FriendMultiSet& right)
            {
                return left.Union(right);
            });

    // Postprocess:

    return candidates.MostNumerous(maxCandidates);
}
```

Recall that in map/reduce, independent parallel operations (the map phase) are followed by aggregation (the reduce phase). In the map phase, the parallel operations iterate over all the friends of subscriber 0. The map phase is performed by the parallel_transform function, which finds all the friends of each friend of the subscriber. The set_difference function prevents redundant recommendations by removing the subscriber. The output of the map phase is a vector of multisets for each of the subscriber’s friends.

The reduce phase is performed by the call to the parallel_reduce function, which counts the duplicate candidate IDs. Note that the FriendMultiSet() expression constructs an empty multiset, which serves as the identity element. The Union method combines two multisets.

The return statement performs the final postprocessing step that selects the candidates with the highest multiplicities.

# Variations

This section contains some common variations of the Parallel Aggregation pattern.

## Considerations for Small Loop Bodies

If the body of your parallel aggregation loop performs very little work, you may find that performing parallel aggregation takes longer than sequential aggregation. When you have small loop bodies, you can apply the techniques that were described in the "Special Handling of Small Loop Bodies" section of Chapter 2, "Parallel Loops." These techniques allow you to use sequential aggregation within subranges.

## Other Uses for Combinable Objects

The combinable class is most commonly used to implement the Parallel Aggregation pattern, but you do not necessarily need to use combinable objects for aggregation. You can also use combinable instances to create thread-local variables when a thread starts.

# Design Notes

If you compare the sequential and parallel versions of the Parallel Aggregation pattern, you see that the design of the parallel version includes an additional step in the algorithm that merges partial results. Figure 1 illustrates the two phases of parallel aggregation.

Figure 1

Parallel aggregation

Figure 1 shows that instead of placing the accumulation in a single, shared result, the parallel loop uses unshared local storage for partial results (these are named subtotals in Figure 1). The local method of the combinable class provides access to the unshared storage for each thread. Each worker thread processes a single partition of the input values. The number of partitions depends on the degree of parallelism that's needed to efficiently use the computer's available cores. After all of the partial results have been computed, the combine function of the combinable object merges the local results into the final, global result.

The reason that this approach is fast is that there is very little need for synchronization operations. Calculating the per-task local results uses no shared variables, and therefore requires no locks. The combine operation is a separate sequential step and also does not require locks.

This discussion shows that the Parallel Aggregation pattern is a good example of why changes to your algorithm are often needed when moving from a sequential to a parallel approach.

You can't simply add locks and expect to get good performance. You also need to think about the algorithm.

To make this point clear, here's an example of what parallel aggregation would look like if you simply added locks to the existing sequential algorithm. To do this, you only need to convert the sequential loop to parallel_for_each and protect the shared count with a critical section.

```cpp
// WARNING: BUGGY CODE. Do not copy this example.
// It will run *much slower* than the sequential version.
// It is included to show what *not* to do.

vector<int> sequence = ...
CRITICAL_SECTION cs;
InitializeCriticalSectionAndSpinCount(&cs, 0x80000400);
int count = 0;

// BUG -- Do not use parallel_for_each in this case
parallel_for_each(sequence.cbegin(), sequence.cend(),
    [&count, &cs](int i)
    {
        // BUG -- Do not use locking inside of a parallel aggregation
        EnterCriticalSection(&cs);
        // BUG -- Do not use a shared variable for parallel aggregation
        count += IsPrime(i) ? 1 : 0;
        LeaveCriticalSection(&cs);
    });

return count;
```

Without the calls that enter and exit the critical section, this code fails to calculate the correct count on a multicore computer. Adding the synchronization code makes the example correct with respect to its sequential semantics: if you run it, it produces the expected count. However, it fails completely as an optimization. This code is many times slower than the sequential version it attempted to optimize! The reason for the poor performance is the cost of synchronization.

In contrast, the examples of the Parallel Aggregation pattern that you have seen elsewhere in this chapter will run much faster on multicore computers than the sequential equivalent, and their performance also improves in approximate proportion to the number of cores.

It might at first seem counterintuitive that adding additional steps can make an algorithm perform better, but it's true. If you introduce extra work, and that work has the effect of preventing data dependencies between parallel tasks, you often benefit in terms of performance.

# Related Patterns

There's a group of patterns related to summarizing data in a collection. Aggregation (also known as Reduce) is one of them. The others include Scan and Pack. The Scan pattern occurs when each iteration of a loop depends on data computed in the previous iteration. The Pack pattern uses a parallel loop to select elements to retain or discard. The result of a pack operation is a subset of the original input. These patterns can be combined, as in the Fold and Scan pattern. For more information about these related patterns, see the section, "Further Reading," at the end of this chapter.
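
As a rough illustration (sequential STL stand-ins, not parallel implementations), Pack corresponds to copy_if, and the inclusive scan of addition corresponds to partial_sum; the predicate and names here are assumptions for the demo:

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <numeric>
#include <vector>

// Pack: keep only the elements that satisfy a predicate; the result is
// a subset of the original input.
std::vector<int> pack_evens(const std::vector<int>& v)
{
    std::vector<int> out;
    std::copy_if(v.begin(), v.end(), std::back_inserter(out),
                 [](int x) { return x % 2 == 0; });
    return out;
}

// Scan: each output element depends on all earlier inputs (running total).
std::vector<int> scan_sum(const std::vector<int>& v)
{
    std::vector<int> out(v.size());
    std::partial_sum(v.begin(), v.end(), out.begin());
    return out;
}
```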

# Exercises

1. Consider the small social network example (with subscribers 0, 1, 2). What constraints exist in the data? How are these constraints observed in the sample code?
2. In the social network example, there's a separate postprocessing step where the multiset of candidates, which is an unordered collection, is transformed into a sequence that is sorted by the number of mutual friends, and then the top N candidates are selected. Could some or all of this postprocessing be incorporated into the reduction step?
3. In the standard reference on map/reduce (see the section, "Further Reading"), the map phase executes a map function that takes an input pair and produces a set of intermediate key/value pairs. All pairs for the same intermediate key are passed to the reduce phase. That reduce phase executes a reduce function that merges all the values for the same intermediate key to a possibly smaller set of values. The signatures of these functions can be expressed as: map (k1,v1) -> list(k2,v2) and reduce (k2,list(v2)) -> list(v2). In the social network example, what are the types of k1, v1, k2, and v2? What are the map and reduce functions?

# Further Reading

Musser et al. explain the Standard Template Library (STL). A thorough treatment of synchronization techniques appears in the book by Duffy. The related patterns of Stencil, Scan, and Pack are discussed by McCool. The standard reference on map/reduce is the paper by Dean and Ghemawat. Other cases of algorithms that use parallel loops with some dependencies between steps are described by Toub. These include fold-and-scan and dynamic programming. Toub's examples are in managed code, but the algorithms apply equally to native code. The Wikipedia article describes the mathematical multisets that were used in the code examples in this chapter.

Dean, J., and S. Ghemawat. "MapReduce: Simplified Data Processing on Large Clusters." In OSDI '04: Sixth Symposium on Operating System Design and Implementation, 137–150, 2004.

Duffy, J., Concurrent Programming on Windows. Addison-Wesley, 2008.

McCool, M., "Structured Patterns: An Overview." December 2009.
http://www.ddj.com/go-parallel/article/showArticle.jhtml?articleID=223101515.

Musser, D. R., G. J. Derge, and A. Saini. STL Tutorial and Reference Guide: C++ Programming with the Standard Template Library, 3rd edition. Addison-Wesley Professional, December 2009.

Toub, S., "Patterns of Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4." 2009.