Parallel LINQ

Running Queries On Multi-Core Processors

Joe Duffy and Ed Essey

This article is based on the Parallel FX library, which is currently in development. All information herein is subject to change.

This article discusses:

  • PLINQ basics
  • The PLINQ programming model
  • Concurrent exceptions
  • Output ordering

This article uses the following technologies:
Parallel FX Library

Contents

From LINQ to PLINQ
PLINQ Programming Model
Processing Query Output
Concurrent Exceptions
Ordering in the Output Results
Side Effects
Putting PLINQ to Work

Multi-core processors are here. Once pervasive mainly in servers and desktop PCs, now multi-core processors are being used in mobile phones and PDAs, resulting in great benefits in power consumption. Responding to the increased availability of multi-processor platforms, Parallel Language Integrated Query (PLINQ) offers an easy way to take advantage of parallel hardware, including traditional multi-processor computers and the newer wave of multi-core processors.

PLINQ is a query execution engine that accepts any LINQ-to-Objects or LINQ-to-XML query and automatically utilizes multiple processors or cores for execution when they are available. The change in programming model is tiny, meaning you don't need to be a concurrency guru to use it. In fact, threads and locks won't even come up unless you really want to dive under the hood to understand how it all works. PLINQ is a key component of Parallel FX, the next generation of concurrency support in the Microsoft® .NET Framework.

Using technologies like PLINQ will become increasingly crucial to ensuring the scalability of software on future parallel microprocessor architectures. By utilizing LINQ at choice places throughout your applications today—such as where you have data- or compute-intensive operations that can be expressed as queries—you will ensure that those fragments of your programs continue to perform better when PLINQ becomes available and the machines running your application grow from 2 to 4 to 32 processors and beyond. And even if you only run that code on a single-processor machine, the overhead of PLINQ is typically so small that you won't notice a difference. In addition, the data parallel nature of PLINQ ensures your programs will continue to scale as the size of your data sets increases.

In this article, we review the goals of the PLINQ technology, where it fits into the broader .NET Framework and other concurrency offerings, and what it looks like from the perspective of LINQ developers. We conclude with some example scenarios where PLINQ has already shown tremendous value.

Note that the Parallel FX library, which includes PLINQ, is currently still under development, but the first Community Tech Preview (CTP) will be available from MSDN® in Fall '07. Watch blogs.msdn.com/somasegar for details.

From LINQ to PLINQ

When people first hear about the PLINQ project, they usually ask: why parallelize LINQ? The simple answer is that LINQ's set-at-a-time programming model for expressing computations places an emphasis on specifying what needs to get done instead of how it is to be done. Without LINQ, the how part would typically be expressed through loops and intermediary data structures, but because such code encodes so much specific information, the compiler and runtime cannot parallelize it as easily. LINQ's declarative nature, on the other hand, leaves the flexibility for a clever implementation like PLINQ to use parallelization to obtain the same results.

The follow-up question is inevitably: if you're going to query enough data for PLINQ to make sense, why not just use a database? And the answer to this question takes the form of a rhetorical question: what code would you have written if you weren't using LINQ? Well, you'd likely be doing the same amount of data- and compute-intensive work, but it would have been captured inside an unstructured series of for loops, function calls, and so forth. So this question implies that all programs should reside in the database, which I'm sure very few people would agree with.

With PLINQ you don't need to move your entire database server processing logic over to in-memory LINQ-to-Objects queries in the client. Rather, PLINQ offers an incremental way of taking advantage of parallelism for existing solutions to existing problems. If your problem is data-intensive enough and would traditionally demand that you pursue a database solution, you should still pursue such a solution. PLINQ changes very little of that. However, if you have several data sources that you'd like to query together—possibly heterogeneous, spanning databases, XML files, and so on—PLINQ can take over and parallelize once data is on the client.

Naturally, people then ask: why doesn't LINQ-to-Objects itself just run queries in parallel? The parallelism would be completely hidden, and developers wouldn't need to change a single line of code to reap the benefits of it. Sadly, there are some subtle gotchas that can arise with the move from LINQ to PLINQ that prohibit us from realizing this vision, at least for PLINQ 1.0. These will be discussed more later in the article.

PLINQ Programming Model

Using PLINQ is almost exactly like using LINQ-to-Objects and LINQ-to-XML. You can use any of the operators available through C# 3.0 and Visual Basic® 9.0 query comprehension syntax or the System.Linq.Enumerable class, including OrderBy, Join, Select, Where, and so on. PLINQ supports all of the LINQ operators, not just those available in language comprehensions. And you can query any in-memory collection such as T[], List<T>, or any other kind of IEnumerable<T> in addition to XML documents loaded using the System.Xml.Linq APIs. If you already have a bunch of LINQ queries, PLINQ is equipped to run them.

LINQ-to-SQL and LINQ-to-Entities queries will still be executed by the respective databases and query providers, so PLINQ does not offer a way to parallelize those queries. If you wish to process the results of those queries in memory, including joining the output of many heterogeneous queries, then PLINQ can be quite useful.

Aside from writing LINQ queries the same way you're used to writing them, there are two extra steps needed to make use of PLINQ:

  1. Reference the System.Concurrency.dll assembly during compilation.
  2. Wrap your data source in an IParallelEnumerable<T> with a call to the System.Linq.ParallelEnumerable.AsParallel extension method.

Calling the AsParallel extension method in Step 2 ensures that the C# or Visual Basic compiler binds to the System.Linq.ParallelEnumerable version of the standard query operators instead of System.Linq.Enumerable. This gives PLINQ a chance to take over control and to execute the query in parallel. AsParallel is defined as taking any IEnumerable<T>:

public static class System.Linq.ParallelEnumerable {
    public static IParallelEnumerable<T> AsParallel<T>(this IEnumerable<T> source);
    // ...the other standard query operators...
}

IParallelEnumerable<T> derives from IEnumerable<T> and adds very little to it, being there simply to facilitate binding to PLINQ's ParallelEnumerable query provider, taking advantage of the new extension method support in C# 3.0 and Visual Basic 9.0. The interface derives from IEnumerable<T> so that you can still foreach over instances and pass them to other APIs that expect IEnumerable<T>. The standard query operators defined on ParallelEnumerable mirror those on Enumerable, with three differences: each takes an IParallelEnumerable<T> as its extension source argument rather than IEnumerable<T>, each returns an IParallelEnumerable<T> rather than IEnumerable<T> (except for the aggregations that just return simple types), and each of course uses parallelism internally for query evaluation.

For example, take a simple LINQ query defined in C#:

IEnumerable<T> data = ...;
var q = data.Where(x => p(x)).OrderBy(x => k(x)).Select(x => f(x));
foreach (var e in q) a(e);

All that is required to use PLINQ in this case is adding a call to AsParallel on data:

IEnumerable<T> data = ...;
var q = data.AsParallel().Where(x => p(x)).OrderBy(x => k(x)).Select(x => f(x));
foreach (var e in q) a(e);

This can be written more concisely using the C# query comprehension syntax, in which case the PLINQ version looks like this:

IEnumerable<T> data = ...;
var q = from x in data.AsParallel() where p(x) orderby k(x) select f(x);
foreach (var e in q) a(e);

Once you have made this change, PLINQ will transparently execute the Where, OrderBy, and Select on all of the available processors using classic data parallel evaluation techniques. PLINQ uses deferred execution just like LINQ, meaning the query doesn't begin running until you foreach over it, call GetEnumerator directly, or force the results into a list through some other API such as ToList or ToDictionary. When the query executes, PLINQ will arrange for parts of the query to run on the available processors through the hidden use of multiple threads. You don't even need to understand how all of this is done; you just see better performance and better utilization of the available processors.
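As a concrete illustration, here is a minimal sketch of the same pattern applied to an int array. It is written against PLINQ as it later shipped in .NET Framework 4, where AsParallel returns ParallelQuery<T> rather than the IParallelEnumerable<T> of the preview described in this article, so the type names are an assumption relative to the text. The predicate, sort key, and projection are hypothetical stand-ins for p, k, and f.

```csharp
using System;
using System.Linq;

public static class PlinqBasics
{
    // Hypothetical stand-ins for p, k, and f from the text.
    public static int[] EvenSquares(int[] data)
    {
        var q = data.AsParallel()
                    .Where(x => x % 2 == 0)   // p(x): keep even numbers
                    .OrderBy(x => x)          // k(x): sort by value
                    .Select(x => x * x);      // f(x): square each element

        // Deferred execution: nothing has run so far.
        // ToArray forces the query to execute, in parallel where possible.
        return q.ToArray();
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(1, 10).ToArray();
        foreach (int e in EvenSquares(data))
            Console.WriteLine(e);
    }
}
```

Removing the AsParallel call turns this back into an ordinary LINQ-to-Objects query with the same result, which makes it easy to compare the two side by side.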

Though it's not obvious, the inferred type of q differs between the ordinary LINQ-to-Objects and PLINQ queries shown previously. In the first example, q is typed as IEnumerable<U>, where U is whatever type the f method passed to the Select operator returns. In the second example, however, q is typed as IParallelEnumerable<U>. This usually doesn't matter: if you've declared q to explicitly have type IEnumerable<U>, for instance, the change to AsParallel will still work since IParallelEnumerable<U> derives from IEnumerable<U>. But it does mean that any subsequent use of q will be treated like an IParallelEnumerable<U>. For example, if you subsequently query it, PLINQ will be chosen as the query provider.

Note that some LINQ operators are binary—they take two IEnumerable<T>s as input. Join is a perfect example of such an operator. In these cases, the type of the left-most data source determines whether LINQ or PLINQ is used. Thus you need only call AsParallel on the first data source for your query to run in parallel:

IEnumerable<T> leftData = ..., rightData = ...;
var q = from x in leftData.AsParallel() join y in rightData on x.a equals y.b select f(x, y);

All of this discussion has assumed you're using extension methods to write your queries. If you have chosen instead to call methods on the Enumerable class directly, then you will have a little more work to do to move over to PLINQ. In addition to the call to AsParallel, you must also reference the ParallelEnumerable type. For example, imagine the above query was written by calling Enumerable directly:

IEnumerable<T> data = ...;
var q = Enumerable.Select(Enumerable.OrderBy(Enumerable.Where(data, (x) => p(x)), (x) => k(x)), (x) => f(x));
foreach (var e in q) a(e);

To use PLINQ, the query would have to be rewritten like this:

IEnumerable<T> data = ...;
var q = ParallelEnumerable.Select(ParallelEnumerable.OrderBy(ParallelEnumerable.Where(data.AsParallel(),
  (x) => p(x)), (x) => k(x)), (x) => f(x));
foreach (var e in q) a(e);

For obvious reasons, using comprehensions and extension methods is a more convenient way of writing your queries and carries the added benefit of making it much easier to move to PLINQ.

And that's it! These are the only changes required to use PLINQ in place of LINQ-to-Objects. Because LINQ-to-XML exposes XML documents as IEnumerable<T> data structures, everything said so far applies to querying XML content, too.

Processing Query Output

As already noted, due to deferred evaluation, parallelism doesn't get introduced until you start processing the output of the query. If you're familiar with IEnumerable<T>, this equates to calling the GetEnumerator method. There are three basic ways that PLINQ queries can be processed, each leading to a slightly different model for parallelism.

The first is pipelined processing, in which case the thread doing the enumeration is separate from the threads devoted to running the query. PLINQ will use many threads for query execution, but will reduce the degree of parallelism by one so that the enumerating thread is not interfered with. For instance, if you have eight processors available, seven of them will run the PLINQ query while the remaining processor runs the foreach loop on the output of the PLINQ query as elements become available. This carries the benefit of allowing more incremental processing of output, thus reducing the memory requirements necessary to hold the results; however, having many producer threads and only a single consumer thread can often lead to uneven work distribution, resulting in processor inefficiency.

The second model is stop-and-go processing. In this model, the thread that starts the enumeration joins all of the other threads to execute the query. Once all threads have finished producing the complete set of output, the thread then proceeds to enumerating the output. This has the benefit that all processing power is devoted to creating the output as quickly as possible. It is also slightly more efficient than pipelined processing because there is less incremental synchronization overhead in the implementation: PLINQ can be more intelligent about these things because it knows exactly where the output data is going and how it will be accessed. If the work distribution is uneven between the consumer and the producers, this is usually a more appropriate method of consumption.

Finally, there is inverted enumeration. A lambda function is provided to PLINQ that is run in parallel, once for each element in the output. This is the most efficient mechanism because there is much more parallelism exposed to PLINQ, and the implementation avoids costly operations such as merging output from multiple threads. But it has the drawbacks that you cannot simply use a foreach loop, you must use a special ForAll API, and you must be careful that the lambdas do not rely on shared state. Otherwise, introducing parallelism will render your queries incorrect and can cause unpredictable crashes or data corruption. If you can express your problem in terms of this API, however, it is always the preferred method.

Which of these three models you use depends on what you do with the query results. The default is the first—pipelined processing. As soon as MoveNext is called on the resulting query enumerator, a set of additional worker threads will execute the query, and results are returned from this and all subsequent MoveNext calls as they become available. If a call to MoveNext is made and no output is ready from the query producer threads, the calling thread will block until an element is available. If you just use foreach to process the output of a PLINQ query, this is what you will get:

var q = ...some query...;
foreach (var e in q) {
    // this runs in parallel with the execution of 'q'
    a(e);
}

The IParallelEnumerable<T> interface actually offers an overload of GetEnumerator that takes a bool argument called pipelined, allowing you to choose stop-and-go processing instead (true means pipelined and false means stop-and-go). Upon the first subsequent call to MoveNext, the entire query will be executed and the call will only return when all output is available. Subsequent calls to MoveNext just enumerate a buffer containing the output:

var q = ...some query...;
using(var e = q.GetEnumerator(false)) {
    while (e.MoveNext()) {
        // after the 1st call, the query is finished executing 
        // we then just enumerate the results from an in-memory list 
        a(e.Current);
    }
}

There are a few special cases where stop-and-go processing is used as the default: if you use the ToArray or ToList methods, these operators will internally force a stop-and-go operation. If you have a sort in your query, stop-and-go will be used instead because pipelining the output of a sort is wasteful. A sort exhibits extremely high latency (because it generally needs to sort the whole input before producing a single output element), and so PLINQ prefers to devote all processing power to completing the sort as quickly as possible.
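The GetEnumerator(bool) overload described above belongs to the preview. In the PLINQ API that later shipped in .NET Framework 4, the closest equivalent is the WithMergeOptions operator: ParallelMergeOptions.FullyBuffered accumulates all results before handing any to the consumer (roughly stop-and-go), while NotBuffered yields each result as soon as it is produced (roughly pipelined). The sketch below assumes that released API; the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MergeModes
{
    // Enumerates a doubled copy of the input under the given merge option.
    public static List<int> Consume(int[] data, ParallelMergeOptions merge)
    {
        var q = data.AsParallel()
                    .WithMergeOptions(merge)
                    .Select(x => x * 2);

        var seen = new List<int>();
        foreach (int e in q)   // the loop body runs on the calling thread only
            seen.Add(e);
        return seen;
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(0, 1000).ToArray();
        // FullyBuffered: roughly stop-and-go. NotBuffered: roughly pipelined.
        Console.WriteLine(Consume(data, ParallelMergeOptions.FullyBuffered).Count);
        Console.WriteLine(Consume(data, ParallelMergeOptions.NotBuffered).Count);
    }
}
```

Both calls produce the same set of elements; only the timing with which the consumer sees them differs, and the order of elements is not guaranteed in either mode.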

In order to use inverted enumeration, you must use a different PLINQ-specific API:

public static class System.Linq.ParallelEnumerable {
    public static void ForAll<T>(this IParallelEnumerable<T> source, Action<T> action);
    // ...the other standard query operators...
}

Using the ForAll API looks quite a bit like the foreach loop you just saw:

var q = ... some query ...; 
q.ForAll(e => a(e));
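Because the ForAll lambda runs on several threads at once, anything it writes to must be thread-safe. The following sketch, written against the released .NET Framework 4 API (an assumption relative to the preview described here), collects results in a ConcurrentBag<T>; the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class ForAllSketch
{
    public static int[] SquaresViaForAll(int[] data)
    {
        // Thread-safe sink: the lambda below is invoked concurrently.
        var results = new ConcurrentBag<int>();

        data.AsParallel()
            .Select(x => x * x)
            .ForAll(e => results.Add(e));   // no merge back to a single thread

        // ForAll gives no ordering guarantee, so sort to get a stable result.
        return results.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        foreach (int e in SquaresViaForAll(new[] { 3, 1, 2 }))
            Console.WriteLine(e);
    }
}
```

A plain List<int> in place of the ConcurrentBag would be exactly the kind of shared state the text warns about, since List<T>.Add is not safe to call from multiple threads.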

Compelling Scenarios

While reading this article, you have probably already begun to imagine some ways to use PLINQ in your own applications. Perhaps you are already using LINQ today and want to improve your application's scalability on machines with multiple processors or multiple cores. Of course, PLINQ can make your current programs run faster, but it also allows you to do more compute work and operate on larger data sizes in the same amount of time, while processing streams of data at a faster rate. In doing all of this, the new PLINQ technology might open up new application possibilities that you could not previously attempt.

Let's look at a few illustrations of scenarios in which multi-core and PLINQ open up new doors. Consider a music producer working in a sound studio who wants to apply a series of effects on raw instrument sounds to produce a more aesthetically pleasing, production-quality master track. The company that provides his mixing software could apply these effects using PLINQ. These effects are usually composed of filters and projections over large streams of data (the raw music). PLINQ could greatly speed up the production time and utilize more powerful hardware as it becomes available. This approach might even allow music transformations in near real time, instead of doing complete post-production processing.

Likewise, consider a foreign exchange currency trader who looks for arbitrage conditions (inefficiencies in the market) in order to make a profit. Such shifts are minute and disappear as the market is constantly reaching equilibrium, requiring very fast trades. Querying stock trade information using parallelism via PLINQ could enable close to real time decision making, informed by large amounts of data and complicated analysis and computations.

These are just a few samples of the ways in which the speedup provided by PLINQ on multi-core hardware can provide a business advantage. Other domains offer similar opportunities, such as healthcare, economics, geological modeling, scientific computing, traffic control and simulations, gaming, artificial intelligence, machine learning, linguistic analysis, and the list goes on.

Concurrent Exceptions

Although the statements earlier about PLINQ's parallelization process being entirely transparent were mostly true, there are a small number of places where the use of parallelism can leak through the simple abstractions presented above. These are the gotchas that were alluded to earlier. Happily, most of them are minor, but you should be aware of them anyway.

Any lambdas or query operators that throw an exception stop sequential LINQ queries from executing right away. That's because when only one processor is used to run the query, elements are processed one after the other, sequentially: if an operator fails on one of them, the exception is thrown immediately and the subsequent elements will not even be considered. The same is not true of PLINQ.

As an illustration, take a look at this (contrived) query:

object[] data = new object[] {
    "foo", null, null, null
};
var q = data.Select(x => x.ToString());
foreach(var e in q) Console.WriteLine(e);

Every time you run it with LINQ, it will succeed in running ToString on the first array element, and then it will fail with a NullReferenceException trying to call ToString on the second. You never get to the third or fourth element. When multiple processors are involved, however, as is the case with PLINQ, you have the possibility that multiple exceptions can happen in parallel. Depending on how PLINQ decides to subdivide the problem, you may see failures for 1, 2, and 3, all simultaneously, or any combination of those, including possibly 3, but not 1 or 2.

To deal with this, PLINQ uses a slightly different exception model than LINQ to communicate failures. When an exception occurs on one of the PLINQ threads, the system first tries to stop all other threads from running as quickly as possible. This process happens completely transparently. But this may or may not be accomplished in time to prevent other exceptions from happening concurrently and, indeed, they may have already occurred by the time PLINQ gets involved. Once all threads have shut down, the full set of exceptions that have occurred will be aggregated into a new System.Concurrency.MultipleFailuresException object and that new aggregate exception object will be rethrown. Each exception that occurred is subsequently accessible through the InnerExceptions property, of type Exception[], including unperturbed stack traces.

PLINQ actually always throws a single MultipleFailuresException when an unhandled exception terminates a query's execution, even if only one exception is actually thrown. In the previous example, that means PLINQ always wraps the NullReferenceExceptions in a MultipleFailuresException. If it didn't and you wanted to catch an exception of a particular type, you'd have to write multiple catch clauses. Clearly you usually don't catch certain kinds of exceptions, but if you had wanted to, you would have had to write the following and duplicate a bunch of logic:

try {
    // query...
} catch (NullReferenceException) {
    ...
} catch (MultipleFailuresException) {
    ...
}

Not only is this clumsy, but developers would be apt to forget about one or the other, leading to bugs that happen only under some circumstances and configurations.

This can unfortunately make debugging more difficult. If an exception goes unhandled and you attach a debugger, you will break into the call to GetEnumerator (if you're calling foreach over the query results) rather than where your exception originated from to begin with. This is similar to what occurs with the asynchronous programming model (BeginXX/EndXX) in the .NET Framework today. Thankfully, PLINQ preserves the original stack traces, so if you expand the MultipleFailuresException object and look at its InnerExceptions property, you will find the full set of exceptions with the complete original stack traces available.
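In the PLINQ API that later shipped in .NET Framework 4, the aggregate exception type is System.AggregateException rather than the preview's MultipleFailuresException, and InnerExceptions is a read-only collection rather than an array. Assuming that released API, the handling pattern can be sketched as follows; the class and method names are hypothetical.

```csharp
using System;
using System.Linq;

public static class ExceptionSketch
{
    // Runs the contrived query from the text in parallel and reports
    // whether every failure it produced was a NullReferenceException.
    public static bool FailsOnlyWithNullRefs(object[] data)
    {
        try
        {
            data.AsParallel().Select(x => x.ToString()).ToArray();
            return false;   // the query completed without failing
        }
        catch (AggregateException ae)
        {
            // Flatten unwraps any nested aggregates into one flat list.
            var inner = ae.Flatten().InnerExceptions;
            foreach (Exception e in inner)
                Console.WriteLine(e.GetType().Name);   // original exceptions, stack traces intact

            return inner.Count > 0 && inner.All(e => e is NullReferenceException);
        }
    }

    public static void Main()
    {
        object[] data = new object[] { "foo", null, null, null };
        Console.WriteLine(FailsOnlyWithNullRefs(data));
    }
}
```

How many inner exceptions are reported varies from run to run, since it depends on how the input is partitioned and how quickly the other threads are stopped.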

Ordering in the Output Results

Say you've written the following code in LINQ:

int[] data = new int[] {
    0, 1, 2, 3
};
int[] data2 = (from x in data select x * 2).ToArray();

Can you predict the contents of data2? The question seems so simple that it's silly to even consider. Everybody would say: { 0, 2, 4, 6 }. But if you just change the code as shown here, the possible contents of data2 actually differ:

int[] data = new int[] {
    0, 1, 2, 3
};
int[] data2 = (from x in data.AsParallel() select x * 2).ToArray();

In this case, { 0, 2, 4, 6 } is surely possible, but so is { 6, 0, 2, 4 }, or any other permutation of these four numbers.

This is so because PLINQ runs the query in parallel and hands back results as soon as they are produced, no matter whether you are iterating over the query with foreach or marshaling the results into an array with ToArray. The LINQ ordering is simply a byproduct of the fact that its implementation processes input sequentially. Conversely, the PLINQ ordering is determined by the nondeterministic scheduling of parallel units of work, which is apt to change wildly from one execution to the next.

This was an explicit design decision made by the PLINQ team. Historically, queries have not guaranteed anything about ordering. If you take a look at SQL Server™, for example, unless you have specified an order by clause in the query text, the ordering will be dependent on many things: whether an index is used in the query, the layout of records on disk, and so on. In fact, it can also be nondeterministic, because SQL Server can use parallelism in the evaluation of queries too!

Because users frequently need to preserve ordering, and to alleviate some minor challenges for some people trying to migrate from LINQ, PLINQ offers a way of opting in to order preservation. Order preservation simply ensures that, provided there are no intervening sort operations, the relative ordering among output elements is strongly tied to the relative ordering among input elements. If you want to ensure that the output of the above query is always { 0, 2, 4, 6 }, you can use the following query instead:

int[] data = new int[] {
    0, 1, 2, 3
};
int[] data2 = (from x in data.AsParallel(QueryOptions.PreserveOrdering) 
  select x * 2).ToArray();

Order preservation is not free. In fact, it can substantially impact the performance and scaling of your queries. This is because PLINQ will logically insert a sort operation at the end, and the sort is an operator that does not scale perfectly with an increase in the number of processors. To get an idea of what this means, the previous query is logically equivalent to the following query:

int[] data = new int[] {
    0, 1, 2, 3
};
int[] data2 = data.AsParallel().
    Select((x, i) => new { x, i }).           // remember indices
    Select((x) => new { x = x.x * 2, x.i }).  // (in original)
    OrderBy((x) => x.i).                      // ensure order is preserved
    Select((x) => x.x).                       // get rid of indices from output
    ToArray();                                // (in original)

The steps introduced by PLINQ to support order preservation are the ones whose comments are not marked "(in original)". As you can see, it's quite a bit of work! Obviously PLINQ implements this functionality in a much more efficient manner than if you had just written this version of the query, but the steps are logically equivalent.
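In the PLINQ API that later shipped in .NET Framework 4, order preservation is requested with the AsOrdered operator rather than the preview's QueryOptions.PreserveOrdering argument. Assuming that released API, the order-preserving version of the query can be sketched like this (the class and method names are hypothetical):

```csharp
using System;
using System.Linq;

public static class OrderingSketch
{
    // Doubles every element in parallel while keeping the input order.
    public static int[] DoubleInOrder(int[] data)
    {
        return data.AsParallel()
                   .AsOrdered()          // opt in to order preservation
                   .Select(x => x * 2)
                   .ToArray();
    }

    public static void Main()
    {
        int[] data2 = DoubleInOrder(new[] { 0, 1, 2, 3 });
        Console.WriteLine(string.Join(", ", data2));   // prints "0, 2, 4, 6"
    }
}
```

Leaving out AsOrdered keeps the query correct as a set of values but allows the elements to come back in any order, as described above.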

Side Effects

PLINQ relies on something called statistical purity: most LINQ queries do not mutate data structures or perform impure operations most of the time. Another way of saying this is that most LINQ queries are strictly functional; they take some data as input, perform some calculations, and create an entirely separate copy of the data, with some changes, as output. But this best practice usage isn't enforced in any way by the compilers or runtime. If people have written queries that break this common usage pattern, then the shift to PLINQ can become quite a bit more complicated.

As an example, consider this LINQ query:

var q = from x in data where (x.f-- > 0) select x;

Notice that the predicate in the where clause actually modifies a field of an object. This means that the query may not be safe to run in parallel without exposing race conditions and concurrency bugs. By default, you must assume that queries like this are not safe and must not be run with PLINQ. But when would it actually be safe? The races occur only if the predicate is run on multiple threads against the same object, which can only happen if data contains duplicate objects. So if data is a set, then it's perfectly OK.

There are other cases, such as where static variables are used, that are always unsafe:

class C {
    internal static int s_cCounter;
}
// elsewhere... 
var q = from x in data where (x.f == C.s_cCounter++) select x;

If you run this query in parallel, you're apt to be quite disappointed. You could end up with many objects with duplicate f field values, which only occurs due to the race condition. Be careful: your initial reaction and approach to solving this might be to replace C.s_cCounter++ with Interlocked.Increment(ref C.s_cCounter). While this approach leads to a correct solution, any kind of synchronization will vastly reduce the scalability of your queries. You should strive to eliminate all reliance on mutability and shared state from your queries as a first principle.
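One way to follow this advice is to let a query operator carry the state instead of a shared variable. The sketch below is a different, simpler scenario than the counter query above: it counts matching elements with the Count operator rather than incrementing a shared counter from inside a lambda. It assumes the PLINQ API as released in .NET Framework 4, and the class, method, and predicate are hypothetical.

```csharp
using System;
using System.Linq;

public static class SideEffectSketch
{
    // Counts the positive elements without touching any shared state.
    // Each worker counts privately and PLINQ combines the partial counts.
    public static int CountPositive(int[] data)
    {
        return data.AsParallel().Count(x => x > 0);
    }

    public static void Main()
    {
        int[] data = { -2, 5, 0, 7, 3, -1 };

        // Unsafe alternative, shown only as a comment:
        //   int hits = 0;
        //   data.AsParallel().ForAll(x => { if (x > 0) hits++; });   // data race on 'hits'

        Console.WriteLine(CountPositive(data));   // prints 3
    }
}
```

The same idea applies to sums, minima, and other aggregates: expressing them as query operators keeps the lambdas free of side effects and avoids the synchronization cost of Interlocked operations.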

The Windows® platform is very thread-centric, and a lot of system state can end up attached to the thread running some piece of code itself. This can take the form of thread-local storage, security information such as impersonation, COM apartments (particularly Single Threaded Apartments), and GUI frameworks such as Windows Forms and Windows Presentation Foundation. All of these can cause trouble when moving from LINQ to PLINQ.

Whereas in the LINQ model all code in the query runs on the same thread that initiated the query, with PLINQ the code in the query is distributed over many threads. Unfortunately, it's not always obvious when you've taken a dependency on thread affinity, since many services in the .NET Framework intrinsically and transparently rely on it internally. The best advice I can give is to be on the lookout for the abovementioned sources of concurrency hazards and avoid them as much as possible from within your queries.

Putting PLINQ to Work

PLINQ allows LINQ developers to take advantage of parallel hardware—including achieving better performance and building more intrinsically scalable software—without needing to understand concurrency or data parallelism. The programming model is simple, enabling a broader number of developers to take advantage of more parallel hardware. The "PLINQ Resources" sidebar lists some references for more information.

For those already using LINQ-to-Objects, LINQ-to-XML, or LINQ to tie together heterogeneous data sources, it is a straightforward process to get your existing queries to use PLINQ. Take a look at the sidebar "Compelling Scenarios" for further possible uses of this technology. For those who have not yet begun adopting LINQ, the parallel speedup gains are another reason for you to consider using it. Even though PLINQ is not available today, using LINQ in your programs is an investment in the future.

Joe Duffy is Development Lead for the Parallel FX team at Microsoft. He is writing a book, Concurrent Programming on Windows, to be published by Addison-Wesley.

Ed Essey, a five-year veteran Program Manager at Microsoft, works on parallel computing models and infrastructure.