

In Chapter 3, "Parallel Tasks," you saw how the Parallel Task pattern allows you to fork the flow of control in a program. In this chapter, you'll see how control flow and data flow can be integrated with the Futures pattern.

A future is a stand-in for a computational result that is initially unknown but becomes available at a later time. The process of calculating the result can occur in parallel with other computations. The Futures pattern integrates task parallelism with the familiar world of arguments and return values. If the parallel tasks described in Chapter 3 are asynchronous actions, futures are asynchronous functions. (Recall that actions don't return values, but functions do.)

Futures express the concept of potential parallelism that was introduced in Chapter 1, "Introduction." Decomposing a sequential operation with futures can result in faster execution if hardware resources are available for parallel execution. However, if all cores are otherwise occupied, futures will be evaluated without parallelism.

In the Microsoft® .NET Framework, futures are implemented with the Task<TResult> class, where the type parameter TResult gives the type of the result. In other words, a future in .NET is a task that returns a value. Instead of explicitly waiting for the task to complete using a method such as Wait, you simply ask the task for its result when you are ready to use it. If the task has already finished, its result is waiting for you and is immediately returned. If the task is running but has not yet finished, the thread that needs the result blocks until the result value becomes available. (While the thread is blocked, the core is available for other work.) If the task hasn't started yet, the task will be executed inline in the current thread context, if possible.
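As a minimal, self-contained sketch of this idea (the computation here is made up purely for illustration):

```csharp
using System;
using System.Threading.Tasks;

class Program
{
    // A future: a task that returns a value (here, a made-up calculation).
    public static int ComputeWithFuture()
    {
        Task<int> future = Task.Factory.StartNew<int>(() => 21 * 2);

        // ... other work could proceed on this thread while the future runs ...

        // Result blocks until the value is available; if the task has
        // already finished, the value is returned immediately.
        return future.Result;
    }

    static void Main()
    {
        Console.WriteLine(ComputeWithFuture());   // prints 42
    }
}
```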

The .NET Framework also implements a variation of the Futures pattern known as continuation tasks. A .NET continuation task is a task that automatically starts when other tasks, known as its antecedents, complete. In many cases, the antecedents consist of futures whose result values are used as input by the continuation task. An antecedent may have more than one continuation task.

Continuation tasks represent the nested application of asynchronous functions. In some ways, continuation tasks act like callback methods—in both cases, you register an operation that will be automatically invoked at a specified point in the future.

The Futures pattern discussed in this chapter is closely related to what is sometimes known as a task graph. When futures provide results that are the inputs to other futures, this can be seen as a directed graph. The nodes are tasks, and the arcs are values that act as inputs and outputs of the tasks.

Be careful not to confuse futures with pipelines. As you will see in Chapter 7, "Pipelines," pipeline tasks are also nodes of a directed graph, but the arcs that connect stages of the pipeline are concurrent queues that convey a series of values, like an assembly line or data stream. In contrast, with futures, nodes of the task graph are connected by singleton values, similar to arguments and return values.

The Basics

When you think about the Parallel Task pattern described in Chapter 3, you see that, in many cases, the purpose of a task is to calculate a result. In other words, asynchronous operations often act like functions. Of course, tasks can also do other things, such as reordering values in an array, but calculating new values is common enough to warrant a pattern tailored to it. It's also much easier to reason about pure functions, which don't have side effects and therefore exist purely for their results. This simplicity becomes very useful as the number of cores becomes large.


The following example is from the body of a sequential method.

  var b = F1(a); 
  var c = F2(a); 
  var d = F3(c); 
  var f = F4(b, d);
  return f;

Suppose that F1, F2, F3, and F4 are processor-intensive functions that communicate with one another using arguments and return values instead of reading and updating shared state variables.

Suppose, also, that you want to distribute the work of these functions across available cores, and you want your code to run correctly no matter how many cores are available. When you look at the inputs and outputs, you can see that F1 can run in parallel with F2 and F3 but that F3 can't start until after F2 finishes. How do you know this? The possible orderings become apparent when you visualize the function calls as a graph. Figure 1 illustrates this.


Figure 1

A task graph for calculating f

The nodes of the graph are the functions F1, F2, F3, and F4. The incoming arrows for each node are the inputs required by the function, and the outgoing arrows are values calculated by each function. It's easy to see that F1 and F2 can run at the same time but that F3 must follow F2.

Here's how to create futures for this computation. For simplicity, the code assumes that the values being calculated are integers and that the value of variable a has already been supplied, perhaps as an argument to the current method.

  Task<int> futureB = Task.Factory.StartNew<int>(() => F1(a));
  int c = F2(a); 
  int d = F3(c); 
  int f = F4(futureB.Result, d);
  return f;

This code creates a future that begins to asynchronously calculate the value of F1(a). On a multicore system, F1 will be able to run in parallel with the current thread. This means that F2 can begin executing without waiting for F1. The function F4 will execute as soon as the data it needs becomes available. It doesn't matter whether F1 or F3 finishes first, because the results of both functions are required before F4 can be invoked. (Recall that the Result property does not return until the future's value is available.) Note that the calls to F2, F3, and F4 do not need to be wrapped inside of a future because a single additional asynchronous operation is all that is needed to take advantage of the parallelism of this example.

Of course, you could equivalently have put F2 and F3 inside of a future, as shown here.

  Task<int> futureD = Task.Factory.StartNew<int>(
                                            () => F3(F2(a)));
  int b = F1(a);
  int f = F4(b, futureD.Result);
  return f;

It doesn't matter which branch of the task graph shown in the figure runs asynchronously.

An important point of this example is that exceptions that occur during the execution of a future are thrown by the Result property. This makes exception handling easy, even in cases with many futures and complex chains of continuation tasks. You can think of futures as either returning a result or throwing an exception. Conceptually, this is very similar to the way any .NET function works. Here is an example.

  try
  {
    Task<int> futureD = Task.Factory.StartNew<int>(
                                              () => F3(F2(a)));
    int b = F1(a);
    int f = F4(b, futureD.Result);
    return f;
  }
  catch (MyException)
  {
     Console.WriteLine("Saw MyException exception");
     return -1;
  }

If an exception of type MyException were thrown in F2 or F3, it would be deferred and rethrown when the Result property of futureD is read. Getting the value of the Result property occurs within a try block, which means that the exception can be handled in the corresponding catch block.
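One practical detail worth noting: in .NET Framework 4, the Result property surfaces a deferred exception wrapped in an AggregateException, so robust code typically catches AggregateException and examines its inner exceptions. Here is a runnable sketch with a made-up failing function standing in for F2:

```csharp
using System;
using System.Threading.Tasks;

class Program
{
    // A made-up function that always fails, standing in for F2.
    static int F2(int a)
    {
        throw new InvalidOperationException("F2 failed");
    }

    public static string RunAndHandle()
    {
        Task<int> futureD = Task.Factory.StartNew<int>(() => F2(1));
        try
        {
            return futureD.Result.ToString();
        }
        catch (AggregateException ae)
        {
            // Handle marks an inner exception as observed when the
            // predicate returns true; any unmatched ones are rethrown.
            ae.Handle(e => e is InvalidOperationException);
            return "Saw InvalidOperationException";
        }
    }

    static void Main()
    {
        Console.WriteLine(RunAndHandle());
    }
}
```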

Continuation Tasks

It's very common for one asynchronous operation to invoke a second asynchronous operation and pass data to it. Continuation tasks make the dependencies among futures apparent to the run-time environment that is responsible for scheduling them. This helps to allocate work efficiently among cores.

For example, if you want to update the user interface (UI) with the result produced by the function F4 from the previous section, you can use the following code.

TextBox myTextBox = ...;

var futureB = Task.Factory.StartNew<int>(() => F1(a));
var futureD = Task.Factory.StartNew<int>(() => F3(F2(a)));

var futureF = Task.Factory.ContinueWhenAll<int, int>(
                 new[] { futureB, futureD },
                 (tasks) => F4(futureB.Result, futureD.Result));

futureF.ContinueWith((t) =>
    myTextBox.Dispatcher.Invoke(
       (Action)(() => { myTextBox.Text = t.Result.ToString(); }))
  );

This code structures the computation into four tasks. The system understands the ordering dependencies between continuation tasks and their antecedents. It makes sure that the continuation tasks will start only after their antecedent tasks complete.

The first task, futureB, calculates the value of b. The second task, futureD, calculates the value of d. These two tasks can run in parallel. The third task, futureF, calculates the value of f. It can run only after the first two tasks are complete. Finally, the fourth task takes the value calculated by F4 and updates a text box on the user interface.

The ContinueWith method creates a continuation task with a single antecedent. The ContinueWhenAll<TAntecedentResult, TResult> method of the Task.Factory object allows you to create a continuation task that depends on more than one antecedent task.
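The same shape can be exercised from a self-contained console program. In this sketch, made-up arithmetic stands in for F1, F3(F2(...)), and F4:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    public static int ComputeF()
    {
        // Two independent futures that may run in parallel.
        Task<int> futureB = Task.Factory.StartNew<int>(() => 6 * 7);
        Task<int> futureD = Task.Factory.StartNew<int>(() => 10 + 10);

        // A continuation with two antecedents: it starts only after
        // both futureB and futureD have completed.
        Task<int> futureF = Task.Factory.ContinueWhenAll<int, int>(
            new[] { futureB, futureD },
            (tasks) => tasks.Sum(t => t.Result));

        return futureF.Result;
    }

    static void Main()
    {
        Console.WriteLine(ComputeF());   // prints 62
    }
}
```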

Example: The Adatum Financial Dashboard

Here's an example of how the Futures pattern can be used in an application. The example shows how you can run computationally intensive operations in parallel in an application that uses a graphical user interface (GUI).

Adatum is a financial services firm that provides a financial dashboard application to its employees. The application, known as the Adatum Dashboard, allows employees to perform analyses of financial markets. The dashboard application runs on an employee's desktop workstation. The Adatum Dashboard analyzes historical data instead of a stream of real-time price data. The analysis it performs is computationally intensive, but there is also some I/O latency because the Adatum Dashboard application collects input data from several sources over the network.

After the application has the market data, it merges the datasets together. The application normalizes the merged market data and then performs an analysis step. After the analysis, it creates a market model. It also performs these same steps for historical market data from the Federal Reserve System. After the current and historical models are ready, the application compares the two models and makes a market recommendation of "buy," "sell," or "hold." You can visualize these steps as a graph. Figure 2 illustrates this.


Figure 2

Adatum Dashboard tasks

The tasks in this diagram communicate using specific types of business objects, which are implemented as .NET classes in the Adatum Dashboard application.

You can download the source code for the Adatum Dashboard application from the CodePlex site at http://parallelpatterns.codeplex.com in the Chapter5\A-Dash project. The application consists of four parts: the business object definitions, an analysis engine, a view model, and the user interface, or the view. Figure 3 illustrates this.


Figure 3

Adatum Dashboard application

The Business Objects

The Adatum Dashboard uses immutable data types. Objects of these types cannot be modified after they are created, which makes them well suited to parallel applications.

The StockDataCollection class represents a time series of closing prices for a group of securities. You can think of this as a dictionary indexed by a stock symbol. Conceptually, the values are arrays of prices for each security. You can merge StockDataCollection values as long as the stock symbols don't overlap. The result of the merge operation is a new StockDataCollection value that contains the time series of the inputs.

The StockAnalysisCollection class is the result of the analysis step. Similarly, the MarketModel and MarketRecommendation classes are the outputs of the modeling and the comparison phases of the application. The MarketRecommendation class has a property that contains a "buy, hold, or sell" decision.
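The shape of such an immutable type can be sketched as follows. This is a hypothetical, simplified stand-in for the Adatum business objects, not the actual StockDataCollection implementation:

```csharp
using System;

// Hypothetical, simplified immutable type: all state is readonly and
// assigned only in the constructor, so instances can be shared
// between tasks without locking.
public sealed class PriceSeries
{
    private readonly string symbol;
    private readonly double[] prices;

    public PriceSeries(string symbol, double[] prices)
    {
        this.symbol = symbol;
        // Defensive copy so the caller can't mutate our state later.
        this.prices = (double[])prices.Clone();
    }

    public string Symbol { get { return symbol; } }
    public int Count { get { return prices.Length; } }
    public double this[int i] { get { return prices[i]; } }
}

class Program
{
    static void Main()
    {
        var series = new PriceSeries("MSFT", new[] { 30.1, 30.4 });
        Console.WriteLine(series.Symbol + " " + series.Count);   // prints MSFT 2
    }
}
```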

For more information about how to implement your own immutable types, see the section, "Immutable Types," in Appendix A, "Adapting Object-Oriented Patterns."

The Analysis Engine

The Adatum Dashboard's AnalysisEngine class produces a market recommendation from the market data it receives.

The sequential process is shown in the following code.

public MarketRecommendation DoAnalysisSequential()
{
  StockDataCollection nyseData = LoadNyseData();
  StockDataCollection nasdaqData = LoadNasdaqData();
  StockDataCollection mergedMarketData = 
    MergeMarketData(new[] { nyseData, nasdaqData });
  StockDataCollection normalizedMarketData = 
    NormalizeData(mergedMarketData);
  StockDataCollection fedHistoricalData = 
    LoadFedHistoricalData();
  StockDataCollection normalizedHistoricalData = 
    NormalizeData(fedHistoricalData);
  StockAnalysisCollection analyzedStockData = 
    AnalyzeData(normalizedMarketData);
  MarketModel modeledMarketData = 
    RunModel(analyzedStockData);
  StockAnalysisCollection analyzedHistoricalData = 
    AnalyzeData(normalizedHistoricalData);
  MarketModel modeledHistoricalData =
    RunModel(analyzedHistoricalData);
  MarketRecommendation recommendation = 
    CompareModels(new[] { modeledMarketData,
                          modeledHistoricalData });
  return recommendation;
}

The final result of the computation is a MarketRecommendation object. Each of the method calls returns data that becomes the input to the operation that invokes it. When you use method invocations in this way, you are limited to sequential execution. The DoAnalysisSequential method returns only after all the dependent operations complete.

The parallel version uses futures and continuation tasks for each of the operational steps. Here's the code.

public AnalysisTasks DoAnalysisParallel()
{
  TaskFactory factory = Task.Factory;
  // ...

  Task<StockDataCollection> loadNyseData =
    factory.StartNew<StockDataCollection>(
      () => LoadNyseData(),
      TaskCreationOptions.LongRunning);

  Task<StockDataCollection> loadNasdaqData =
    factory.StartNew<StockDataCollection>(
      () => LoadNasdaqData(),
      TaskCreationOptions.LongRunning);

  Task<StockDataCollection> mergeMarketData =
    factory.ContinueWhenAll<StockDataCollection,
                            StockDataCollection>(
      new[] { loadNyseData, loadNasdaqData },
      (tasks) => MergeMarketData(
                        from t in tasks select t.Result));

  Task<StockDataCollection> normalizeMarketData =
    mergeMarketData.ContinueWith(
      (t) => NormalizeData(t.Result));

  Task<StockDataCollection> loadFedHistoricalData =
    factory.StartNew<StockDataCollection>(
      () => LoadFedHistoricalData(),
      TaskCreationOptions.LongRunning);

  Task<StockDataCollection> normalizeHistoricalData =
    loadFedHistoricalData.ContinueWith(
      (t) => NormalizeData(t.Result));

  Task<StockAnalysisCollection> analyzeMarketData =
    normalizeMarketData.ContinueWith(
      (t) => AnalyzeData(t.Result));

  Task<MarketModel> modelMarketData =
    analyzeMarketData.ContinueWith(
      (t) => RunModel(t.Result));

  Task<StockAnalysisCollection> analyzeHistoricalData =
    normalizeHistoricalData.ContinueWith(
      (t) => AnalyzeData(t.Result));

  Task<MarketModel> modelHistoricalData =
    analyzeHistoricalData.ContinueWith(
      (t) => RunModel(t.Result));

  Task<MarketRecommendation> compareModels =
    factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
      new[] { modelMarketData, modelHistoricalData },
      (tasks) => CompareModels(from t in tasks select t.Result));

  Task errorHandler = CreateErrorHandler(loadNyseData, 
    loadNasdaqData, loadFedHistoricalData, 
    mergeMarketData, normalizeHistoricalData, 
    normalizeMarketData, analyzeHistoricalData, 
    analyzeMarketData, modelHistoricalData, 
    modelMarketData, compareModels);

  return new AnalysisTasks()
  {
      LoadNyseData = loadNyseData,
      LoadNasdaqData = loadNasdaqData,
      MergeMarketData = mergeMarketData,
      NormalizeMarketData = normalizeMarketData,
      LoadFedHistoricalData = loadFedHistoricalData,
      NormalizeHistoricalData = normalizeHistoricalData,
      AnalyzeMarketData = analyzeMarketData,
      AnalyzeHistoricalData = analyzeHistoricalData,
      ModelMarketData = modelMarketData,
      ModelHistoricalData = modelHistoricalData,
      CompareModels = compareModels,
      ErrorHandler = errorHandler
  };
}
The parallel version, provided by the DoAnalysisParallel method, is similar to the sequential version, except that the synchronous method calls have been replaced with futures and continuation tasks. The method returns an AnalysisTasks object that contains the tasks associated with each step of the calculation. The DoAnalysisParallel method returns immediately, leaving the tasks running. The next sections describe how each of the tasks is created.

Loading External Data

The methods that gather the external data from the network are long-running, I/O intensive operations. Unlike the other steps, they are not particularly processor intensive, but they may take a relatively long time to complete. Most of their time is spent waiting for I/O operations to finish. You create these tasks with a factory object, and you use an argument to specify that the tasks are of long duration. This temporarily increases the degree of concurrency that is allowed by the system. The following code shows how to load the external data.

  Task<StockDataCollection> loadNyseData =
    factory.StartNew<StockDataCollection>(
      () => LoadNyseData(),
      TaskCreationOptions.LongRunning);

  Task<StockDataCollection> loadNasdaqData =
    factory.StartNew<StockDataCollection>(
      () => LoadNasdaqData(),
      TaskCreationOptions.LongRunning);

Note that the factory creates futures that return values of type StockDataCollection. The TaskCreationOptions.LongRunning enumerated value tells the task library that you want more concurrency. To prevent underutilization of processor resources, the task library may choose to run tasks like these in additional threads.


The merge operation takes inputs from both the loadNyseData and the loadNasdaqData tasks. It's a continuation task that depends on two antecedent tasks, as shown in the following code.

  Task<StockDataCollection> mergeMarketData =
    factory.ContinueWhenAll<StockDataCollection,
                            StockDataCollection>(
      new[] { loadNyseData, loadNasdaqData },
      (tasks) => MergeMarketData(
          from t in tasks select t.Result));

After the loadNyseData and loadNasdaqData tasks complete, the MergeMarketData method given as an argument is invoked. At that point, the tasks parameter will be an array of antecedent tasks, which are the loadNyseData and loadNasdaqData tasks.

The MergeMarketData method takes an array of StockDataCollection objects as its input. The LINQ expression from t in tasks select t.Result maps the input array of futures into a collection of StockDataCollection objects by getting the Result property of each future.


After the market data is merged, it undergoes a normalization step.

Task<StockDataCollection> normalizeMarketData =
    mergeMarketData.ContinueWith(
       (t) => NormalizeData(t.Result));

The ContinueWith method creates a continuation task with a single antecedent. The continuation task gets the result value from the task referenced by the mergeMarketData variable and invokes the NormalizeData method.

Analysis and Model Creation

After the market data is normalized, the application performs an analysis step and then builds a market model. The analysis step takes the normalized StockDataCollection as input and returns a StockAnalysisCollection; the modeling step takes that analysis and returns a MarketModel, as shown in the following code.

Task<StockAnalysisCollection> analyzeMarketData =
    normalizeMarketData.ContinueWith(
      (t) => AnalyzeData(t.Result));

Task<MarketModel> modelMarketData =
    analyzeMarketData.ContinueWith(
      (t) => RunModel(t.Result));

Processing Historical Data

The application also creates a model of historical data. The steps that create the tasks are similar to those for the current market data. However, because these steps are performed by tasks, they may be run in parallel if the hardware resources allow it.

Comparing Models

Here is the code that compares the two models.

Task<MarketRecommendation> compareModels =
    factory.ContinueWhenAll<MarketModel, MarketRecommendation>(
        new[] { modelMarketData, modelHistoricalData },
        (tasks) => CompareModels(
            from t in tasks select t.Result));

The "compare models" step compares the current and historical market models and produces the final result.

View and View Model

The Adatum Dashboard is a GUI-based application that also uses the Model-View-ViewModel (MVVM) pattern. It breaks the parallel computation into subtasks whose status can be independently seen from the user interface. For more information about how the dashboard application interacts with the view model and view, see the section, "Model-View-ViewModel," in Appendix A, "Adapting Object-Oriented Patterns."


Variations

So far, you've seen some of the most common ways to use futures and continuation tasks. This section describes some other ways to use them.

Canceling Futures and Continuation Tasks

There are several ways to cancel futures and continuation tasks. You can handle cancellation entirely from within the task, as the Adatum Dashboard does, or you can pass cancellation tokens when the tasks are created.

The Adatum Dashboard application supports cancellation from the user interface. It does this by calling the Cancel method of the CancellationTokenSource class. This sets the IsCancellationRequested property of the cancellation token to true.

The application checks for this condition at various checkpoints. If a cancellation has been requested, the operation is canceled. For a code example, see the section, "Canceling a Task," in Chapter 3, "Parallel Tasks."
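A minimal sketch of this token-based approach (with a hypothetical work loop, not the Adatum code): the task polls the token at its checkpoints, and the thread that reads Result observes the cancellation as an exception.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    public static string RunCancelable()
    {
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        Task<int> future = Task.Factory.StartNew<int>(() =>
        {
            for (int i = 0; i < 100; i++)
            {
                // Checkpoint: cooperatively observe a cancellation request.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10);   // stand-in for a slice of real work
            }
            return 42;
        }, token);

        cts.Cancel();   // e.g., requested from the user interface

        try
        {
            return future.Result.ToString();
        }
        catch (AggregateException ae)
        {
            ae.Handle(e => e is OperationCanceledException);
            return "Canceled";
        }
    }

    static void Main()
    {
        Console.WriteLine(RunCancelable());
    }
}
```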

Continue When "At Least One" Antecedent Completes

It's possible to invoke a continuation task when the first of multiple antecedents completes. To do this, use the Task.Factory object's ContinueWhenAny method. The ContinueWhenAny method is useful when the result of any of the tasks will do. For example, you may have an application where each task queries a Web service that gives the local weather. The application returns the first answer it receives to the user.
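A sketch of this "first answer wins" shape, with Thread.Sleep standing in for the hypothetical weather service calls:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    public static string FirstAnswer()
    {
        // Two hypothetical weather queries; either answer will do.
        Task<string> serviceA = Task.Factory.StartNew(() =>
            { Thread.Sleep(300); return "A: 18C"; });
        Task<string> serviceB = Task.Factory.StartNew(() =>
            { Thread.Sleep(10); return "B: 18C"; });

        // The continuation runs as soon as the FIRST antecedent
        // completes; its argument t is whichever task finished first.
        Task<string> first = Task.Factory.ContinueWhenAny<string, string>(
            new[] { serviceA, serviceB },
            (t) => t.Result);

        return first.Result;
    }

    static void Main()
    {
        Console.WriteLine(FirstAnswer());
    }
}
```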

Using .NET Asynchronous Calls with Futures

Tasks are similar in some ways to asynchronous methods that use the .NET Asynchronous Programming Model (APM) pattern and the IAsyncResult interface. In fact, tasks in .NET Framework 4 implement the IAsyncResult interface, which allows you to use the Task class when you implement the APM pattern.

You can convert a pair of begin/end methods that use IAsyncResult into a task. To do this, use the Task.Factory object's FromAsync method.
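For example, a FileStream's BeginRead/EndRead pair can be wrapped in a future. This is a minimal sketch using a temporary file; for a short local file, a single read is assumed to return all the bytes:

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

class Program
{
    public static string ReadViaFuture(string path)
    {
        using (var fs = new FileStream(path, FileMode.Open,
                   FileAccess.Read, FileShare.Read, 4096,
                   FileOptions.Asynchronous))
        {
            var buffer = new byte[1024];

            // FromAsync wraps the APM Begin/End pair in a future.
            Task<int> bytesRead = Task<int>.Factory.FromAsync(
                fs.BeginRead, fs.EndRead,
                buffer, 0, buffer.Length, null);

            // Result blocks until EndRead has run; an exception from
            // the asynchronous read would be rethrown here.
            return Encoding.UTF8.GetString(buffer, 0, bytesRead.Result);
        }
    }

    static void Main()
    {
        File.WriteAllText("data.txt", "hello futures");
        Console.WriteLine(ReadViaFuture("data.txt"));   // prints hello futures
    }
}
```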

In general, tasks can be easier to use than other implementations of IAsyncResult because futures rethrow exceptions when the result is requested.

Removing Bottlenecks

The idea of a critical path is familiar from project management. A "path" is any sequence of tasks from the beginning of the work to the end result. A task graph may contain more than one path. For example, look at the task graph that is shown in Figure 2. You can see that there are three paths, beginning with "Load NYSE," "Load Nasdaq," and "Load Fed Historical Data" tasks. Each path ends with the "Compare" task.

The duration of a path is the sum of the execution time for each task in the path. The critical path is the path with the longest duration. The amount of time needed to calculate the end result depends only on the critical path. As long as there are enough resources (that is, available cores), the noncritical paths don't affect the overall execution time.
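To make the idea concrete, here is a toy calculation with invented task durations (the Adatum example doesn't publish timings; these numbers are purely illustrative):

```csharp
using System;

class Program
{
    public static int CriticalPathMs()
    {
        // Invented durations (ms) for tasks like those in Figure 2.
        int load = 200, merge = 50, normalize = 30,
            analyze = 400, model = 100, compare = 60;

        // Market path: load -> merge -> normalize -> analyze -> model -> compare
        int marketPath = load + merge + normalize + analyze + model + compare;

        // Historical path: load -> normalize -> analyze -> model -> compare
        int historicalPath = load + normalize + analyze + model + compare;

        // With enough cores, elapsed time is bounded by the longest path.
        return Math.Max(marketPath, historicalPath);
    }

    static void Main()
    {
        Console.WriteLine(CriticalPathMs());   // prints 840
    }
}
```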

If you want to make your task graph run faster, you need to find a way to reduce the duration of the critical path. To do this, you can organize the work more efficiently. You can break down the slowest tasks into additional tasks, which can then execute in parallel. You can also modify a particularly time-consuming task so that it executes in parallel internally using any of the patterns that are described in this book.

The Adatum Dashboard example doesn't offer much opportunity for breaking down the slowest tasks into additional tasks that execute in parallel. This is because the paths are linear. However, you can use the Parallel Loops and Parallel Aggregation patterns to exploit more of the potential parallelism within each of the Analyze tasks (see Figure 2) if they take the most time. The task graph remains unchanged, but the tasks within it are now also parallelized.

Modifying the Graph at Run Time

The code in the financial program's analysis engine creates a static task graph. In other words, the graph of task dependencies is reflected directly in the code. By reading the implementation of the analysis engine, you can determine that there are a fixed number of tasks with a fixed set of dependencies among them.

The extension of the analysis tasks in the UI layer is an example of dynamic task creation. The UI augments the graph of tasks by adding continuation tasks programmatically, outside of the context where these tasks were originally created.

Dynamically created tasks are also a way to structure algorithms used for sorting, searching, and graph traversal. For examples, see Chapter 6, "Dynamic Task Parallelism."

Design Notes

There are several ideas behind the design of the Adatum Dashboard application.

Decomposition into Futures and Continuation Tasks

The first design decision is the most obvious one: the Adatum Dashboard introduces parallelism by means of futures and continuation tasks. This makes sense because the problem space could be decomposed into operations with well-defined inputs and outputs.

Functional Style

There are implicit and explicit approaches to synchronizing data between tasks. In this chapter, the examples used an explicit approach. Data is passed between tasks as parameters, which makes the data dependencies very obvious to the programmer. Alternatively, as you saw in Chapter 3, "Parallel Tasks," it's also possible for tasks to communicate with side effects that modify shared data structures. In this case, you rely on the tasks to use control dependencies that block appropriately. However, in general, explicit data flow is less prone to error than implicit data flow.

You can see this by analogy. In principle, there's no need for a programming language to support methods with return values. Programmers can always use methods without return values and perform updates on shared global variables as a way of communicating the results of a computation to other components of an application. However, in practice, using return values is considered to be a much less error-prone way to write programs. Programmers tend to make more mistakes with global variables than with return values.

Similarly, futures (tasks that return values) can reduce the possibility of error in a parallel program as compared to tasks that communicate results by modifying shared global state. In addition, tasks that return values can often require less synchronization than tasks that globally access state variables, and they are much easier to understand.

Futures also promote a natural kind of data isolation, similar to what is found in functional programming, which relies on operations that communicate with input and output values. Functional programs are very easy to adapt to multicore environments. In general, futures should only communicate with the outside world by means of their return values. It's also a good practice to use immutable types for return values.

Related Patterns

There are a number of patterns that have some similarities to the Futures pattern, but they also have some important differences. This section provides a brief comparison.

Pipeline Pattern

The Pipeline pattern is described in Chapter 7, "Pipelines." It differs in several important respects from a task graph. The pipeline focuses on data flow by means of queues (message buffers), instead of task dependencies. In a pipeline, the same task is executed on multiple data items.

Master/Worker Pattern

Tasks within the Master/Worker pattern have a child/parent relationship instead of the antecedent/dependent relationship that continuation tasks have. The master task creates the worker tasks, passes data to them, and waits for a result to be returned. Typically, worker tasks all execute the same computation against different data. The implementation of parallel loops in .NET Framework 4 uses the Master/Worker pattern internally.

Dynamic Task Parallelism Pattern

This is also known as the Divide and Conquer pattern and is the subject of Chapter 6, "Dynamic Task Parallelism." Dynamic task parallelism creates trees of tasks on the fly in a manner similar to recursion. If futures are asynchronous functions, dynamic task parallelism produces asynchronous recursive functions.

Discrete Event Pattern

The Discrete Event pattern focuses on sending messages between tasks. There is no limitation on the number of events a task raises or when it raises them. Events can also pass between tasks in either direction; there is no antecedent/dependent relationship. The Discrete Event pattern can be used to implement a task graph by placing additional restrictions on it.


Exercises

  1. Suppose you parallelize the following sequential code using futures in the style of the first example in the section, "The Basics."

       var b = F1(a);
       var d = F2(c);
       var e = F3(b, d);
       var f = F4(e);
       var g = F5(e);
       var h = F6(f, g);

    Draw the task graph. In order to achieve the greatest degree of concurrency, what is the minimum number of futures you must define? What is the largest number of these futures that can be running at the same time?

  2. Modify the BasicFutures sample from CodePlex so that one of the futures throws an exception. What should happen? Observe the behavior when you execute the modified sample.

Further Reading

Leijen describes the motivation for including futures in the Task Parallel Library (TPL) and has references to other work, especially in functional languages. The NModel framework provides a C# library of immutable collection types including set, bag, sequence, and map.

D. Leijen, W. Schulte, and S. Burckhardt. "The Design of a Task Parallel Library." S. Arora and G.T. Leavens, editors, OOP-SLA 2009: Proceedings of the 24th Annual ACM SIGPLAN Conference on Object-Oriented Programming, Systems, Languages, and Applications, pages 227–242. ACM, 2009.

NModel software. 2008. http://nmodel.codeplex.com/

© 2015 Microsoft