The RangePartition Operator

The RangePartition operator uses either user-specified intervals or dynamic sampling to partition records into DSC files. Each of the DSC files in the output file set of a RangePartition operation is associated with a range of key values. Each record is copied into the appropriate DSC file based on its key. Keys are produced by applying a user-provided key selector method to the input records. You can use the RangePartition operator to control how your data is distributed across the cluster. In some cases, you can achieve better performance if you group related data together and choose an optimal number of DSC files for your file set. For more information, see Choosing the Right Number of DSC Files.

The output of a RangePartition query is a file set where the records in each DSC file are selected according to range separators. Range separators are an increasing or decreasing sequence of values. The number of range separators is one less than the number of DSC files that are created by the RangePartition operator. The range separators must be of the same type as the record keys.

If the range separators are an increasing sequence s₁, s₂, …, sₙ₋₁, then:

  • DSC file 1 has records with key k where k < s₁

  • DSC file 2 has records with key k where s₁ ≤ k < s₂

  • …

  • DSC file n-1 has records with key k where sₙ₋₂ ≤ k < sₙ₋₁

  • DSC file n has records with key k where k ≥ sₙ₋₁

If the range separators are in decreasing order, then the order of the output DSC files will be reversed. The largest values will be in the first DSC file.
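
The boundary rules above can be checked with a small local sketch. The following C# code is not LINQ to HPC code; it is an in-memory illustration, and the helper name FileIndexFor is hypothetical. It returns the 0-based index of the output file for a key by binary-searching the separators. For descending separators it assumes that the same rule is applied under the reversed ordering, so a key equal to a separator falls into the later file; this section does not document tie handling for descending order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: returns the 0-based index of the DSC file that receives `key`.
// Ascending separators s1 < s2 < ... < s(n-1) give n files:
//   file 0: k < s1;  file i: s(i) <= k < s(i+1);  file n-1: k >= s(n-1).
// ASSUMPTION: for descending separators the same rule is applied under the
// reversed ordering (file 0: k > s1, ...); tie handling there is not documented.
static int FileIndexFor<TKey>(TKey key, IReadOnlyList<TKey> separators,
                              IComparer<TKey> comparer, bool isDescending)
{
    // Reverse the comparison for descending separators so one search handles both cases.
    IComparer<TKey> order = isDescending
        ? Comparer<TKey>.Create((a, b) => comparer.Compare(b, a))
        : comparer;

    // Binary search for the first separator strictly greater than key (under `order`).
    // Its position equals the number of separators <= key, which is the file index.
    int lo = 0, hi = separators.Count;
    while (lo < hi)
    {
        int mid = lo + (hi - lo) / 2;
        if (order.Compare(separators[mid], key) <= 0) lo = mid + 1;
        else hi = mid;
    }
    return lo;
}

long[] ascending = { 10, 20, 30 };    // 3 separators -> 4 files
long[] descending = { 30, 20, 10 };

foreach (long k in new long[] { 5, 10, 25, 30 })
    Console.WriteLine($"key {k}: ascending file {FileIndexFor(k, ascending, Comparer<long>.Default, false)}, " +
                      $"descending file {FileIndexFor(k, descending, Comparer<long>.Default, true)}");
```

With the ascending separators 10, 20, 30, the keys 5, 10, 25 and 30 land in files 0, 1, 2 and 3: a key equal to a separator belongs to the file that starts at that separator. With the descending separators, the largest keys land in the lowest-numbered files.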

Range partitioning makes no guarantees about the order of records within a DSC file of the output file set.

There are two ways to choose range separators.

  • User-supplied range separators. Some overloads of the RangePartition operator take a parameter that gives the range separators as a sorted array. A sequence of n-1 sorted separator values produces n DSC files. The sequence can be sorted in increasing or decreasing order. If it is in decreasing order, the output file set puts the records with the largest keys in the first DSC file and the records with the smallest keys in the last DSC file.

  • Sampled range separators. The range separators are automatically chosen by the system, based on dynamic random sampling of the data.

Range partitioning can transform a file set with n DSC files into a file set with m DSC files. The number of DSC files in the output file set depends on which overload of the RangePartition operator you use.

  • Overloads that do not include a rangeSeparators parameter will use m = n.

  • Overloads that include a rangeSeparators parameter will use m = rangeSeparators.Length + 1.

Range partitioning with dynamically sampled data

The simpler form of range partitioning performs dynamic sampling of the input file set to choose range separators that produce DSC files of approximately equal size. The number of DSC files in the output file set is equal to the number of DSC files in the input file set.

Note
In the current implementation of LINQ to HPC, approximately one sample is taken for every 1,000 records in the input file set. The samples are collated and sorted, and n-1 evenly spaced values are chosen, where n is the number of DSC files in the output file set. The selected values are used as the range separators.
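
The procedure in the note can be sketched in memory as follows. ChooseSeparators is a hypothetical helper, not the LINQ to HPC implementation (which runs distributed and may change); the sampling rate is a parameter precisely because no particular density is guaranteed. The sketch keeps each record with probability sampleRate, sorts the sample, and takes n-1 evenly spaced values from it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of LINQ to HPC. Mirrors the note: sample about
// `sampleRate` of the records, sort the sample, and take n-1 evenly spaced values.
static T[] ChooseSeparators<T>(IEnumerable<T> records, int fileCount,
                               IComparer<T> comparer, double sampleRate, Random rng)
{
    if (fileCount < 2) throw new ArgumentOutOfRangeException(nameof(fileCount));

    // Bernoulli sampling: each record is kept independently with probability sampleRate.
    List<T> sample = records.Where(_ => rng.NextDouble() < sampleRate).ToList();
    if (sample.Count == 0) throw new InvalidOperationException("No records were sampled.");
    sample.Sort(comparer);

    // The i-th separator is the sample's i/fileCount quantile, so each output
    // file should receive roughly 1/fileCount of the records.
    var separators = new T[fileCount - 1];
    for (int i = 1; i < fileCount; i++)
        separators[i - 1] = sample[(int)((long)i * sample.Count / fileCount)];
    return separators;
}

// One million keys, four output files, about one sample per 1,000 records.
long[] keys = Enumerable.Range(0, 1_000_000).Select(n => (long)n).ToArray();
long[] seps = ChooseSeparators(keys, 4, Comparer<long>.Default, 0.001, new Random(1));
Console.WriteLine(string.Join(", ", seps));
```

For one million consecutive keys and four files, the printed separators should be close to 250000, 500000 and 750000; the exact values depend on which records happen to be sampled.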

In most cases, the automatic selection of range separators gives good results with little overhead. When this is not the case, for example in cases of extremely large file sets, you can provide range separators to the RangePartition operator. Choose range separator values either by custom sampling or by using domain-specific knowledge about the input distribution. The sampling algorithm used by LINQ to HPC is an implementation choice that may change in future releases. You should not assume any particular sampling density.
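
When domain knowledge says the keys are spread roughly uniformly over a known interval, separators can be computed directly instead of sampled. The helper below (UniformSeparators, hypothetical) is a minimal sketch under that uniformity assumption; for skewed data it would produce unbalanced DSC files.

```csharp
using System;

// Hypothetical helper: if keys are assumed to be spread roughly uniformly over
// [min, max), evenly spaced separators need no sampling at all.
static long[] UniformSeparators(long min, long max, int fileCount)
{
    var separators = new long[fileCount - 1];
    // Note: (max - min) * i can overflow for extremely wide key ranges.
    for (int i = 1; i < fileCount; i++)
        separators[i - 1] = min + (max - min) * i / fileCount;
    return separators;
}

Console.WriteLine(string.Join(", ", UniformSeparators(0, 600_000, 6)));
// prints: 100000, 200000, 300000, 400000, 500000
```

For keys assumed uniform over [0, 600,000) and six files, this yields the same values as the ranges array in the array-based example later in this section.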

The overloaded version of the RangePartition operator for sampled ranges has the following signature.

public static IQueryable<TSource> RangePartition<TSource, TKey>(
    this IQueryable<TSource> source, 
    Expression<Func<TSource, TKey>> keySelector, 
    bool isDescending)

There is also an overload of the RangePartition method that lets you override the default comparison with a custom comparison object. The comparison object is used to order the record keys with respect to the range separators, and to sort the sampled range separators. Here is the signature.

public static IQueryable<TSource> RangePartition<TSource, TKey>(
    this IQueryable<TSource> source,
    Expression<Func<TSource, TKey>> keySelector,
    IComparer<TKey> comparer,
    bool isDescending)

The key values for sampled records are produced by applying the method specified in the keySelector argument. The isDescending argument specifies whether the range separators are sorted in descending order, in which case the records with the largest keys are placed in the first DSC file.

Here is an example.

string intData = ...
string rangePartitionedFileSetName = ...

context.FromDsc<long>(intData)
       .RangePartition(x => x, false)
       .ToDsc(rangePartitionedFileSetName)
       .SubmitAndWait();

LINQ to HPC will generate an execution plan for this example that uses distributed computation to sample records from the input file set and determine a set of range separators. The range separators cause the data to be distributed approximately equally among the DSC files in the output file set. The number of DSC files will be the same as the number of DSC files in the input file set. The records in each DSC file will be written in no particular order.

You can use range partitioning to implement distributed sorting algorithms. For example, the following code behaves like the LINQ to HPC OrderBy operator when it is applied to a sequence of integer values.

string intData = ...
string orderedFileSetName = ...

context.FromDsc<long>(intData)
       .RangePartition(x => x, false)
       .Apply(localRecords => SortLocalRecords(localRecords))
       .ToDsc(orderedFileSetName)
       .SubmitAndWait(); 

The code implements a distributed sort algorithm. It first repartitions the input file set without changing the number of DSC files. The records are distributed so that the lowest-valued records are in the first DSC file and the highest-valued records are in the last DSC file, but the records within each DSC file are still unsorted at this point. Next, a distributed Apply operator sorts the records in each DSC file by using a local (that is, LINQ-to-Objects) sort operation. Because every key in one DSC file is smaller than every key in the next DSC file, sorting each file locally produces a file set that is fully sorted.

The SortLocalRecords method is a helper method. Here is its definition.

[DistributiveOverConcat]
public static IEnumerable<long> SortLocalRecords(IEnumerable<long> localRecords)
{
   return localRecords.OrderBy(x => x);
}
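
The two-stage algorithm can be simulated in memory to see why it yields a total order. In this sketch, RangePartitionLocal is a hypothetical stand-in for RangePartition, the separators are fixed rather than sampled, and the per-file OrderBy plays the role of SortLocalRecords. Because every key in file i is smaller than every key in file i+1, reading the locally sorted files in order gives the same sequence as a global sort.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for RangePartition with fixed ascending separators.
// Each inner list plays the role of one output DSC file.
static List<List<long>> RangePartitionLocal(IEnumerable<long> records, long[] separators)
{
    var buckets = Enumerable.Range(0, separators.Length + 1)
                            .Select(_ => new List<long>())
                            .ToList();
    foreach (long k in records)
    {
        // BinarySearch returns the match index, or the complement of the next larger index.
        int i = Array.BinarySearch(separators, k);
        int file = i >= 0 ? i + 1 : ~i;   // a key equal to s(i) starts the next file
        buckets[file].Add(k);
    }
    return buckets;
}

var rng = new Random(7);
long[] input = Enumerable.Range(0, 10_000).Select(_ => (long)rng.Next(0, 1_000_000)).ToArray();
long[] seps = { 250_000, 500_000, 750_000 };

// Stage 1: range-partition; records inside each file are still unsorted.
List<List<long>> files = RangePartitionLocal(input, seps);

// Stage 2: sort each file on its own (the job of SortLocalRecords), then concatenate in file order.
long[] result = files.SelectMany(f => f.OrderBy(x => x)).ToArray();

Console.WriteLine(result.SequenceEqual(input.OrderBy(x => x)));   // True
```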

Range partitioning with array-based range separators

You can also partition input records by using range separators that you provide as an array. There are three overloads that handle this kind of partitioning. The first overload has the following signature.

public static IQueryable<TSource> RangePartition<TSource, TKey>(
    this IQueryable<TSource> source, 
    Expression<Func<TSource, TKey>> keySelector, 
    TKey[] rangeSeparators)

The rangeSeparators parameter is a sorted array of range separator values, of the same type as the keys. The following code is an example of this overload.

string intData = ...
string explicitlyPartitionedFileSetName = ...

long[] ranges = {100000, 200000, 300000, 400000, 500000};

context.FromDsc<long>(intData)
       .RangePartition(x => x, ranges)
       .ToDsc(explicitlyPartitionedFileSetName)
       .SubmitAndWait();

This example reads a file set that contains integers and repartitions the data into a new file set with six DSC files, whose name is given by the variable explicitlyPartitionedFileSetName. The first DSC file contains integers less than 100,000. The second DSC file contains integers greater than or equal to 100,000 and less than 200,000, and so on. The sixth DSC file contains integers greater than or equal to 500,000.
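
The file assignments for this example can be checked with Array.BinarySearch, which returns the index of a matching element or the bitwise complement of the index of the next larger element. FileNumber is a hypothetical helper for this illustration; it does not call LINQ to HPC.

```csharp
using System;

long[] ranges = { 100000, 200000, 300000, 400000, 500000 };   // 5 separators -> 6 files

// Hypothetical helper: 1-based DSC file number for key k under the rule s(i) <= k < s(i+1).
int FileNumber(long k)
{
    // BinarySearch returns the match index, or the complement of the next larger index.
    int i = Array.BinarySearch(ranges, k);
    return (i >= 0 ? i + 1 : ~i) + 1;
}

foreach (long key in new long[] { -5, 99_999, 100_000, 250_000, 500_000, 9_000_000 })
    Console.WriteLine($"{key} -> DSC file {FileNumber(key)}");
```

The loop reports DSC files 1, 1, 2, 3, 6 and 6 for the keys -5, 99,999, 100,000, 250,000, 500,000 and 9,000,000.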

Here is the signature of an overload of the RangePartition operator that accepts a user-provided comparer. This is needed if the built-in comparison for the key type is not appropriate, or if the key type does not support comparison.

public static IQueryable<TSource> RangePartition<TSource, TKey>(
    this IQueryable<TSource> source, 
    Expression<Func<TSource, TKey>> keySelector, 
    TKey[] rangeSeparators, 
    IComparer<TKey> comparer)
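
As an illustration of when the default comparison is not appropriate, consider version strings used as keys: ordinal string comparison puts "1.10" before "1.9". The sketch below builds a comparer with Comparer<T>.Create that compares parsed System.Version values, then uses it both to check the separator order and to place a key. The version-string key, the IsAscending helper, and the record property in the final comment are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical key type: version strings. Ordinal comparison sorts "1.10" before "1.9",
// so this comparer compares the parsed System.Version values instead.
IComparer<string> versionComparer = Comparer<string>.Create(
    (a, b) => Version.Parse(a).CompareTo(Version.Parse(b)));

string[] separators = { "1.9", "1.10", "2.0" };   // ascending as versions, not as strings

// Hypothetical helper: true if each separator is strictly less than the next.
static bool IsAscending(string[] values, IComparer<string> comparer) =>
    values.Zip(values.Skip(1), (x, y) => comparer.Compare(x, y) < 0).All(ok => ok);

Console.WriteLine(IsAscending(separators, versionComparer));         // True
Console.WriteLine(IsAscending(separators, StringComparer.Ordinal));  // False

// The same comparer places keys: "1.9.5" lies between "1.9" and "1.10",
// so it goes to the second of the four DSC files (0-based index 1).
int index = Array.BinarySearch(separators, "1.9.5", versionComparer);
Console.WriteLine(index >= 0 ? index + 1 : ~index);   // 1

// With LINQ to HPC the comparer would be passed to the overload above, for example
// .RangePartition(r => r.Version, separators, versionComparer)   // r.Version is hypothetical
```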

Here is the signature of an overloaded version of the RangePartition operator that accepts a user-provided comparison function, and a flag that specifies that the range separators are in descending order.

public static IQueryable<TSource> RangePartition<TSource, TKey>(
    this IQueryable<TSource> source, 
    Expression<Func<TSource, TKey>> keySelector, 
    TKey[] rangeSeparators, 
    IComparer<TKey> comparer, 
    bool isDescending)

The isDescending argument is needed only in the special case where the range separators in the rangeSeparators array consist of a single value (that is, when you are trying to partition the records into only two DSC files). In this case, LINQ to HPC cannot deduce by examining the array of range separators whether ascending or descending order is intended. If the rangeSeparators argument is an array with more than one element and the order of the values in the array does not agree with the isDescending flag, an exception will be thrown.
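
The rule can be expressed as a short check. This is a hypothetical sketch (ValidateSeparators) of the behavior described, not LINQ to HPC's actual validation code; in particular, it treats equal adjacent separators as an error, which this section does not specify. A single-element array passes for either value of isDescending, which is why the flag is the only source of the intended direction in that case.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical check, not LINQ to HPC's own code: throws if the separators are not in
// the order stated by isDescending. ASSUMPTION: equal adjacent separators are rejected.
static void ValidateSeparators<T>(T[] separators, IComparer<T> comparer, bool isDescending)
{
    string direction = isDescending ? "descending" : "ascending";
    for (int i = 1; i < separators.Length; i++)
    {
        int c = comparer.Compare(separators[i - 1], separators[i]);
        bool inOrder = isDescending ? c > 0 : c < 0;
        if (!inOrder)
            throw new ArgumentException(
                $"rangeSeparators[{i - 1}] and rangeSeparators[{i}] are not in {direction} order.");
    }
    // A single separator never enters the loop: its direction comes only from the flag.
}

ValidateSeparators(new long[] { 300, 200, 100 }, Comparer<long>.Default, isDescending: true);
ValidateSeparators(new long[] { 42 }, Comparer<long>.Default, isDescending: false);
try
{
    ValidateSeparators(new long[] { 100, 200, 300 }, Comparer<long>.Default, isDescending: true);
}
catch (ArgumentException e)
{
    Console.WriteLine(e.Message);
}
```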


