
5: Futures

In Chapter 3, "Parallel Tasks," you saw how the Parallel Task pattern allows you to fork the flow of control in a program. In this chapter, you'll see how control flow and data flow can be integrated with the Futures pattern.

A future is a stand-in for a computational result that is initially unknown but becomes available at a later time. The process of calculating the result can occur in parallel with other computations. The Futures pattern integrates task parallelism with the familiar world of arguments and return values.

Futures are asynchronous functions.

Note:
Don't confuse the task-based futures in this chapter with other Future pattern implementations such as the std::future implementation in the Standard Template Library (STL) that has been incorporated into the C++0x working paper.

Futures express the concept of potential parallelism that was introduced in Chapter 1, "Introduction." Decomposing a sequential operation with futures can result in faster execution if hardware resources are available for parallel execution. However, if all cores are otherwise occupied, futures will be evaluated without parallelism.

You can think of a future as a task that returns a value. Instead of explicitly waiting for the task to complete, using a method such as wait, you simply ask the task for its result when you are ready to use it. If the task has already finished, its result is waiting for you and is immediately returned. If the task is running but has not yet finished, the thread that needs the result blocks until the result value becomes available. (While the thread is blocked, the core is available for other work.) If the task hasn't started yet, the pending task may be executed in the current thread context.

The Parallel Patterns Library (PPL) makes it very easy to use the Futures pattern. Here is a minimal futures implementation that illustrates how futures work.

template <class T> 
class Future
{
  private:
    single_assignment<T> m_val;
    task_group m_tg; 

  public:
    template <class Func>
    Future(Func f) 
    {
        m_tg.run([f, this]() 
        {
          send(m_val, f());
        });
    }

    T Result() 
    {
       m_tg.wait();
       return receive(&m_val);
    }
};

Note:
This implementation of a Future class omits features such as the ability to rethrow exceptions when you call the Result method multiple times. You can use this implementation in your own applications, but you should be aware that it is not meant to be completely full-featured.

In this example, each new instance of the Future<T> class creates a task group and uses the task group’s run method to add a new task to that task group. The work function of the new task is an argument to the constructor of the Future<T> class. The work function returns a value of type T.

You can easily implement futures using task groups.

Note:
The single_assignment class that is used in the implementation of the Future class is a type of messaging buffer. The send and receive functions allow for concurrency-safe communication of a single data value. For more information about messaging buffers, see Chapter 7, "Pipelines."
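
The write-once behavior described in this note can be illustrated without the Concurrency Runtime. The following is a minimal portable sketch, not the PPL implementation of single_assignment: the WriteOnceBuffer class, its Send and Receive members, and the ReceiveFromProducer helper are hypothetical names, and the sketch is built on std::mutex and std::condition_variable. It assumes that only the first value sent is kept.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical portable analogue of a write-once messaging buffer.
template <class T>
class WriteOnceBuffer
{
  public:
    // Stores the value if the buffer is still empty; later sends are ignored.
    // Returns true if this call supplied the stored value.
    bool Send(const T& value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_value) return false;
            m_value.reset(new T(value));
        }
        m_ready.notify_all();
        return true;
    }

    // Blocks until a value has been sent, then returns a copy of it.
    T Receive()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return m_value != nullptr; });
        assert(m_value);
        return *m_value;
    }

  private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::unique_ptr<T> m_value;  // empty until the first Send
};

// A producer thread sends once; the calling thread blocks in Receive.
int ReceiveFromProducer(int valueToSend)
{
    WriteOnceBuffer<int> buffer;
    std::thread producer([&buffer, valueToSend] { buffer.Send(valueToSend); });
    int received = buffer.Receive();
    producer.join();
    return received;
}
```

Because Receive returns a copy and never clears the buffer, any number of readers can call it after the value arrives, which is the property the Future class relies on.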

The Futures pattern discussed in this chapter is closely related to what is sometimes known as a task graph. When futures provide results that are the inputs to other futures, this can be seen as a directed graph. The nodes are tasks, and the arcs are values that act as inputs and outputs of the tasks.

Note:
Be careful not to confuse futures with pipelines. As you will see in Chapter 7, "Pipelines," pipeline tasks are also nodes of a directed graph, but the arcs that connect stages of the pipeline are concurrent queues that convey a series of values, just as an assembly line or data stream does. In contrast, with futures, nodes of the task graph are connected by singleton values, similar to arguments and return values.

The Basics

When you think about the Parallel Task pattern described in Chapter 3, you see that, in many cases, the purpose of a task is to calculate a result. In other words, asynchronous operations often act like functions with return values. Of course, tasks can also do other things, such as reordering values in an array, but calculating new values is common enough to warrant a pattern tailored to it. It's also much easier to reason about pure functions, which don't have side effects and therefore exist purely for their results. This simplicity becomes very useful as the number of cores becomes large.

Futures

The following example is from the body of a sequential method.

  int a = 22;

  int b = F1(a); 
  int c = F2(a); 
  int d = F3(c); 
  int f = F4(b, d); 
  return f;

Suppose that F1, F2, F3, and F4 are processor-intensive functions that communicate with one another using arguments and return values instead of reading and updating shared state variables.

Suppose, also, that you want to distribute the work of these functions across available cores, and you want your code to run correctly no matter how many cores are available. When you look at the inputs and outputs, you can see that F1 can run in parallel with F2 and F3 but that F3 can't start until after F2 finishes. How do you know this? The possible orderings become apparent when you visualize the function calls as a graph. Figure 1 illustrates this.


Figure 1

A task graph for calculating f

The nodes of the graph are the functions F1, F2, F3, and F4. The incoming arrows for each node are the inputs required by the function, and the outgoing arrows are values calculated by each function. It's easy to see that F1 and F2 can run at the same time but that F3 must follow F2.

Here's code that shows how to create a future for this example. For simplicity, the code assumes that the values being calculated are integers and that the value of variable a has already been supplied, perhaps as an argument to the current method.

  int a = 22;

  Future<int> futureB([a](){ return F1(a); });
  int c = F2(a);
  int d = F3(c);
  int f = F4(futureB.Result(), d);
  return f;

The Result method either returns a precalculated value immediately or waits until the value becomes available.

This code creates a future that begins to asynchronously calculate the value of F1(a). On a multicore system, F1 will be able to run in parallel with the current thread. This means that F2 can begin executing without waiting for F1. The function F4 will execute as soon as the data it needs becomes available. It doesn't matter whether F1 or F3 finishes first, because the results of both functions are required before F4 can be invoked. (Recall that the Result method does not return until the future's value is available.) Note that the calls to F2, F3, and F4 do not need to be wrapped inside of a future because a single additional asynchronous operation is all that is needed to take advantage of the parallelism of this example.
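
For readers who want to try the same shape of code without the Concurrency Runtime, here is a portable sketch. It deliberately swaps the chapter's Future<T> class for std::async and std::future from the C++ standard library, which are a different implementation of the pattern. The bodies of F1 through F4 are trivial, hypothetical stand-ins for the processor-intensive functions, and ComputeSequential and ComputeWithFuture are illustrative names.

```cpp
#include <cassert>
#include <future>

// Hypothetical stand-ins for the processor-intensive functions.
int F1(int x) { return x + 1; }
int F2(int x) { return x * 2; }
int F3(int x) { return x - 3; }
int F4(int x, int y) { return x + y; }

// The original sequential ordering, kept for comparison.
int ComputeSequential(int a)
{
    int b = F1(a);
    int c = F2(a);
    int d = F3(c);
    return F4(b, d);
}

// F1 runs asynchronously while the calling thread runs F2 and then F3.
int ComputeWithFuture(int a)
{
    std::future<int> futureB =
        std::async(std::launch::async, [a] { return F1(a); });
    assert(futureB.valid());

    int c = F2(a);
    int d = F3(c);

    // get() plays the role of Result(): it returns the value of F1(a),
    // blocking first if the asynchronous call has not finished.
    return F4(futureB.get(), d);
}
```

Both functions return the same value for the same input; only the scheduling of F1 differs.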

Of course, you could equivalently have put F2 and F3 inside of a future, as shown here.

  int a = 22;

  Future<int> futureD([a](){ return F3(F2(a)); });
  int b = F1(a);
  int f = F4(b, futureD.Result());
  return f;

It doesn't matter which branch of the task graph shown in the figure runs asynchronously.

An important point of this example is that exceptions that occur during the execution of a future are thrown by the Result method. This makes exception handling easy. You can think of futures as either returning a result or throwing an exception. Conceptually, this is very similar to the way any C++ function works. Here is another example of exception handling.

Futures, when they are based on tasks, defer exceptions until the Result method is called.

  int a = 22;

  Future<int> futureD([a](){ return F3(F2error(a)); });
  int b = F1(a);
  try
  {
    int f = F4(b, futureD.Result());
    printf("  Result = %d\n", f);
  }
  catch (exception& e)
  {
    printf("  Exception '%s' is caught as expected.\n", 
           e.what());
  }

If F2error or F3 throws an exception of type exception (or of a type derived from it), the exception is deferred and rethrown when the Result method of futureD is called. In this example, the invocation of the Result method occurs within a try block, which means that the exception can be handled in the corresponding catch block.
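
The same deferral can be observed with the standard library. The sketch below uses std::async and std::future rather than the chapter's Future<T> class, so it is an analogy, not the PPL mechanism. FailingStep and RunAndReport are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical step that always fails.
int FailingStep(int)
{
    throw std::runtime_error("step failed");
}

// Starts the failing step asynchronously and reports what the caller observes.
std::string RunAndReport(int a)
{
    std::future<int> futureD =
        std::async(std::launch::async, [a] { return FailingStep(a); });
    assert(futureD.valid());

    try
    {
        // The exception thrown on the worker thread is stored in the
        // future and rethrown here, on the thread that asks for the result.
        int d = futureD.get();
        return "result " + std::to_string(d);
    }
    catch (const std::exception& e)
    {
        return std::string("caught: ") + e.what();
    }
}
```

Nothing is thrown at the point where the asynchronous call is created; the caller sees the failure only when it asks for the value.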

Example: The Adatum Financial Dashboard

Here's an example of how the Futures pattern can be used in an application. The example shows how you can run computationally intensive operations in parallel in an application that uses a graphical user interface (GUI).

Adatum is a financial services firm that provides a financial dashboard application to its employees. The application, known as the Adatum Dashboard, allows employees to perform analyses of financial markets. The dashboard application runs on an employee's desktop workstation. The Adatum Dashboard analyzes historical data instead of a stream of real-time price data. The analysis it performs is computationally intensive, but there is also some I/O latency because the Adatum Dashboard application collects input data from several sources over the network.

After the application has the market data, it merges the datasets together. The application normalizes the merged market data and then performs an analysis step. After the analysis, it creates a market model. It also performs these same steps for historical market data from the Federal Reserve System. After the current and historical models are ready, the application compares the two models and makes a market recommendation of "buy," "sell," or "hold." You can visualize these steps as a graph. Figure 2 illustrates this.


Figure 2

Adatum Dashboard tasks

The tasks in this diagram communicate by specific types of business objects. These are implemented as classes in the Adatum Dashboard application.

You can download the source code for the Adatum Dashboard application from the CodePlex site at http://parallelpatternscpp.codeplex.com/ in the Chapter5\A-Dash project. The application consists of three parts: the business object definitions, an analysis engine, and the user interface.

The Business Objects

The Adatum Dashboard uses immutable data types. Objects of these types cannot be modified after they are created, which makes them well suited to parallel applications.

The StockDataCollection type represents a time series of closing prices for a group of securities. You can think of this as a dictionary indexed by a stock symbol. Conceptually, the values are arrays of prices for each security. You can merge StockDataCollection values as long as the stock symbols don't overlap. The result of the merge operation is a new StockDataCollection value that contains the time series of the inputs.
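
To make the merge rule concrete, here is a simplified sketch of an immutable, dictionary-like price table. It is not the sample's StockDataCollection class: the PriceTable name, its Series typedef, and the Data and MergeWith members are assumptions made for illustration, and throwing on overlapping symbols is one possible way to enforce the rule.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for an immutable time-series collection.
class PriceTable
{
  public:
    typedef std::map<std::string, std::vector<double> > Series;

    explicit PriceTable(Series series) : m_series(std::move(series)) {}

    // Read-only access; there are no mutating members.
    const Series& Data() const { return m_series; }

    // Returns a new table containing the series of both inputs.
    // Neither input is modified. Overlapping symbols are rejected.
    PriceTable MergeWith(const PriceTable& other) const
    {
        Series merged = m_series;
        for (const auto& entry : other.m_series)
        {
            if (!merged.insert(entry).second)
                throw std::invalid_argument("duplicate symbol: " + entry.first);
        }
        assert(merged.size() == m_series.size() + other.m_series.size());
        return PriceTable(std::move(merged));
    }

  private:
    const Series m_series;  // fixed at construction
};
```

Because every operation produces a new value, two tasks can read the same table concurrently without locks.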

The StockAnalysisCollection type is the result of the analysis step. Similarly, the MarketModel and MarketRecommendation classes are the outputs of the modeling and the comparison phases of the application. The MarketRecommendation class has a data accessor method that returns a "buy, hold, or sell" decision.

The Analysis Engine

The Adatum Dashboard's AnalysisEngine class produces a market recommendation from the market data it receives.

The sequential process is shown in the following code. This code differs slightly from the online sample source; details of cancellation handling have been omitted for clarity.

MarketRecommendation 
DoAnalysisSequential(AnalysisEngineState& engineState) const
{
  engineState.Reset();
  engineState.IsRunning();
  vector<StockDataCollection> stockDatasets;
  vector<MarketModel> models;

  // Current market data tasks

  stockDatasets.push_back(LoadNyseData());
  stockDatasets.push_back(LoadNasdaqData());
  StockDataCollection mergedMarketData = 
                              MergeMarketData(stockDatasets);
  StockDataCollection normalizedMarketData = 
                              NormalizeData(mergedMarketData);
  StockAnalysisCollection analyzedStockData = 
                               AnalyzeData(normalizedMarketData);
  models.push_back(RunModel(analyzedStockData));

  // Historical data tasks

  StockDataCollection fedHistoricalData = 
                                     LoadFedHistoricalData();
  StockDataCollection normalizedHistoricalData = 
                              NormalizeData(fedHistoricalData);
  StockAnalysisCollection analyzedHistoricalData = 
                          AnalyzeData(normalizedHistoricalData);
  models.push_back(RunModel(analyzedHistoricalData));

  // Compare results

  MarketRecommendation result = CompareModels(models);
  engineState.SetMarketRecommendation(result.GetValue());
  engineState.IsStopped();
  return result;
}

The final result of the computation is a MarketRecommendation object. Each of the method calls returns data that becomes the input to the operation that invokes it. When you use method invocations in this way, you are limited to sequential execution.

The parallel version uses futures for each of the operational steps. Here's the code. This code differs slightly from the online sample source; details of cancellation handling have been omitted for clarity.

MarketRecommendation 
DoAnalysisParallel(AnalysisEngineState& engineState) const
{
  engineState.Reset();
  engineState.IsRunning();

  // Current market data tasks

  Future<StockDataCollection> future1(
    [this, &engineState]()->StockDataCollection
    { 
      scoped_oversubcription_token oversubscribeForIO;
      return LoadNyseData();
    });

  Future<StockDataCollection> future2(
    [this, &engineState]()->StockDataCollection
    { 
      scoped_oversubcription_token oversubscribeForIO; 
      return LoadNasdaqData();
    });

  Future<StockDataCollection> future3(
    [this, &engineState, &future1, &future2]()
      ->StockDataCollection
    {
      vector<StockDataCollection> stockDatasets;
      stockDatasets.push_back(future1.Result());
      stockDatasets.push_back(future2.Result());
      return this->MergeMarketData(stockDatasets);
    });

  Future<StockDataCollection> future4(
    [this, &engineState, &future3]()->StockDataCollection
    {
      return NormalizeData(future3.Result());
    });

  Future<StockAnalysisCollection> future5(
    [this, &engineState, &future4]()
      ->StockAnalysisCollection
    {
      return AnalyzeData(future4.Result());
    });

  Future<MarketModel> future6(
    [this, &engineState, &future5]()->MarketModel
    {
      return RunModel(future5.Result());
    });

  // Historical data tasks

  Future<StockDataCollection> future7(
    [this, &engineState]()->StockDataCollection
    { 
      scoped_oversubcription_token oversubscribeForIO;
      return LoadFedHistoricalData();
    });

  Future<StockDataCollection> future8(
    [this, &engineState, &future7]()->StockDataCollection
    {
      return NormalizeData(future7.Result());
    });

  Future<StockAnalysisCollection> future9(
    [this, &engineState, &future8]()->StockAnalysisCollection
    {
      return AnalyzeData(future8.Result());
    });

  Future<MarketModel> future10(
    [this, &engineState, &future9]()->MarketModel 
    {
      return RunModel(future9.Result());
    });

  // Compare results

  vector<MarketModel> models;
  models.push_back(future6.Result());
  models.push_back(future10.Result());
  MarketRecommendation result = CompareModels(models);

  engineState.SetMarketRecommendation(result.GetValue());
  engineState.IsStopped();
  return result;
} 

The parallel version, provided by the DoAnalysisParallel method, is similar to the sequential version, except that the synchronous method calls have been replaced with futures. On a single-core machine the performance of the parallel version will be approximately the same as the sequential version. On a computer with many cores, the futures will all execute in parallel, constrained by the data dependencies that exist among them.

Several of the futures are long-running I/O-intensive tasks that use a small percentage of a core’s processing power. For these futures, the code uses the scoped_oversubscription_token class to signal that the task scheduler can use the resources that were allocated to the current task to perform another task concurrently.

Variations

So far, you've seen some of the most common ways to use futures to create tasks. This section describes some other ways to use them.

Canceling Futures

There are several ways to implement a cancellation model using the Futures pattern.

By default, if you enter the Result method of a future from within a task context, canceling that task’s task group before the Result method exits will implicitly cause the task group in the Future instance to be canceled. See the section, "Canceling a Task," in Chapter 3, "Parallel Tasks" for more information about the propagation of cancellation across task groups.

In addition to implicitly propagated task cancellation, you can also use messaging buffers as a way to implement an explicit cancellation approach for futures. A cancellation strategy based on messaging buffers is shown in the ImagePipeline example in Chapter 7, "Pipelines."

Removing Bottlenecks

The idea of a critical path is familiar from project management. A "path" is any sequence of tasks from the beginning of the work to the end result. A task graph may contain more than one path. For example, look at the task graph that is shown in Figure 2. You can see that there are three paths, beginning with "Load NYSE," "Load Nasdaq," and "Load Fed Historical Data" tasks. Each path ends with the "Compare" task.

The duration of a path is the sum of the execution time for each task in the path. The critical path is the path with the longest duration. The amount of time needed to calculate the end result depends only on the critical path. As long as there are enough resources (that is, available cores), the noncritical paths don't affect the overall execution time.
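
The definition above translates directly into a small calculation. The following sketch computes the duration of the critical path for a task graph whose tasks are listed in dependency order. The TaskNode structure, the function names, and all of the durations are hypothetical; the graph merely has the same shape as the Adatum Dashboard task graph.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct TaskNode
{
    double duration;                  // execution time of the task itself
    std::vector<std::size_t> inputs;  // indices of tasks that must finish first
};

// Returns the duration of the critical path. Assumes that tasks are listed
// in topological order: every input index is smaller than the task's index.
double CriticalPathDuration(const std::vector<TaskNode>& tasks)
{
    std::vector<double> finish(tasks.size(), 0.0);
    double longest = 0.0;
    for (std::size_t i = 0; i < tasks.size(); ++i)
    {
        double start = 0.0;  // a task starts when its slowest input finishes
        for (std::size_t input : tasks[i].inputs)
        {
            assert(input < i);
            start = std::max(start, finish[input]);
        }
        finish[i] = start + tasks[i].duration;
        longest = std::max(longest, finish[i]);
    }
    return longest;
}

// A graph shaped like the dashboard's, with made-up durations.
std::vector<TaskNode> MakeDashboardShapedGraph()
{
    return {
        {2.0, {}},      // 0: load NYSE
        {3.0, {}},      // 1: load Nasdaq
        {1.0, {0, 1}},  // 2: merge
        {1.0, {2}},     // 3: normalize
        {4.0, {3}},     // 4: analyze
        {2.0, {4}},     // 5: model
        {5.0, {}},      // 6: load Fed historical data
        {1.0, {6}},     // 7: normalize
        {4.0, {7}},     // 8: analyze
        {2.0, {8}},     // 9: model
        {1.0, {5, 9}}   // 10: compare
    };
}
```

With these made-up numbers the historical branch finishes at 12 and the current-market branch at 11, so the compare step ends at 13 and the historical branch is the critical path.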

If you want to make your task graph run faster, you need to find a way to reduce the duration of the critical path. To do this, you can organize the work more efficiently. You can break down the slowest tasks into additional tasks, which can then execute in parallel. You can also modify a particularly time-consuming task so that it executes in parallel internally using any of the patterns that are described in this book.

The Adatum Dashboard example doesn't offer much opportunity for breaking down the slowest tasks into additional tasks that execute in parallel. This is because the paths are linear. However, you can use the Parallel Loops and Parallel Aggregation patterns to exploit more of the potential parallelism within each of the Analyze tasks if they take the most time. The task graph remains unchanged, but the tasks within it are now also parallelized. The Parallel Loops pattern is discussed in Chapter 2, "Parallel Loops," and the Parallel Aggregation pattern is discussed in Chapter 4, "Parallel Aggregation."

Modifying the Graph at Run Time

The code in the financial program's analysis engine creates a static task graph. In other words, the graph of task dependencies is reflected directly in the code. By reading the implementation of the analysis engine, you can determine that there are a fixed number of tasks with a fixed set of dependencies among them.

However, you can also create dependencies between futures dynamically. For example, if you wanted to update the UI after each of the futures in the Adatum Dashboard example completed in order to show the application's progress, you could create tasks that wait on the futures that make up the task graph of the Adatum Dashboard example. In other words, you can call the Result method of the Future class as many times as needed. With each invocation, the calling context will be suspended until the values have been computed. Making calls to the Result method can occur outside of the context where the futures were originally created.
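
As a portable illustration of reading one result from several contexts, the sketch below uses std::shared_future from the C++ standard library in place of the chapter's Future<T> class. SumOfObservers is a hypothetical helper: it starts one computation and then creates several additional tasks, each of which waits on the same shared result.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Starts one computation, then lets observerCount tasks read its result.
// Returns the sum of the values seen by the observers.
int SumOfObservers(int value, int observerCount)
{
    // share() converts the one-shot future into one that can be
    // copied and read any number of times.
    std::shared_future<int> shared =
        std::async(std::launch::async, [value] { return value * 2; }).share();
    assert(shared.valid());

    // Each observer is a task created after the fact that depends on the
    // shared result, for example to report progress.
    std::vector<std::future<int> > observers;
    for (int i = 0; i < observerCount; ++i)
    {
        observers.push_back(
            std::async(std::launch::async, [shared] { return shared.get(); }));
    }

    int total = 0;
    for (std::future<int>& observer : observers)
        total += observer.get();
    return total;
}
```

Each observer blocks only until the shared value is available, and all of them see the same value.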

Dynamically created tasks are also a way to structure algorithms used for sorting, searching, and graph traversal. For examples, see Chapter 6, "Dynamic Task Parallelism."

Design Notes

There are several ideas behind the design of the Adatum Dashboard application.

Decomposition into Futures

The first design decision is the most obvious one: the Adatum Dashboard introduces parallelism by means of futures. This makes sense because the problem space could be decomposed into operations with well-defined inputs and outputs.

Functional Style

There are implicit and explicit approaches to synchronizing data between tasks. In this chapter, the examples use an explicit approach. Data is passed between tasks as parameters, which makes the data dependencies very obvious to the programmer. Alternatively, as you saw in Chapter 3, "Parallel Tasks," it's possible to use an implicit approach. In Chapter 3, tasks communicate with side effects that modify shared data structures. In this case, you rely on the tasks to use control dependencies that block appropriately. However, in general, explicit data flow is less prone to error than implicit data flow.

You can see this by analogy. In principle, there's no need for a programming language to support methods with return values. Programmers can always use methods without return values and perform updates on shared global variables as a way of communicating the results of a computation to other components of an application. However, in practice, using return values is considered to be a much less error-prone way to write programs. Programmers tend to make more mistakes with global variables than with return values.

Similarly, futures (tasks that return values) can reduce the possibility of error in a parallel program as compared to tasks that communicate results by modifying shared global state. In addition, tasks that return values can often require less synchronization than tasks that globally access state variables, and they are much easier to understand.

Futures also promote a natural kind of data isolation similar to what is found in functional programming, which relies on operations that communicate with input and output values. Functional programs are very easy to adapt to multicore environments. In general, futures should only communicate with the outside world by means of their return values. It's also a good practice to use immutable types for return values.

Applications that use arguments and return values to communicate among tasks scale well as the number of cores increases.

Related Patterns

There are a number of patterns that have some similarities to the Futures pattern, but they also have some important differences. This section provides a brief comparison.

Pipeline Pattern

The Pipeline pattern is described in Chapter 7, "Pipelines." It differs in several important respects from a task graph. The pipeline focuses on data flow by means of queues (messaging buffers), instead of task dependencies. In a pipeline, the same task is executed on multiple data items.

Master/Worker Pattern

Tasks within the Master/Worker pattern have a parent/child relationship. The master task creates the worker tasks, passes data to them, and waits for a result to be returned. Typically, worker tasks all execute the same computation against different data. The implementation of parallel loops in PPL uses the Master/Worker pattern internally.

Dynamic Task Parallelism Pattern

The Dynamic Task Parallelism pattern is also known as the Divide and Conquer pattern. It is the subject of Chapter 6, "Dynamic Task Parallelism." Dynamic task parallelism creates trees of tasks on the fly in a manner similar to recursion. If futures are asynchronous functions, dynamic task parallelism produces asynchronous recursive functions.

Discrete Event Pattern

The Discrete Event pattern focuses on sending messages between tasks. There is no limitation on the number of events a task raises or when it raises them. Events can also pass between tasks in either direction; there is no antecedent/dependency relationship. The Discrete Event pattern can be used to implement a task graph by placing additional restrictions on it.

Exercises

  1. Suppose you use futures in the style of the first example, in the section "The Basics," to parallelize the following sequential code.
    auto b = F1(a);  auto d = F2(c);  auto e = F3(b,d);
    auto f = F4(e);  auto g = F5(e);  auto h = F6(f,g);
    

    Draw the task graph. In order to achieve the greatest degree of concurrency, what is the minimum number of futures you must define? What is the largest number of these futures that can be running at the same time?

  2. Modify the BasicFutures sample from the CodePlex site at http://parallelpatternscpp.codeplex.com/ so that one of the futures throws an exception. What should happen? Observe the behavior when you execute the modified sample.

Last built: March 9, 2012

© 2014 Microsoft