This topic shows how to implement futures in your application. It demonstrates how to combine existing features of the Concurrency Runtime to build higher-level functionality.
Important: This topic illustrates the concept of futures for demonstration purposes. We recommend that you use std::future or concurrency::task when you require an asynchronous task that computes a value for later use.
A task is a computation that can be decomposed into additional, more fine-grained, computations. A future is an asynchronous task that computes a value for later use.
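As a point of comparison, the definition above can be illustrated with the standard library facility that this topic recommends. This is a minimal sketch, not part of the original walkthrough; the helper name sum_later is hypothetical, and the caller is assumed to keep the vector alive until the result has been retrieved.

```cpp
#include <future>
#include <numeric>
#include <vector>

// Hypothetical helper: starts summing the vector on another thread and
// returns immediately with a future that will hold the total.
// Assumption: the caller keeps `values` alive until get() has returned.
std::future<long long> sum_later(const std::vector<int>& values)
{
    // std::launch::async requests that the work run on a separate thread
    // instead of being deferred until get() is called.
    return std::async(std::launch::async, [&values] {
        return std::accumulate(values.begin(), values.end(), 0LL);
    });
}
```

A caller would typically start the work with auto f = sum_later(v);, do other work, and then call f.get(), which blocks only if the sum is not ready yet.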
To implement futures, this topic defines the async_future class. The async_future class uses these components of the Concurrency Runtime: the concurrency::task_group class and the concurrency::single_assignment class. The async_future class uses the task_group class to compute a value asynchronously and the single_assignment class to store the result of the computation. The constructor of the async_future class takes a work function that computes the result, and the get method retrieves the result.
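The division of labor described above (one component runs the work function, another stores the result exactly once) can also be sketched in portable C++. The class below is only an analogy under stated assumptions, not the Concurrency Runtime implementation: the name portable_future is hypothetical, std::thread stands in for task_group, and std::promise with std::shared_future stands in for single_assignment.

```cpp
#include <exception>
#include <future>
#include <thread>
#include <utility>

// Hypothetical portable analogue of the design described above.
template <typename T>
class portable_future
{
public:
    template <class Functor>
    explicit portable_future(Functor fn)
        : _result(_slot.get_future().share())
        , _worker([this, fn = std::move(fn)]() mutable {
              try
              {
                  // Write the result into the write-once slot.
                  _slot.set_value(fn());
              }
              catch (...)
              {
                  // Store the exception so that get() can rethrow it.
                  _slot.set_exception(std::current_exception());
              }
          })
    {
    }

    ~portable_future()
    {
        // Wait for the work function to finish.
        _worker.join();
    }

    portable_future(const portable_future&) = delete;
    portable_future& operator=(const portable_future&) = delete;

    // Blocks until the result is available; can be called repeatedly.
    T get() const { return _result.get(); }

private:
    // Declaration order matters: the slot and its reader must be
    // initialized before the worker thread starts.
    std::promise<T> _slot;
    std::shared_future<T> _result;
    std::thread _worker;
};
```

For example, portable_future<int> f([] { return 6 * 7; }); starts the computation, and f.get() returns the value once it is available.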
To implement the async_future class
- Declare a template class named async_future that is parameterized on the type of the resulting computation. Add public and private sections to this class.

    template <typename T>
    class async_future
    {
    public:
    private:
    };
- In the private section of the async_future class, declare a task_group and a single_assignment data member.

    // Executes the asynchronous work function.
    task_group _tasks;

    // Stores the result of the asynchronous work function.
    single_assignment<T> _value;
- In the public section of the async_future class, implement the constructor. The constructor is a template that is parameterized on the work function that computes the result. The constructor asynchronously executes the work function in the task_group data member and uses the concurrency::send function to write the result to the single_assignment data member.

    template <class Functor>
    explicit async_future(Functor&& fn)
    {
        // Execute the work function in a task group and send the result
        // to the single_assignment object.
        _tasks.run([fn, this]() {
            send(_value, fn());
        });
    }
- In the public section of the async_future class, implement the destructor. The destructor waits for the task to finish.

    ~async_future()
    {
        // Wait for the task to finish.
        _tasks.wait();
    }

- In the public section of the async_future class, implement the get method. This method uses the concurrency::receive function to retrieve the result of the work function.

    // Retrieves the result of the work function.
    // This method blocks if the async_future object is still
    // computing the value.
    T get()
    {
        return receive(_value);
    }
The following example shows the complete async_future class and an example of its usage. The wmain function creates a std::vector object that contains 10,000 random integer values. It then uses async_future objects to find the smallest and largest values that are contained in the vector object.
    // futures.cpp
    // compile with: /EHsc
    #include <ppl.h>
    #include <agents.h>
    #include <vector>
    #include <algorithm>
    #include <climits>   // INT_MAX, INT_MIN
    #include <iostream>
    #include <numeric>
    #include <random>

    using namespace concurrency;
    using namespace std;

    template <typename T>
    class async_future
    {
    public:
        template <class Functor>
        explicit async_future(Functor&& fn)
        {
            // Execute the work function in a task group and send the result
            // to the single_assignment object.
            _tasks.run([fn, this]() {
                send(_value, fn());
            });
        }

        ~async_future()
        {
            // Wait for the task to finish.
            _tasks.wait();
        }

        // Retrieves the result of the work function.
        // This method blocks if the async_future object is still
        // computing the value.
        T get()
        {
            return receive(_value);
        }

    private:
        // Executes the asynchronous work function.
        task_group _tasks;

        // Stores the result of the asynchronous work function.
        single_assignment<T> _value;
    };

    int wmain()
    {
        // Create a vector of 10000 integers, where each element
        // is between 0 and 9999.
        mt19937 gen(2);
        vector<int> values(10000);
        generate(begin(values), end(values), [&gen] { return gen() % 10000; });

        // Create an async_future object that finds the smallest value in the
        // vector.
        async_future<int> min_value([&]() -> int {
            int smallest = INT_MAX;
            for_each(begin(values), end(values), [&](int value) {
                if (value < smallest)
                {
                    smallest = value;
                }
            });
            return smallest;
        });

        // Create an async_future object that finds the largest value in the
        // vector.
        async_future<int> max_value([&]() -> int {
            int largest = INT_MIN;
            for_each(begin(values), end(values), [&](int value) {
                if (value > largest)
                {
                    largest = value;
                }
            });
            return largest;
        });

        // Calculate the average value of the vector while the async_future
        // objects work in the background.
        int sum = accumulate(begin(values), end(values), 0);
        int average = sum / static_cast<int>(values.size());

        // Print the smallest, largest, and average values.
        wcout << L"smallest: " << min_value.get() << endl
              << L"largest: " << max_value.get() << endl
              << L"average: " << average << endl;
    }
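For readers who follow the recommendation to use std::future, the same pattern (two background searches that overlap with work on the calling thread) might look like the sketch below. It is an illustration only: the summary struct and the summarize function are hypothetical names, std::min_element and std::max_element replace the hand-written loops, and the input vector is assumed to be non-empty.

```cpp
#include <algorithm>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical result type for this sketch.
struct summary
{
    int smallest;
    int largest;
    int average;
};

// Finds the smallest and largest values in the background while the
// calling thread computes the average.
// Assumption: `values` is not empty.
summary summarize(const std::vector<int>& values)
{
    std::future<int> min_value = std::async(std::launch::async, [&values] {
        return *std::min_element(values.begin(), values.end());
    });
    std::future<int> max_value = std::async(std::launch::async, [&values] {
        return *std::max_element(values.begin(), values.end());
    });

    // This work overlaps with the two background tasks.
    long long sum = std::accumulate(values.begin(), values.end(), 0LL);
    int average = static_cast<int>(sum / static_cast<long long>(values.size()));

    // get() blocks only if the corresponding task is still running.
    return { min_value.get(), max_value.get(), average };
}
```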
To extend the async_future class to handle exceptions that are thrown by the work function, modify the async_future::get method to call the concurrency::task_group::wait method. The task_group::wait method throws any exceptions that were generated by the work function.
The following example shows the modified version of the async_future class. The wmain function uses a try-catch block to print the result of the async_future object or to print the value of the exception that is generated by the work function.
    // futures-with-eh.cpp
    // compile with: /EHsc
    #include <ppl.h>
    #include <agents.h>
    #include <vector>
    #include <algorithm>
    #include <iostream>

    using namespace concurrency;
    using namespace std;

    template <typename T>
    class async_future
    {
    public:
        template <class Functor>
        explicit async_future(Functor&& fn)
        {
            // Execute the work function in a task group and send the result
            // to the single_assignment object.
            _tasks.run([fn, this]() {
                send(_value, fn());
            });
        }

        ~async_future()
        {
            // Wait for the task to finish.
            _tasks.wait();
        }

        // Retrieves the result of the work function.
        // This method blocks if the async_future object is still
        // computing the value.
        T get()
        {
            // Wait for the task to finish.
            // The wait method throws any exceptions that were generated
            // by the work function.
            _tasks.wait();

            // Return the result of the computation.
            return receive(_value);
        }

    private:
        // Executes the asynchronous work function.
        task_group _tasks;

        // Stores the result of the asynchronous work function.
        single_assignment<T> _value;
    };

    int wmain()
    {
        // For illustration, create an async_future with a work
        // function that throws an exception.
        async_future<int> f([]() -> int {
            throw exception("error");
        });

        // Try to read from the async_future object.
        try
        {
            int value = f.get();
            wcout << L"f contains value: " << value << endl;
        }
        catch (const exception& e)
        {
            wcout << L"caught exception: " << e.what() << endl;
        }
    }
This example produces the following output:
caught exception: error
For more information about the exception handling model in the Concurrency Runtime, see Exception Handling in the Concurrency Runtime.
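The recommended std::future type propagates exceptions in a similar way: an exception thrown by the work function is stored and rethrown when get is called. The sketch below mirrors the example above in standard C++; the helper name run_failing_task is hypothetical, and std::runtime_error is used in place of the exception("error") constructor, which is assumed here to be specific to the Microsoft compiler.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical helper: runs a work function that always throws and
// returns the message that the caller would print.
std::string run_failing_task()
{
    std::future<int> f = std::async(std::launch::async, []() -> int {
        throw std::runtime_error("error");
    });

    try
    {
        // get() blocks until the task finishes, then rethrows the
        // exception that the work function threw.
        int value = f.get();
        return "f contains value: " + std::to_string(value);
    }
    catch (const std::exception& e)
    {
        return std::string("caught exception: ") + e.what();
    }
}
```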