Using Distributed Grouped Aggregation
Grouped Aggregation is a pattern that occurs when the output of a GroupBy operation is immediately followed by an Aggregate operation on each group. This pattern frequently occurs in distributed computing. For example, if you want to use a large census data set to find the average household income by postal code, you need grouped aggregation. The operation is sometimes called the GroupBy-Aggregate pattern or the Group-and-Aggregate pattern.
LINQ to HPC performs sophisticated optimization for distributed grouped aggregation even though the programming interface is high-level. LINQ to HPC automatically analyzes expression trees to create optimized query plans that decompose grouped aggregation operations into efficient, distributed computations.
Implementation approaches for grouped aggregation
There are several ways in which grouped aggregation can be accomplished. The following code uses hash partitioning to implement it.
string wordData = ...
var query =
    context.FromDsc<string>(wordData)
           .HashPartition(x => x)
           .Apply(localRecords => LocalGroupBy(localRecords))
           .Select(g => new KeyValuePair<string, long>(g.Key, g.LongCount()));
This code demonstrates how a word count problem might be solved by using distributed computing. An input file set with multiple DSC files contains a list of words, and the output is a file set that contains (word, count) pairs, with each word appearing just once in the output.
The first step uses a HashPartition operation to redistribute the data so that identical words appear on the same machine. The second step performs a local grouping operation on each computer. The results contain elements such as "dog: {dog, dog}", which represents a group whose key is "dog" and whose elements are {dog, dog}. Finally, the code aggregates the elements of each group by counting them. The final result is a sequence of (word, count) pairs.
Here is the definition of the LocalGroupBy operator.
[DistributiveOverConcat]
public static IEnumerable<IGrouping<string, string>>
    LocalGroupBy(IEnumerable<string> localRecords)
{
    return localRecords.GroupBy(x => x); // use LINQ-to-Objects query
}
Refer to the Unary Apply section for information about the [DistributiveOverConcat] attribute.
The following figure illustrates how this query executes when the data set includes the words "cat", "dog", "fish", and "goat".
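The first query plan can be modeled outside the cluster. The following sketch is a hypothetical Python model of its semantics (LINQ to HPC itself is programmed in C#); the data, the partition count, and the helper names are illustrative, not part of the LINQ to HPC API.

```python
from collections import defaultdict

# Model of the first plan: hash-partition the words so that identical words
# land in the same partition, group each partition locally, then count the
# elements of each group.
def hash_partition(records, n_partitions):
    partitions = [[] for _ in range(n_partitions)]
    for r in records:
        partitions[hash(r) % n_partitions].append(r)
    return partitions

def local_group_by(partition):
    groups = defaultdict(list)
    for r in partition:
        groups[r].append(r)   # e.g. "dog": ["dog", "dog"]
    return groups

words = ["cat", "dog", "dog", "fish", "goat", "cat", "dog"]
counts = {}
for partition in hash_partition(words, 3):
    for key, group in local_group_by(partition).items():
        counts[key] = len(group)   # each key occurs in exactly one partition
```

Because every occurrence of a given word hashes to the same partition, counting within a partition yields the global count for that word.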
Although this implementation produces the correct result, it can be expensive if the input file set is very large. It is quicker to decompose the counting operation into two phases, and to do the counting early, before redistributing any of the data. Here is the code for the improved version.
string wordData = ...
var query = context.FromDsc<string>(wordData)
                   .Apply(localRecords => LocalPartialReduce(localRecords))
                   .HashPartition(x => x.Key)
                   .Apply(localResults => LocalCombine(localResults));
The second word count implementation adds a step that performs partial reduction by using distributed computing in the first phase of the algorithm, before any data is copied across the cluster. Here is the definition of the LocalPartialReduce operator that initially reduces the input data.
[DistributiveOverConcat]
public static IEnumerable<KeyValuePair<string, long>>
    LocalPartialReduce(IEnumerable<string> localRecords)
{
    return localRecords.GroupBy(x => x)
                       .Select(g => new KeyValuePair<string, long>(
                                        g.Key, g.LongCount()));
}
The query in the LocalPartialReduce method uses LINQ-to-Objects to perform local partial reduction on each of the compute nodes.
Here is the definition of the LocalCombine operator that combines the results of the initial reduction phase.
[DistributiveOverConcat]
public static IEnumerable<KeyValuePair<string, long>>
    LocalCombine(IEnumerable<KeyValuePair<string, long>> localResults)
{
    return localResults.GroupBy(x => x.Key)
                       .Select(g => new KeyValuePair<string, long>(
                                        g.Key, g.Sum(x => x.Value)));
}
The query in the LocalCombine method uses LINQ-to-Objects to combine the partial results that were calculated earlier. The LocalCombine method is executed in a distributed manner on the compute nodes.
The following figure illustrates how the revised query works.
In the improved version, the initial count operation takes place in a distributed manner and does not require values to be copied with an expensive hash partitioning operation. After the group-and-aggregate operation is completed for each DSC file of the input file set, the partial results are distributed across the cluster, based on their key.
Next, a second GroupBy operation collects the partial results for each word key. A local aggregation operation combines the partial results for each word by adding the previously calculated counts.
The second version of the word-count example performs significantly better than the first on large data sets, because far fewer records are copied across the cluster than in the version that performs all of the aggregation after the hash partitioning.
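The two-phase plan can also be modeled locally. The following is a hypothetical Python sketch of its semantics (the data, the partitioning, and the helper names are illustrative): each input partition is reduced to small (word, count) pairs first, and only those pairs are redistributed and combined.

```python
from collections import Counter, defaultdict

# Phase 1: count locally inside each input partition (LocalPartialReduce).
def local_partial_reduce(partition):
    return list(Counter(partition).items())   # (word, partial count) pairs

# Phase 2: sum the partial counts for each key (LocalCombine).
def local_combine(pairs):
    totals = defaultdict(int)
    for word, partial in pairs:
        totals[word] += partial
    return dict(totals)

input_partitions = [["cat", "dog", "dog"], ["fish", "cat"], ["dog", "goat"]]
partials = [p for part in input_partitions for p in local_partial_reduce(part)]

# Redistribute only the small pairs by key (two target partitions, for illustration).
buckets = defaultdict(list)
for pair in partials:
    buckets[hash(pair[0]) % 2].append(pair)

counts = {}
for bucket in buckets.values():
    counts.update(local_combine(bucket))
```

Only the compact partial pairs cross partition boundaries, which is why this plan moves many fewer records than hash-partitioning the raw words.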
LINQ to HPC can automatically create a query plan that executes the optimized algorithm shown in the previous figure. In most cases, you do not need to do any additional coding. For example, here is a LINQ to HPC query that performs the same word-count processing as the previous two examples.
var query =
    context.FromDsc<string>("MyWordList")
           .GroupBy(x => x,
                    (key, seq) => new KeyValuePair<string, long>(key, seq.LongCount()));
The query plan that is generated for this query includes stages for the partial aggregation and combine operations.
You can also write the same query by using the following syntax.
var query =
    context.FromDsc<string>("MyWordList")
           .GroupBy(x => x)
           .Select(g => new KeyValuePair<string, long>(g.Key, g.LongCount()));
LINQ to HPC recognizes that this query is an example of the group-and-aggregate pattern and automatically creates the optimized query plan that uses the partial reduction optimization.
To understand how this works, recall that LINQ queries are syntax trees. The lambda expressions in a query are passed by using the Microsoft® .NET Expression data type, which allows them to be traversed and analyzed. For more information, see Expression Trees on MSDN. LINQ to HPC includes sophisticated support for term rewriting, which is a mathematical technique that is used by compilers and other analysis software to transform abstract syntax trees.
LINQ to HPC can recognize that an expression such as new KeyValuePair<string, int>(g.Key, g.Count()) can be decomposed into stages for local partial aggregation and the combination of partial results. LINQ to HPC knows that the Count operator can be decomposed into a local count operator whose intermediate results can be combined by using the Sum operator. Further, it knows that a constructor can be called during the final step, after the intermediate results have been combined.
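The rewrite described above can be shown in miniature. This is a hypothetical Python illustration (the group data is invented) of how Count over a distributed group decomposes into local counts combined by Sum, with the constructor call deferred to the final step.

```python
# One logical group ("dog") whose elements are split across two partitions.
group_partitions = [["dog", "dog"], ["dog"]]

partial_counts = [len(p) for p in group_partitions]   # local Count per partition
total = sum(partial_counts)                           # combine partials with Sum
result = ("dog", total)                               # final "constructor" call
```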
The detailed query plan involves tracking all of the partial results as a tuple, and then recombining them when they are needed. LINQ to HPC supports many kinds of aggregation expressions. It can handle the following nested combinations:
- Unary and binary operators
- Ternary conditional expressions
- Method calls that have parameters
- Method calls that have an instance-object that results from a method call
- New object expressions
- New array expressions
- List initialization expressions such as new List() { a, b, c, ... }
- Property initialization expressions such as new obj() { A = ... }
LINQ to HPC considers the following LINQ operators to be decomposable for distributed grouped aggregation. They may be used in any aggregation expression, as defined above.
- Count, LongCount
- First, Min, Max, Sum, Average
- Any, All, Contains
- Distinct
- Aggregate
In general, the following two forms can be used to express the group-and-aggregate pattern in a way that LINQ to HPC can optimize.
- GroupBy(key-selector-expression, (key, seq) => aggregation-expression)
- GroupBy(key-selector-expression).Select(g => aggregation-expression)
The aggregation expression can be any expression that includes one or more decomposable aggregation functions, and that adheres to the context conditions for aggregation expressions that are described in this topic.
For example, if you want to implement your own average operator for grouped aggregation rather than use LINQ’s built-in Average operator, you could write the following query.
string intData = ...
var query =
    context.FromDsc<long>(intData)
           .GroupBy(x => x % 10,
                    (key, seq) =>
                        new KeyValuePair<long, double>(
                            key,
                            (double)seq.Sum() / (double)seq.Count()));
LINQ to HPC can create local partial results for the Sum and Count components of the query without copying any of the input records, and then perform the final division to calculate the average, when all of the intermediate results have been combined.
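The decomposition of this custom average can be sketched as follows. This is a hypothetical Python model (the input values and partitioning are invented): each partition contributes a (partial sum, partial count) pair per key, and the division happens only after all partials are combined.

```python
from collections import defaultdict

# Values grouped by x % 10, split across two partitions for illustration.
partitions = [[11, 21, 34], [41, 14, 51]]

partials = defaultdict(lambda: [0, 0])   # key -> [partial sum, partial count]
for part in partitions:
    for x in part:
        key = x % 10
        partials[key][0] += x            # local partial Sum
        partials[key][1] += 1            # local partial Count

# Final step, after all partials are combined: perform the division.
averages = {k: s / c for k, (s, c) in partials.items()}
```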
The value of the LINQ to HPC approach is that it greatly simplifies the programming model for grouped aggregation applications, while automatically placing the partial reduction operations into the optimized query plan.
The LINQ to HPC approach is in some ways analogous to the optimizations that a SQL query optimizer might apply to a SQL query plan. However, the expression trees in LINQ to HPC use strongly typed expressions of .NET languages, and the rewrite rules take into consideration the semantics of the host programming language, as well as the language-integrated query language.
Note: For more information about how LINQ to HPC handles grouped aggregation, see Distributed Aggregation for Data-Parallel Computing: Interfaces and Implementations.
Using the [Decomposable] attribute for grouped aggregation
LINQ to HPC can integrate user-defined aggregation expressions into the optimizations it performs for distributed grouped aggregations. If you apply the [Decomposable] custom attribute to your own reduction method, LINQ to HPC can apply the distributed grouped aggregation optimizations to any kind of aggregation that you implement. A user-defined decomposable aggregation operator can be used anywhere that a decomposable built-in aggregation operator can be used.
The [Decomposable] attribute enables you to specify a user-provided class that provides methods that correspond to the three aspects of decomposition. The class you associate with the [Decomposable] attribute must implement either the IDecomposable interface or the IDecomposableRecursive interface.
Here are the definitions of the IDecomposable and IDecomposableRecursive interfaces.
public interface IDecomposable<TSource, TCombine, TReduce>
{
    TCombine Combine(IEnumerable<TSource> source);
    TReduce Reduce(IEnumerable<TCombine> combinedValues);
}

public interface IDecomposableRecursive<TSource, TCombine, TReduce>
{
    TCombine Combine(IEnumerable<TSource> source);
    TCombine RecursiveCombine(IEnumerable<TCombine> combinedValues);
    TReduce Reduce(IEnumerable<TCombine> combinedValues);
}
Implement the IDecomposableRecursive interface if you want LINQ to HPC to perform aggregation by using a multiphase merge stage. In this case, an intermediate combination operator, which is defined by the RecursiveCombine method, may be repeatedly applied in order to reduce a tree of partial results prior to the final reduce phase (the Reduce method). Implement the IDecomposable interface if you want the local partial reductions to be collected and subsequently aggregated in a single, final reduction phase. In other words, you have the choice of a two-step algorithm (by using IDecomposable) or a multiphase algorithm (by using IDecomposableRecursive). The two-step approach is often slower than the multiphase approach.
Here are the methods of the IDecomposable and IDecomposableRecursive interfaces.
- The Combine method performs the local partial reduction. The method is invoked in a distributed manner, one time for each DSC file in the input file set.
- The RecursiveCombine method, which is present only in the IDecomposableRecursive interface, combines the local partial reductions into intermediate aggregated values of the same type. The LINQ to HPC runtime may invoke the RecursiveCombine method multiple times as part of a multiphase merge stage.
- The Reduce method processes the results of a local partial reduction (in the case of the IDecomposable interface) or intermediate aggregated values (in the case of the IDecomposableRecursive interface). The Reduce method returns a single value; however, if you want multiple values, nothing prevents you from instantiating the TReduce type parameter with an enumerable type.
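The contrast between the two-step and multiphase strategies can be modeled with a trivially decomposable aggregation such as a sum. The following hypothetical Python sketch (the data and the tree shape are invented) shows both paths arriving at the same result.

```python
def combine(partition):              # local partial reduction, per partition
    return sum(partition)

def recursive_combine(partials):     # merges partials into one partial of the same type
    return sum(partials)

def final_reduce(partials):          # final reduction
    return sum(partials)

partitions = [[1, 2], [3, 4], [5, 6], [7, 8]]
partials = [combine(p) for p in partitions]

# Two-step (IDecomposable style): collect every partial into one final phase.
two_step = final_reduce(partials)

# Multiphase (IDecomposableRecursive style): repeatedly merge pairs of
# partials in a tree before the final reduction.
level = partials
while len(level) > 2:
    level = [recursive_combine(level[i:i + 2]) for i in range(0, len(level), 2)]
multiphase = final_reduce(level)
```

Both strategies produce the same value; the multiphase tree simply reduces how many partials any single step must gather, which is why it often outperforms the two-step approach at scale.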
When you implement the IDecomposable or IDecomposableRecursive interfaces, you must make sure that your new class has a visibility of public, and that it also supports the default constructor. Your class cannot include data members.
The required signature of the method to which the [Decomposable] attribute is applied is Func<IEnumerable<TSource>, TReduce>. In other words, the types provided as the TSource and TReduce type parameters of the IDecomposable or IDecomposableRecursive interfaces must be the same types as the actual types that instantiate the TSource and TReduce type parameters that are in the signature of the method to which the [Decomposable] attribute is applied.
Note: You must use a method with the signature Func<IEnumerable<TSource>, TReduce> in order for LINQ to HPC to recognize the [Decomposable] attribute. If, for example, you annotate a method whose signature matches Func<IGrouping<TKey, TSource>, TReduce>, the [Decomposable] attribute is silently ignored. This may cause a potentially serious performance bug in your application.
Here is an example of how to use the [Decomposable] attribute.
string intFileSetName = ...
var query =
    context.FromDsc<long>(intFileSetName)
           .GroupBy(x => x % 10,
                    (key, seq) => new KeyValuePair<long, double>(key, MyAverage(seq)));
This query includes a user-provided average operator that is implemented by the MyAverage method. Here is the definition of the MyAverage method.
[Decomposable(typeof(MyAverageDecomposer))]
public static double MyAverage(this IEnumerable<long> recordSequence)
{
    long count = 0, sum = 0;
    foreach (var r in recordSequence)
    {
        sum += r;
        count++;
    }
    if (count == 0) throw new Exception("Can't average empty sequence");
    return (double)sum / (double)count;
}
The MyAverage method calculates the average. However, it is annotated with the [Decomposable] attribute, which tells LINQ to HPC that the method may be decomposed into multiple distributed steps. (Without the [Decomposable] attribute, LINQ to HPC would not be able to execute this operator in a distributed manner.) The [Decomposable] attribute in this example takes the MyAverageDecomposer class as its argument. The application's MyAverageDecomposer class implements the IDecomposableRecursive interface.
The MyAverageDecomposer class specifies the details of the decomposition of the MyAverage method. These details include how to perform the partial reductions over the distributed data set, how to combine the intermediate results as they become available, and how to transform the combined intermediate results into a final aggregated value. The stages are combine, then recursive combine, and finally, reduce.
The class’s Combine method performs the initial partial reduction for each DSC file in the input file set. Here is its definition.
[Serializable]
public struct Partial
{
    public long PartialSum;
    public long PartialCount;
}

public class MyAverageDecomposer : IDecomposableRecursive<long, Partial, double>
{
    public Partial Combine(IEnumerable<long> recordSequence)
    {
        Partial p = new Partial();
        foreach (var r in recordSequence)
        {
            p.PartialSum += r;
            p.PartialCount++;
        }
        return p;
    }
    // ...
}
In the code, the IDecomposableRecursive interface's type parameters long and double correspond, respectively, to the IEnumerable input parameter’s type parameter and to the return value type of the MyAverage method whose decomposition is being defined by the MyAverageDecomposer class.
The IDecomposableRecursive interface’s TCombine type parameter is instantiated in this example with the user-defined Partial struct type. This struct is the return value type of the Combine method. As the Combine method iterates through its input records of a particular DSC file, it calculates a sum and value count and returns these by using a Partial struct.
As the partial reductions become available, the system collects them and applies the RecursiveCombine method in order to further combine them into intermediate results. Here is the definition of the RecursiveCombine method.
public class MyAverageDecomposer : IDecomposableRecursive<long, Partial, double>
{
    // ...
    public Partial RecursiveCombine(IEnumerable<Partial> partialSequence)
    {
        Partial p = new Partial();
        foreach (var r in partialSequence)
        {
            p.PartialSum += r.PartialSum;
            p.PartialCount += r.PartialCount;
        }
        return p;
    }
    // ...
}
The RecursiveCombine method is invoked as many times as needed to combine the intermediate results of an aggregation tree. The RecursiveCombine operation may be a distributed operation.
When all of the intermediate results have been calculated, the system performs the final reduction stage. To do this it calls the Reduce method, which has the following definition.
public class MyAverageDecomposer : IDecomposableRecursive<long, Partial, double>
{
    // ...
    public double Reduce(IEnumerable<Partial> combinedItems)
    {
        Partial final = RecursiveCombine(combinedItems);
        if (final.PartialCount == 0)
            throw new Exception("Can't average empty sequence");
        return (double)final.PartialSum / (double)final.PartialCount;
    }
    // ...
}
In this example, the Reduce method consolidates the input values, and then divides the sum of the values by the count of the values to return the average of all the values.
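The three stages of the decomposer can be exercised end to end. The following is a hypothetical Python analogue of MyAverageDecomposer (the data and the shape of the merge tree are invented), modeling how the runtime drives Combine per DSC file, RecursiveCombine over intermediate partials, and a final Reduce that performs the division.

```python
class MyAverageDecomposer:
    def combine(self, records):                  # per-partition partial reduction
        return (sum(records), len(records))      # (PartialSum, PartialCount)

    def recursive_combine(self, partials):       # merge partials of the same type
        s = sum(p[0] for p in partials)
        c = sum(p[1] for p in partials)
        return (s, c)

    def reduce(self, partials):                  # final reduction: the division
        s, c = self.recursive_combine(partials)
        if c == 0:
            raise ValueError("Can't average empty sequence")
        return s / c

d = MyAverageDecomposer()
partitions = [[2, 4], [6], [8, 10]]              # three "DSC files"
partials = [d.combine(p) for p in partitions]    # one Combine per partition
merged = d.recursive_combine(partials[:2])       # tree-style intermediate merge
average = d.reduce([merged, partials[2]])        # final reduction
```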