Using the [Associative] Attribute for Distributed Aggregation
LINQ to HPC implements distributed aggregation for the following LINQ operators.
- Count, LongCount
- First, Max, Min, Sum, Average
- Any, All, Contains
- Distinct
- Aggregate
The LINQ Aggregate operator accepts a user-provided aggregation method. However, LINQ to HPC can use distributed aggregation only when the underlying reduction is mathematically associative. A binary operator "+" is associative if (a + b) + c = a + (b + c) for all values of a, b, and c.
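To see the definition at work, you can evaluate both groupings for sample values. The following C# sketch uses a hypothetical helper (AssociativityCheck.HoldsFor is not part of LINQ to HPC). It shows that addition passes the check, while subtraction fails it: (1 - 2) - 3 = -4, but 1 - (2 - 3) = 2. Sample values can only disprove associativity. They cannot prove it.

```csharp
using System;

// Hypothetical helper: compares (a op b) op c with a op (b op c) for one triple.
// Passing for a few samples does not prove associativity; failing once disproves it.
public static class AssociativityCheck
{
    public static bool HoldsFor(Func<long, long, long> op, long a, long b, long c)
    {
        return op(op(a, b), c) == op(a, op(b, c));
    }
}
```

For example, HoldsFor((x, y) => x + y, 1, 2, 3) returns true, and HoldsFor((x, y) => x - y, 1, 2, 3) returns false. A subtraction-based aggregation function therefore cannot be marked [Associative].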
You can use the [Associative] custom attribute to tell LINQ to HPC that a user-provided aggregation method is associative. Adding the attribute results in a query plan that uses distributed computation to perform partial aggregation, followed by a stage that merges the partial results into a final value. There are two variations of this pattern. One variation involves type transformation on the intermediate values, and one does not.
Distributed aggregation without type transformation
Here is an example of a typical aggregation query.
string intData = ...
long result = context.FromDsc<long>(intData)
.Aggregate((x, y) => Add(x, y));
The query calls the user-provided Add method as the aggregation operator. Here is the Add method's definition.
[Associative]
public static long Add(long x, long y) { return x + y; }
The [Associative] attribute indicates that it is safe for LINQ to HPC to distribute the aggregation operation over the compute nodes of the cluster, and then combine the local partial results into a final result as a second step.
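The plan that this attribute enables can be imitated in memory with LINQ to Objects. This is a sketch under stated assumptions. SumInTwoStages is a hypothetical helper, and each inner sequence stands in for one DSC file. No LINQ to HPC API is called. Because Add is associative, merging the per-partition results gives the same answer as a single sequential Aggregate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TwoStageSum
{
    // The associative aggregation function from the example above.
    public static long Add(long x, long y) { return x + y; }

    // Stage 1: aggregate each partition locally (in LINQ to HPC, once per DSC file).
    // Stage 2: merge the partial results in order, as a single final step.
    // Assumes every partition is non-empty, because Aggregate without a seed
    // throws InvalidOperationException on an empty sequence.
    public static long SumInTwoStages(IEnumerable<IEnumerable<long>> partitions)
    {
        List<long> partials = partitions
            .Select(p => p.Aggregate((x, y) => Add(x, y)))
            .ToList();
        return partials.Aggregate((x, y) => Add(x, y));
    }
}
```

With the partitions {1, 2, 3}, {4, 5}, and {6}, the local results are 6, 9, and 6. Merging them yields 21, which is the same value that a sequential Aggregate over all six records produces.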
Note
If you forget to apply the [Associative] attribute to your user-defined aggregation function, you may introduce a catastrophic performance bug into your application. Without the [Associative] attribute, the query concatenates all of the records of the input file set into a single enumerable object and then uses a single compute node to perform the aggregation.
Note
LINQ to HPC's Aggregate operator does not assume that the aggregation function you supply is commutative. (Recall that the "+" operator is commutative if a + b = b + a for all values of a and b.) Partial results are combined in left-to-right order, so the order of the input records is preserved. Because LINQ to HPC can use a multistage merge, the aggregation function may be applied to a tree of partial results rather than to a linear sequence of them. Associativity guarantees that both approaches give the same result.
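String concatenation shows why associativity is sufficient and commutativity is not required. Concatenation is associative, but it is not commutative: "a" + "b" differs from "b" + "a". The following sketch is a hypothetical TreeMerge.Reduce helper, not LINQ to HPC's actual merge code. It reduces partial results pairwise, level by level, and always keeps each left operand to the left of its right operand. The result therefore matches a left-to-right fold.

```csharp
using System;
using System.Collections.Generic;

public static class TreeMerge
{
    // Combines neighbors pairwise, level by level, preserving input order.
    // Correct for any associative combine function; commutativity is not needed.
    public static T Reduce<T>(IReadOnlyList<T> partials, Func<T, T, T> combine)
    {
        if (partials.Count == 0)
            throw new ArgumentException("At least one partial result is required.", nameof(partials));

        var level = new List<T>(partials);
        while (level.Count > 1)
        {
            var next = new List<T>((level.Count + 1) / 2);
            for (int i = 0; i + 1 < level.Count; i += 2)
                next.Add(combine(level[i], level[i + 1])); // left stays left
            if (level.Count % 2 == 1)
                next.Add(level[level.Count - 1]); // odd last element carries over unchanged
            level = next;
        }
        return level[0];
    }
}
```

Reducing the partial results "a", "b", "c", "d", "e" goes through the levels ["ab", "cd", "e"] and ["abcd", "e"] before returning "abcde". This is the same string that a left-to-right fold produces.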
Distributed aggregation with type transformation
The LINQ Aggregate operator permits the type returned by the aggregation function to differ from the type of the objects being aggregated. Here is an example.
string intData = ...
string result = context.FromDsc<long>(intData)
.Aggregate("", (str, x) => IntToStringCsvAggregator(str, x));
This query converts a sequence of long integers into a single formatted string. For example, the values 1, 2, 3 become the string "1, 2, 3". The query uses a distributed algorithm to create a partial result for each DSC file in the input file set. Here is the definition of the aggregation function.
[Associative(typeof(RecursiveAccumulator))]
public static string IntToStringCsvAggregator(string current, long next)
{
return CsvCombiner(current, next.ToString());
}
public class RecursiveAccumulator : IAssociativeRecursive<string>
{
public string RecursiveAccumulate(string current, string next)
{
return CsvCombiner(current, next);
}
}
public static string CsvCombiner(string a, string b)
{
if (a == "")
return b;
else if (b == "")
return a;
else
return a + ", " + b;
}
The [Associative] attribute optionally includes a parameter whose value is a user-defined type that implements the IAssociativeRecursive<T> interface.
Here is the definition of the IAssociativeRecursive<T> interface.
public interface IAssociativeRecursive<TAggregate>
{
TAggregate RecursiveAccumulate(TAggregate first, TAggregate second);
}
The IAssociativeRecursive<T> interface includes a RecursiveAccumulate method that merges the intermediate partial results of the local aggregation. The type parameter TAggregate is the intermediate type that is used by the aggregation. The type that you provide as the TAggregate type parameter must be the same type as the return value of the method to which the [Associative] attribute is applied. In this example, it is the string class.
When you implement the IAssociativeRecursive<T> interface, your class must be public and must have a public default (parameterless) constructor. The class cannot include data members.
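The following sketch illustrates these rules. It aggregates long values into an int count of even numbers, so the aggregation function returns int, and the accumulator class implements IAssociativeRecursive<int>. EvenCountAccumulator, EvenCounter, CountEvens, and Run are hypothetical names. The sketch must compile without the LINQ to HPC assembly, so it declares local stand-ins. The interface stand-in matches the definition shown earlier. The attribute stand-in is an assumption, and you should replace it with the library's own type. Run only imitates the two-stage plan with LINQ to Objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the library attribute, so that [Associative(typeof(...))] compiles here.
[AttributeUsage(AttributeTargets.Method)]
public sealed class AssociativeAttribute : Attribute
{
    public AssociativeAttribute() { }
    public AssociativeAttribute(Type accumulatorType) { }
}

// Stand-in matching the interface definition shown in the text.
public interface IAssociativeRecursive<TAggregate>
{
    TAggregate RecursiveAccumulate(TAggregate first, TAggregate second);
}

// Follows the rules: public, implicit public parameterless constructor, no data members.
public class EvenCountAccumulator : IAssociativeRecursive<int>
{
    public int RecursiveAccumulate(int first, int second) { return first + second; }
}

public static class EvenCounter
{
    // Type transformation: long inputs, int intermediate and final values.
    [Associative(typeof(EvenCountAccumulator))]
    public static int CountEvens(int current, long next)
    {
        return current + (next % 2 == 0 ? 1 : 0);
    }

    // In-memory imitation: seed 0 per partition, then merge the counts with the accumulator.
    public static int Run(IEnumerable<IEnumerable<long>> partitions)
    {
        var accumulator = new EvenCountAccumulator();
        return partitions
            .Select(p => p.Aggregate(0, (acc, x) => CountEvens(acc, x)))
            .Aggregate(0, (a, b) => accumulator.RecursiveAccumulate(a, b));
    }
}
```

Because the intermediate type here is int, the seed 0 is an identity for the merge step. An empty partition therefore contributes nothing to the count.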
In the IntToStringCsvAggregator example, LINQ to HPC executes the aggregation by first producing a comma-separated value (CSV) string for each DSC file in the input file set. Then, as a second step, it combines the partial results pairwise by using the RecursiveAccumulate method of the RecursiveAccumulator class, which was provided as an argument to the [Associative] attribute. To do this correctly (that is, to conform to the LINQ specification and match the output of LINQ to Objects), the CsvCombiner method must insert the ", " separator only when both CSV strings are nonempty, and it must be invoked in a way that respects the ordering of records in the input file set.
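The empty-string checks in CsvCombiner matter because both the seed "" and a partition that contributes no records produce an empty partial result. The sketch below compares the section's combiner with NaiveCombiner, a hypothetical counterexample that always inserts a separator. Both combiners merge the partial results "1, 2", "", and "3".

```csharp
using System;

public static class CsvCombinerDemo
{
    // The combiner from this section: "" means "no values yet".
    public static string CsvCombiner(string a, string b)
    {
        if (a == "") return b;
        if (b == "") return a;
        return a + ", " + b;
    }

    // Hypothetical naive combiner that ignores empty partial results.
    public static string NaiveCombiner(string a, string b)
    {
        return a + ", " + b;
    }
}
```

CsvCombiner produces "1, 2, 3". NaiveCombiner produces "1, 2, , 3", and when it is combined with the seed it would also add a leading separator, such as ", 1".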
You can understand the effect of the [Associative] attribute and the IAssociativeRecursive<T> interface in terms of the Apply operator. The following code produces the same result as the IntToStringCsvAggregator example by using a two-stage, distributed algorithm.
string intData = ...
var result2 = context.FromDsc<long>(intData)
.Apply(localRecords => LocalAggregate(localRecords)) // do once per DSC file
.Apply(csvStrings => CombinePartialResults(csvStrings)) // do one time only
.Single();
The execution plan for this query uses distributed processing to perform a local aggregation operation for each DSC file in the input file set. It then combines the local partial results by using a single compute node of the cluster. Here are the definitions of the LocalAggregate and CombinePartialResults methods.
[DistributiveOverConcat]
public static IEnumerable<string> LocalAggregate(IEnumerable<long> localRecords)
{
yield return
localRecords.Aggregate("", (str, x) => CsvCombiner(str, x.ToString()));
}
public static IEnumerable<string> CombinePartialResults(IEnumerable<string> csvStrings)
{
yield return
csvStrings.Aggregate((s1, s2) => CsvCombiner(s1, s2));
}
The seed value that is passed to the Aggregate method should be an identity element for the aggregation operation (for CsvCombiner, the empty string), because the seed may enter the computation more than once. This happens, for example, in a grouped aggregation query.
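To see why, consider a sum whose seed is applied once per partition and again at the merge step. SeedDemo.SumWithSeed is hypothetical. It only models how a seed could be reused and is not LINQ to HPC's actual plan. A seed of 0, which is the identity for addition, gives the correct total. A non-identity seed is counted once per partition plus once more at the merge.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SeedDemo
{
    // Applies the seed to each partition's local fold and again to the merge fold.
    public static long SumWithSeed(IEnumerable<IEnumerable<long>> partitions, long seed)
    {
        return partitions
            .Select(p => p.Aggregate(seed, (acc, x) => acc + x))
            .Aggregate(seed, (acc, partial) => acc + partial);
    }
}
```

With the partitions {1, 2}, {3}, and {4, 5}, a seed of 0 yields 15. A seed of 10 yields 55, because the seed is counted four times. A sequential LINQ to Objects fold with the same seed returns 25.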
Note that the LocalAggregate method uses the [DistributiveOverConcat] attribute as a way to force the Apply operator to operate on individual DSC files. The Aggregate operators in the LocalAggregate and CombinePartialResults methods use LINQ-to-Objects to evaluate their LINQ queries.
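You can also imitate the two Apply stages in memory. In this sketch, ApplyPlanDemo and its Run method are hypothetical. The LocalAggregate and CombinePartialResults bodies are copied from the text without the LINQ to HPC attribute, and each inner array stands in for one DSC file. SelectMany plays the role of the per-file Apply, and Single extracts the one combined string.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ApplyPlanDemo
{
    public static string CsvCombiner(string a, string b)
    {
        if (a == "") return b;
        if (b == "") return a;
        return a + ", " + b;
    }

    // Same body as LocalAggregate in the text; yields one CSV string per partition.
    public static IEnumerable<string> LocalAggregate(IEnumerable<long> localRecords)
    {
        yield return localRecords.Aggregate("", (str, x) => CsvCombiner(str, x.ToString()));
    }

    // Same body as CombinePartialResults in the text; yields the final string.
    public static IEnumerable<string> CombinePartialResults(IEnumerable<string> csvStrings)
    {
        yield return csvStrings.Aggregate((s1, s2) => CsvCombiner(s1, s2));
    }

    // Stage 1 runs once per partition; stage 2 runs once over all partial results.
    public static string Run(IEnumerable<IEnumerable<long>> partitions)
    {
        IEnumerable<string> partials = partitions.SelectMany(p => LocalAggregate(p));
        return CombinePartialResults(partials).Single();
    }
}
```

For the partitions {1, 2}, {}, and {3}, stage 1 yields "1, 2", "", and "3". Stage 2 then yields "1, 2, 3", because CsvCombiner skips the empty partial result.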
Note
LINQ to HPC does not assume that the RecursiveAccumulate method is commutative.
Note
The code in this section uses the creation of a single CSV string as an example of aggregation in which the result type differs from the input type. The example illustrates the concepts of distributed aggregation. However, the aggregated output is a single string whose size grows with the input, so this approach will not scale to large data sets.
Note