Using the Apply Operator for Processing DSC Files
The Apply operator, when combined with the [DistributiveOverConcat] custom attribute, gives you access to the records of individual DSC files. This access is required for many distributed algorithms, particularly those that use partial reduction.
Note |
|---|
| Apply is the lowest-level operator in LINQ to HPC. Of all the operators described in this chapter, it is probably the least frequently used. However, the Apply operator is described first because it is a primitive whose functionality will help you to understand the other, higher-level operations. In practice, use the Apply operator only when the functionality that you need is not provided by higher-level operators. For example, LINQ to HPC can handle many cases of partial reduction for you. See Using the [Associative] Attribute for Distributed Aggregation and Using the [Decomposable] Attribute for Grouped Aggregation for more information about two common cases where LINQ to HPC can automatically apply partial reduction for user-provided aggregation methods. For independent data-parallel operations, use the Select or SelectMany operators. Do not use the Apply operator to independently process individual records. |
Why the Apply operator is needed
The easiest way to understand where the Apply operator is appropriate is to look at an example. Distributed aggregation is an algorithm that accumulates values into local partial results before it combines those results into a final aggregated value. The LINQ to HPC Sum operator implements distributed aggregation. Here is a LINQ to HPC query that uses the Sum operator.
long result = context.FromDsc<long>("MyIntData").Sum();
In this example, if the DSC file set MyIntData stores its records in more than one DSC file, the Sum operator automatically uses the distributed aggregation algorithm to calculate the sum of the sequence of integers. The query first uses the compute nodes of the cluster to calculate subtotals of the integers in each of the DSC files. Then, in a second step, the query collects the distributed subtotals into a single data set and then uses a single compute node to add them together into a final total.
LINQ to HPC uses its knowledge of how the data is divided into DSC files to implement the distributed aggregation algorithm. In other words, the Sum operator could not operate in a distributed manner unless there were some way to create a subtotal for each DSC file. Your own distributed algorithms need the same access to the DSC files, and the Apply operator provides it when it is used with the [DistributiveOverConcat] custom attribute.
Unary apply
Here is the signature of one of the overloaded versions of the Apply operator. It is referred to as unary Apply because it operates on a single input query.
public static IQueryable<T2> Apply<T1, T2>(
    this IQueryable<T1> source,
    Expression<Func<IEnumerable<T1>, IEnumerable<T2>>> applyFunc)
The applyFunc argument is a lambda expression that takes an enumerable as its input and returns an enumerable as its output. There is no requirement that the number of output elements, or the type of the output elements, be the same as the input. In many cases, the output enumeration will contain a single element or even no elements. Distributed aggregation is an example of the first scenario, and distributed search is an example of the second.
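For example, a distributed search could use an applyFunc like the hypothetical FindValue method below. It yields the target value once if it occurs among the records it receives, and yields nothing otherwise. The sketch runs FindValue over in-memory arrays that stand in for DSC files. In an actual LINQ to HPC query, the method would need the [DistributiveOverConcat] attribute to run once per DSC file. FindValue, SearchEachPartition, and the array-of-arrays representation are illustrative assumptions, not part of LINQ to HPC.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DistributedSearchSketch
{
    // Hypothetical applyFunc for a distributed search. In a LINQ to HPC query it
    // would carry [DistributiveOverConcat] so that it runs once per DSC file.
    // It yields the target at most once, so each call returns zero or one element.
    public static IEnumerable<long> FindValue(IEnumerable<long> values, long target)
    {
        foreach (var value in values)
        {
            if (value == target)
            {
                yield return value;
                yield break; // stop after the first match in this DSC file
            }
        }
    }

    // Local stand-in for per-DSC-file invocation: each inner array plays the
    // role of one DSC file, and each result list plays the role of one output file.
    public static List<List<long>> SearchEachPartition(long[][] partitions, long target)
    {
        return partitions.Select(p => FindValue(p, target).ToList()).ToList();
    }
}
```

With the partitions {1, 2, 3}, {4, 5}, and {5, 6} and a target of 5, the first output partition is empty and the other two each contain the single value 5.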
Note |
|---|
| If the number of elements that is returned by the applyFunc expression is always the same as the number of input elements, and if each input element is processed independently, you should use the Select operator instead of the Apply operator. |
The behavior of this overloaded version of the Apply operator depends on whether the applyFunc argument refers to a method that is decorated with the [DistributiveOverConcat] custom attribute or to a method that has no attribute. Here are the two cases.
- [DistributiveOverConcat]. If the applyFunc expression refers to a method that is annotated with the [DistributiveOverConcat] custom attribute, LINQ to HPC invokes applyFunc in a distributed manner, one time for each DSC file in the source. This means that there is one compute vertex for each DSC file. Each invocation of applyFunc receives an enumerable object that contains all of the records of the DSC file being processed, and the enumerations that these invocations return become the DSC files of the Apply operation's distributed output. Because that output is partitioned across the compute nodes, subsequent operators, such as Select, also run in a distributed manner.
- No attribute. If the applyFunc expression refers to a method that does not have the [DistributiveOverConcat] attribute, LINQ to HPC invokes applyFunc once on the entire sequence of source records, without distributed processing. In other words, if the inputs are distributed across the compute nodes of the cluster, LINQ to HPC first collects them into a single enumerable object, applies the user-supplied method to that data on a single compute node, and stores the result on that node. The result of the Apply operator is the enumerable that the applyFunc method returns. Subsequent operators, such as Select, also run on a single node of the cluster, because Apply has created an output that resides on a single node.
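The difference between the two cases can be modeled locally. The following sketch is illustrative only and does not show how LINQ to HPC is implemented. It represents a file set as a list of partitions, one per DSC file. ApplyPerFile mimics the attributed case and ApplyOnSingleNode mimics the unattributed case. Both helper names and the list-of-lists representation are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A file set is modeled as a list of partitions; each inner list stands in for one DSC file.
public static class ApplyModesSketch
{
    // Models the [DistributiveOverConcat] case: applyFunc runs once per DSC file,
    // and each result becomes one DSC file of the output.
    public static List<List<T2>> ApplyPerFile<T1, T2>(
        List<List<T1>> fileSet, Func<IEnumerable<T1>, IEnumerable<T2>> applyFunc)
    {
        return fileSet.Select(file => applyFunc(file).ToList()).ToList();
    }

    // Models the unattributed case: all records are gathered into one sequence,
    // applyFunc runs once, and the output file set has a single DSC file.
    public static List<List<T2>> ApplyOnSingleNode<T1, T2>(
        List<List<T1>> fileSet, Func<IEnumerable<T1>, IEnumerable<T2>> applyFunc)
    {
        IEnumerable<T1> allRecords = fileSet.SelectMany(file => file);
        return new List<List<T2>> { applyFunc(allRecords).ToList() };
    }
}
```

Suppose the applyFunc counts its input records, and the file set has two files of two and three records. ApplyPerFile then yields two output files containing 2 and 3. ApplyOnSingleNode yields one output file containing 5.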
The enumerable object that is passed to the applyFunc method is designed to be streamed. Therefore, its GetEnumerator method can be invoked only one time, and the Reset method cannot be called on the resulting enumerator object to change the current iteration position.
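One way to see what this restriction means for your own methods is to wrap a sequence so that a second call to GetEnumerator throws. The hypothetical SinglePassEnumerable<T> class below only imitates the restriction for local testing. It is not the type that LINQ to HPC passes to applyFunc, and the exception it throws is an assumption. In the sketch, an applyFunc that calls both Count() and Sum() on its input enumerates it twice and fails. A version that accumulates both values in a single foreach loop succeeds.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical wrapper that imitates the streaming restriction for local tests:
// GetEnumerator may be called only once.
public sealed class SinglePassEnumerable<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> source;
    private bool enumerated;

    public SinglePassEnumerable(IEnumerable<T> source) { this.source = source; }

    public IEnumerator<T> GetEnumerator()
    {
        if (enumerated)
            throw new InvalidOperationException("The sequence can be enumerated only once.");
        enumerated = true;
        return source.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class MeanSketch
{
    // Enumerates its input twice (Count, then Sum), so it fails under the restriction.
    public static IEnumerable<double> MeanTwoPasses(IEnumerable<long> values)
    {
        long count = values.Count();
        long sum = values.Sum();
        yield return (double)sum / count;
    }

    // Enumerates its input once, accumulating the count and the sum together.
    public static IEnumerable<double> MeanOnePass(IEnumerable<long> values)
    {
        long count = 0, sum = 0;
        foreach (var value in values)
        {
            count++;
            sum += value;
        }
        yield return (double)sum / count;
    }
}
```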
An example of the unary apply operator
Here is an example of a query that performs approximately the same steps as the built-in LINQ to HPC Sum operator when the operator is applied to long integers.
string intData = ...;
long result = context.FromDsc<long>(intData)
    .Apply(values => CalculateSubtotals(values))     // do for each DSC file
    .Apply(subtotals => CalculateTotal(subtotals))   // do once (on a single node)
    .Single();
The example implements the distributed aggregation algorithm for addition. The query uses the Apply operator twice. The first use is a distributed operation that invokes the CalculateSubtotals method once for each DSC file of the input file set. The result is one subtotal for each DSC file. The second use runs on a single compute node and creates a single final total from the subtotals that were calculated in the previous stage. The Single operator extracts the final total from a single-valued query.
You cannot determine from the query itself either that the first Apply operation is a distributed operation, or that the second Apply operation is a global operation on a single compute node. The distinction between the two modes is signaled by the presence or absence of the [DistributiveOverConcat] attribute on the user-provided helper methods CalculateSubtotals and CalculateTotal. Here are their definitions.
// Runs once per DSC file and yields a single subtotal for that file.
[DistributiveOverConcat]
public static IEnumerable<long> CalculateSubtotals(IEnumerable<long> values)
{
    long subtotal = 0;
    foreach (var value in values)
        subtotal += value;
    yield return subtotal;
}

// No attribute: runs once, on a single compute node, over all of the subtotals.
public static IEnumerable<long> CalculateTotal(IEnumerable<long> subtotals)
{
    long total = 0;
    foreach (var subtotal in subtotals)
        total += subtotal;
    yield return total;
}
The CalculateSubtotals method has the [DistributiveOverConcat] custom attribute. When the method is passed as an argument to the Apply operator, the attribute indicates that the method is intended to be run in a distributed manner on the records of each DSC file in the input file set. The CalculateSubtotals method always returns an enumeration with a single element. Consequently, after the Apply operation has finished, the output will be a file set with as many values as there were DSC files in the MyIntData file set. Each DSC file of the output file set will have one record.
The CalculateTotal method does not have the [DistributiveOverConcat] custom attribute. The absence of the attribute indicates that the method is intended to be run over the entire data set on a single compute node. In this case, however, the input data set is small. It consists of the subtotals that were calculated in the previous stage of the query. Like the CalculateSubtotals method, the CalculateTotal method returns a single-element enumerable object. The output file set will contain a single DSC file that contains a single record.
Note |
|---|
| You can pass a delegate method directly as an argument to the Apply operator, or you can use a lambda expression. For reasons of style, many people prefer to use lambda expressions in every case. A lambda expression in the form x => f(x) or x => x.f() is the only syntax that should be used with the Apply operator. When you use one of these forms, LINQ to HPC looks up the attributes of f to see whether the [DistributiveOverConcat] attribute is attached to it. If you use a lambda expression in any other form (for example, one that includes a compound expression), LINQ to HPC ignores the [DistributiveOverConcat] attribute of any methods that occur in the subexpressions, which may affect the behavior of the Apply operator in ways that you do not intend. None of LINQ to HPC's built-in operators are treated as if they have been annotated with the [DistributiveOverConcat] attribute. Therefore, if you use any of LINQ to HPC's built-in operators in the expression that you pass as the applyFunc argument to the Apply operator, the system will always collapse the input file set into a single partition and perform the query on a single compute node. |
The presence or absence of the [DistributiveOverConcat] attribute has a big effect on how your query runs. For example, if you mistakenly add the [DistributiveOverConcat] attribute to the CalculateTotal method in the previous example, the query will fail to compute its total. Instead, the CalculateTotal method will be invoked once for each DSC file. Because each DSC file of the intermediate file set contains exactly one subtotal, each invocation simply returns that subtotal. The operation is therefore a no-op: the result contains only the original subtotals. In addition, if there is more than one DSC file, the Single operator will find multiple values in its input enumeration and throw an exception.
On the other hand, forgetting to add the [DistributiveOverConcat] attribute to the CalculateSubtotals method would introduce a potentially catastrophic performance problem. Without the attribute, the query would serialize the entire distributed input data set into a single enumeration of values and process the entire data set on a single compute node. No distributed computation would take place. The error would be difficult to debug because the final answer (if you waited long enough to see it) would be correct.
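The effect of misplacing the attribute can be reproduced with an in-memory model. In the sketch below, a list of lists stands in for a file set. PerFile and SingleNode are hypothetical helpers that model the attributed and unattributed behavior. CalculateSubtotals and CalculateTotal repeat the logic of the methods above without the attribute, because here the helper you call decides the execution mode. Consider three hypothetical files, {1, 2, 3}, {4, 5}, and {6}. The correct pipeline and the fully centralized pipeline both end with the single value 21. Running CalculateTotal per file instead leaves the three subtotals 6, 9, and 6, which is the situation in which Single throws.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SumPipelineSketch
{
    // Same logic as CalculateSubtotals and CalculateTotal above. The attribute is
    // omitted; the helpers below choose the execution mode instead.
    public static IEnumerable<long> CalculateSubtotals(IEnumerable<long> values)
    {
        long subtotal = 0;
        foreach (var value in values)
            subtotal += value;
        yield return subtotal;
    }

    public static IEnumerable<long> CalculateTotal(IEnumerable<long> subtotals)
    {
        long total = 0;
        foreach (var subtotal in subtotals)
            total += subtotal;
        yield return total;
    }

    // Models an attributed applyFunc: one invocation per DSC file.
    public static List<List<long>> PerFile(List<List<long>> fileSet,
        Func<IEnumerable<long>, IEnumerable<long>> f) =>
        fileSet.Select(file => f(file).ToList()).ToList();

    // Models an unattributed applyFunc: all records gathered onto one node.
    public static List<List<long>> SingleNode(List<List<long>> fileSet,
        Func<IEnumerable<long>, IEnumerable<long>> f) =>
        new List<List<long>> { f(fileSet.SelectMany(file => file)).ToList() };
}
```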
If you use the Apply operator, you must be careful to use the [DistributiveOverConcat] attribute appropriately. The decision to use distributed processing for each DSC file or global processing on a single compute node depends on the algorithm that you are implementing. It is very common to use both forms of the Apply operator in separate stages of a single distributed algorithm.
Binary apply
In addition to the unary overload, the Apply operator has two other overloads: a binary version and an n-ary version. Here is the signature of the binary version.
public static IQueryable<T3> Apply<T1, T2, T3>(
    this IQueryable<T1> source1,
    IQueryable<T2> source2,
    Expression<Func<IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>>> applyFunc)
This overloaded version of the Apply operator applies a binary function to the records of two input file sets. The applyFunc argument is a lambda expression, or delegate method, that takes two enumerable objects as its input. The first argument contains records from the source1 query, and the second argument contains records from the source2 query.
The enumerable objects that are passed to the applyFunc method are designed to be streamed. Therefore, the GetEnumerator method of each one can be invoked only one time, and the Reset method cannot be called on the resulting enumerator object to change the current iteration position.
The binary Apply operation is distributed or global depending on whether the applyFunc has the [DistributiveOverConcat] attribute, the [LeftDistributiveOverConcat] attribute, or no attribute. There are three cases.
- If applyFunc has the [LeftDistributiveOverConcat] attribute, the function is distributive in its left argument but not in its right one. The applyFunc is invoked in a distributed manner one time for each DSC file of source1. In each invocation, the first argument contains the records of the current DSC file of source1 only, while the second argument contains the concatenation of all DSC files of source2, represented as a single enumerable object.
- If applyFunc has the [DistributiveOverConcat] attribute, the function processes the DSC files of source1 and source2 in pairs. DSC file 1 of source1 is matched with DSC file 1 of source2, DSC file 2 of source1 is matched with DSC file 2 of source2, and so on, and applyFunc is invoked once for each pair. The two sources must have the same number of DSC files, or an exception is thrown. The corresponding DSC files of source1 and source2 need not have the same number of records, although many applications that use the binary Apply operator may require it.
- If applyFunc has no custom attribute, it is invoked on a single compute node. Its inputs are enumerations that contain all of the records of source1 and source2, respectively. The output file set has a single DSC file.
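The three binary cases can also be modeled in memory. In this illustrative sketch, each file set is a list of partitions standing in for DSC files. LeftDistributive, Pairwise, and SingleNode are hypothetical helper names for the [LeftDistributiveOverConcat], [DistributiveOverConcat], and unattributed behaviors. The ArgumentException that Pairwise throws is a stand-in; the actual exception type that LINQ to HPC raises when the DSC file counts differ is not stated here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BinaryApplySketch
{
    // [LeftDistributiveOverConcat]: one call per DSC file of source1; each call
    // sees that file plus the concatenation of every DSC file of source2.
    public static List<List<T3>> LeftDistributive<T1, T2, T3>(
        List<List<T1>> source1, List<List<T2>> source2,
        Func<IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>> applyFunc)
    {
        IEnumerable<T2> allOfSource2 = source2.SelectMany(file => file);
        return source1.Select(file => applyFunc(file, allOfSource2).ToList()).ToList();
    }

    // [DistributiveOverConcat]: DSC file i of source1 is paired with DSC file i of source2.
    public static List<List<T3>> Pairwise<T1, T2, T3>(
        List<List<T1>> source1, List<List<T2>> source2,
        Func<IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>> applyFunc)
    {
        if (source1.Count != source2.Count)
            throw new ArgumentException("Both sources must have the same number of DSC files.");
        return source1.Zip(source2, (left, right) => applyFunc(left, right).ToList()).ToList();
    }

    // No attribute: one call on a single node with all records of each source;
    // the output has a single DSC file.
    public static List<List<T3>> SingleNode<T1, T2, T3>(
        List<List<T1>> source1, List<List<T2>> source2,
        Func<IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>> applyFunc)
    {
        var result = applyFunc(source1.SelectMany(f => f), source2.SelectMany(f => f)).ToList();
        return new List<List<T3>> { result };
    }
}
```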
N-ary apply
The following overloaded version of the Apply operator combines an arbitrary number of input file sets.
public static IQueryable<T2> Apply<T1, T2>(
    this IQueryable<T1> source,
    IQueryable<T1>[] otherSources,
    Expression<Func<IEnumerable<T1>[], IEnumerable<T2>>> applyFunc)
This overloaded version of the Apply operator applies an n-ary function to an arbitrary number of input file sets.
The enumerable objects that are passed to the applyFunc method are designed to be streamed. Therefore, the GetEnumerator method of each one can be invoked only one time, and the Reset method cannot be called on the resulting enumerator object to change the current iteration position.
The n-ary Apply operation is distributed or global, depending on whether the applyFunc has the [DistributiveOverConcat] attribute, or no attribute. There are two cases.
- If applyFunc has the [DistributiveOverConcat] attribute, processing is synchronized among the DSC files of all of the input sources. DSC file 1 of source is processed together with DSC file 1 of each element of the otherSources array, DSC file 2 with DSC file 2, and so on. All of the sources must have the same number of DSC files, or an exception is thrown. The corresponding DSC files of the input sources need not have the same number of records, although many applications that use the n-ary version of the Apply operator may require it.
- If applyFunc has no custom attribute, it is invoked on a single compute node. Its input is an array of enumerable objects, and each enumerable object produces all of the records of its associated input source.
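A corresponding sketch of the two n-ary cases follows. The partition model and the names NAryApplySketch, Synchronized, and SingleNode are hypothetical. Two further details are assumptions of this model: that applyFunc receives its inputs in the order source, then otherSources, and, by analogy with the binary case, that each array element in the single-node case holds all records of one input source. The ArgumentException is likewise a stand-in for whatever exception LINQ to HPC actually throws.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class NAryApplySketch
{
    // [DistributiveOverConcat]: for each index i, applyFunc receives DSC file i of
    // every input (source first, then otherSources in order, in this model).
    public static List<List<T2>> Synchronized<T1, T2>(
        List<List<T1>> source, List<List<T1>>[] otherSources,
        Func<IEnumerable<T1>[], IEnumerable<T2>> applyFunc)
    {
        var all = new[] { source }.Concat(otherSources).ToArray();
        if (all.Any(s => s.Count != source.Count))
            throw new ArgumentException("All sources must have the same number of DSC files.");
        return Enumerable.Range(0, source.Count)
            .Select(i => applyFunc(all.Select(s => (IEnumerable<T1>)s[i]).ToArray()).ToList())
            .ToList();
    }

    // No attribute: a single call on one node; each array element holds all
    // records of one input source.
    public static List<List<T2>> SingleNode<T1, T2>(
        List<List<T1>> source, List<List<T1>>[] otherSources,
        Func<IEnumerable<T1>[], IEnumerable<T2>> applyFunc)
    {
        var inputs = new[] { source }.Concat(otherSources)
            .Select(s => s.SelectMany(file => file))
            .ToArray();
        return new List<List<T2>> { applyFunc(inputs).ToList() };
    }
}
```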
Note |
|---|
| The [LeftDistributiveOverConcat] attribute is ignored when it is used with the n-ary version of the Apply operator. |
Note