March 2012

Volume 27 Number 03

C++ - New Standard Concurrency Features in Visual C++ 11

By Diego Dagum | March 2012

The latest C++ iteration, known as C++11 and approved by the International Organization for Standardization (ISO) in the past year, formalizes a new set of libraries and a few reserved words to deal with concurrency. Many developers have used concurrency in C++ before, but always through a third-party library—often directly exposing OS APIs.

Herb Sutter announced in December 2004 that the “free performance lunch” was over, in the sense that physical limits on power consumption and heat dissipation prevented CPU manufacturers from shipping ever-faster processors. This led to the current, mainstream multicore era, a new reality to which standard C++ has just made an important leap to adapt.

The rest of this article is organized in two main sections and smaller subsections. The first main section, starting with Parallel Execution, covers technologies that allow applications to run independent or semi-independent activities in parallel. The second main section, starting with Syncing up Concurrent Execution, explores mechanisms for synchronizing the way these activities handle data, thus avoiding race conditions.

This article is based on features included in the upcoming version of Visual C++ (for now, called Visual C++ 11). A few of them are already available in the current version, Visual C++ 2010. Although it's neither a guide to modeling parallel algorithms nor exhaustive documentation of all the available options, this article is a solid introduction to the new C++11 concurrency features.

Parallel Execution

When you model processes and design algorithms over data, there’s a natural tendency to specify them as a sequence of steps. As long as performance is within acceptable bounds, this is the recommended approach because it’s typically easier to understand, a requirement for maintainable code bases.

When performance becomes a worrisome factor, a classic first attempt to overcome the situation is to optimize the sequential algorithm to reduce the CPU cycles it consumes. You can do this until you reach a point where no further optimizations are available, or they’re hard to achieve. Then it’s time to split the sequential series of steps into activities that can run simultaneously.

In the first section you’ll learn about the following:

  • Asynchronous tasks: those smaller portions of the original algorithm only linked by the data they produce or consume.
  • Threads: units of execution administered by the runtime environment. They relate to tasks in the sense that tasks are run on threads.
  • Thread internals: thread-bound variables, exceptions propagated from threads and so on.

Asynchronous Tasks

In the companion code to this article, you’ll find a project called Sequential Case, as shown in Figure 1.

Figure 1 Sequential Case Code

int a, b, c;
int calculateA()
{
  return a+a*b;
}
int calculateB()
{
  return a*(a+a*(a+1));
}
int calculateC()
{
  return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
  getUserData(); // initializes a and b
  c = calculateA() * (calculateB() + calculateC());
  showResult();
}

The main function asks the user for some data and then submits that data to three functions: calculateA, calculateB and calculateC. The results are later combined to produce some output information for the user.

The calculating functions in the companion material are coded in a way such that a random delay between one and three seconds is introduced in each. Considering that these steps are executed sequentially, this leads to an overall execution time—once the input data is entered—of nine seconds in the worst-case scenario. You can try this code out by pressing F5 and running the sample.

So I need to revise the execution sequence and find steps to be performed concurrently. As these functions are independent, I can execute them in parallel by using the async function:

int main(int argc, char *argv[])
{
  getUserData();
  future<int> f1 = async(calculateB), f2 = async(calculateC);
  c = calculateA() * (f1.get() + f2.get());
  showResult();
}

I’ve introduced two concepts here: async and future, both defined in the <future> header and the std namespace. The first one receives a function, a lambda or a function object (functor) and returns a future. You can understand the concept of a future as the placeholder for an eventual result. Which result? The one returned by the function called asynchronously.

At some point, I’ll need the results of these parallel-running functions. Calling the get method on each future blocks the execution until the value is available.

You can test and compare the revised code with the sequential case by running the AsyncTasks project in the companion sample. The worst-case delay of this modification is about three seconds versus nine seconds for the sequential version.

This is a lightweight programming model that frees the developer from the duty of creating threads. You can also specify a launch (threading) policy, but I won’t cover those here.
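
As a hint of that flexibility, here's a minimal sketch of my own (not part of the companion code) that passes a lambda to async and requests the launch::async policy, so the work runs on its own thread:

#include <future>
#include <iostream>
using namespace std;

int main()
{
  // launch::async requests that the callable run on its own thread.
  future<int> f = async(launch::async, [](int x) { return x * x; }, 7);
  cout << f.get() << endl; // Blocks until the lambda's result (49) is ready.
}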

Threads

The asynchronous task model presented in the previous section might suffice in some scenarios, but if you need deeper handling and control over thread execution, C++11 comes with the thread class, declared in the <thread> header and located in the std namespace.

Despite being a more complex programming model, threads offer better methods for synchronization and coordination, allowing them to yield execution to another thread and wait for a determined amount of time or until another thread is finished before continuing.
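
Here's a minimal sketch of my own (not part of the companion code) showing those coordination calls:

#include <chrono>
#include <iostream>
#include <thread>
using namespace std;

int main()
{
  thread worker([] {
    // Wait for a determined amount of time before finishing.
    this_thread::sleep_for(chrono::milliseconds(500));
    cout << "Worker done." << endl;
  });
  this_thread::yield(); // Offer the rest of this time slice to other threads.
  worker.join();        // Block until the worker thread is finished.
}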

In the following example (available in the Threads project of the companion code), I have a lambda function that, given an integer argument, prints its multiples less than 100,000 to the console:

auto multiple_finder = [](int n) {
  for (int i = 0; i < 100000; i++)
    if (i%n==0)
      cout << i << " is a multiple of " << n << endl;
};
int main(int argc, char *argv[])
{
  thread th(multiple_finder, 23456);
  multiple_finder(34567);
  th.join();
}

As you’ll see in later examples, the fact that I passed a lambda to the thread is circumstantial; a function or functor would’ve sufficed as well.

In the main function I run this function in two threads with different parameters. Take a look at my result (which could vary between different runs due to timings):

0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456

I might implement the example about asynchronous tasks in the previous section with threads. For this, I need to introduce the concept of a promise. A promise can be understood as a sink through which a result will be dropped when available. Where will that result come out once dropped? Each promise has an associated future.
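
Before the full example, here's the promise/future handshake in isolation, in a minimal sketch of my own (not the companion code):

#include <future>
#include <iostream>
#include <thread>
using namespace std;

int main()
{
  promise<int> p;
  future<int> f = p.get_future();        // The associated future, where the result comes out.
  thread th([&p] { p.set_value(42); });  // The thread drops its result into the promise.
  cout << f.get() << endl;               // Blocks until set_value has been called; prints 42.
  th.join();
}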

The code shown in Figure 2, available in the Promises project of the sample code, associates two threads (instead of tasks) with promises and makes each thread call a calculate function. Compare these details with the lighter AsyncTasks version.

Figure 2 Associating Futures with Promises

typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
  p.set_value(f());
}
int main(int argc, char *argv[])
{
  getUserData();
  promise<int> p1, p2;
  future<int> f1 = p1.get_future(), f2 = p2.get_future();
  thread t1(&func2promise, calculateB, std::ref(p1)),
    t2(&func2promise, calculateC, std::ref(p2));
  c = calculateA() * (f1.get() + f2.get());
  t1.join(); t2.join();
  showResult();
}

Thread-Bound Variables and Exceptions

In C++ you can define global variables whose scope is bound to the entire application, including threads. But there's now also a way to define these global variables such that every thread keeps its own copy. This concept is known as thread-local storage, and it's declared as follows:

thread_local int subtotal = 0;

If the declaration is done in the scope of a function, the visibility of the variable will be narrowed to that function, but each thread still maintains its own static copy. That is to say, each thread's value of the variable is preserved between function invocations.

Although thread_local isn’t available in Visual C++ 11, it can be simulated with a non-standard Microsoft extension:

#define thread_local __declspec(thread)
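
To make the function-scope case concrete, here's a minimal sketch of my own (assuming a compiler that supports the standard keyword):

#include <iostream>
#include <thread>
using namespace std;

void add_to_subtotal(int value)
{
  thread_local int subtotal = 0; // One copy per thread, preserved across calls.
  subtotal += value;
  cout << this_thread::get_id() << ": subtotal is now " << subtotal << endl;
}

int main()
{
  thread th([] { add_to_subtotal(1); add_to_subtotal(2); }); // This thread's copy ends at 3.
  add_to_subtotal(10); add_to_subtotal(20);                  // The main thread's copy ends at 30.
  th.join();
}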

What would happen if an exception were thrown inside a thread? There will be cases in which the exception can be caught and handled in the call stack inside the thread. But if the thread doesn’t deal with the exception, you need a way to transport the exception to the initiator thread. C++11 introduces such mechanisms.

In Figure 3, available in the companion code in the project ThreadInternals, there’s a function sum_until_element_with_threshold, which traverses a vector until it finds a specific element, summing all the elements along the way. If the sum exceeds a threshold, an exception is thrown.

Figure 3 Thread Local Storage and Thread Exceptions

thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
  unsigned threshold, exception_ptr& pExc)
{
  try{
    find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
      bool ret = (i!=element);
      sum_total+= i;
      if (sum_total>threshold)
        throw runtime_error("Sum exceeded threshold.");
      return ret;
    });
    cout << "(Thread #" << this_thread::get_id() << ") " <<
      "Sum of elements until " << element << " is found: " << sum_total << endl;
  } catch (...) {
    pExc = current_exception();
  }
}

If that happens, the exception is captured via current_exception into an exception_ptr.

The main function triggers a thread on sum_until_element_with_threshold, while calling that same function with a different parameter. When both invocations have finished (the one in the main thread and the one in the thread triggered from it), their respective exception_ptrs will be analyzed:

const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  exception_ptr pExc1, pExc2;
  scramble_vector(1000);
  thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
  sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
  th.join();
  dealWithExceptionIfAny(pExc1);
  dealWithExceptionIfAny(pExc2);
}

If either of these exception_ptrs comes initialized (a sign that some exception happened), its exception is rethrown with rethrow_exception:

void dealWithExceptionIfAny(exception_ptr pExc)
{
  try
  {
    if (!(pExc==exception_ptr()))
      rethrow_exception(pExc);
  } catch (const exception& exc) {
    cout << "(Main thread) Exception received from thread: " <<
      exc.what() << endl;
  }
}

This is the result of my execution; the sum in the second invocation (the one run on the main thread) exceeded its threshold:

(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.

Syncing up Concurrent Execution

It would be desirable if all applications could be split into a set of 100 percent independent asynchronous tasks. In practice this is almost never possible, as there are at least dependencies on the data that all parties handle concurrently. This section introduces the new C++11 technologies for avoiding race conditions.

You’ll learn about:

  • Atomic types: similar to primitive data types, but enabling thread-safe modification.
  • Mutexes and locks: elements that enable us to define thread-safe critical regions.
  • Condition variables: a way to suspend thread execution until some condition is satisfied.

Atomic Types

The <atomic> header introduces a series of primitive types—atomic_char, atomic_int and so on—implemented in terms of interlocked operations. Thus, these types are equivalent to their homonyms without the atomic_ prefix, with the difference that all their assignment operators (=, ++, --, +=, *= and so on) are protected from race conditions. So it won't happen that, in the midst of an assignment to one of these types, another thread steps in and changes the value before we're done.
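
Here's a minimal sketch of my own (not part of the companion code) of what that protection buys you; two threads increment the same atomic counter without losing updates:

#include <atomic>
#include <iostream>
#include <thread>
using namespace std;

atomic_int counter; // With a plain int, concurrent increments could be lost.

void bump()
{
  for (int i = 0; i < 100000; ++i)
    ++counter; // Each increment is a single interlocked operation.
}

int main()
{
  counter = 0;
  thread t(bump);
  bump();
  t.join();
  cout << counter << endl; // Always prints 200000.
}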

In the following example there are two parallel threads (one being the main) looking for different elements within the same vector:

atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  total_iterations = 0;
  scramble_vector(1000);
  thread th(find_element, 0);
  find_element(100);
  th.join();
  cout << total_iterations << " total iterations." << endl;
}

When each element is found, a message from within the thread is printed, telling the position in the vector (or iteration) where the element was found:

void find_element(unsigned element)
{
  unsigned iterations = 0;
  find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
    ++iterations;
    return (i==element);
  });
  total_iterations+= iterations;
  cout << "Thread #" << this_thread::get_id() << ": found after " <<
    iterations << " iterations." << endl;
}

There’s also a shared variable, total_iterations, which is updated with the combined number of iterations performed by both threads. Thus, total_iterations must be atomic to prevent both threads from updating it at the same time. Note that even if you didn’t need to print the partial count in find_element, you’d still accumulate into the local iterations variable and add it to total_iterations just once, to avoid contention on the atomic variable.

You’ll find the preceding sample in the Atomics project in the companion code download. I ran it, getting the following:

Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.

Mutual Exclusion (Mutex) and Locks

The previous section depicted a particular case of mutual exclusion for write access to primitive types. The <mutex> header defines a series of lockable classes for defining critical regions. That way, you can define a mutex that establishes a critical region across a series of functions or methods, in the sense that only one thread at a time will be able to access any member in the series by successfully locking its mutex.

A thread attempting to lock a mutex can either stay blocked until the mutex is available or just fail in the attempt. Between these two extremes, the alternative timed_mutex class can stay blocked for a limited interval of time before failing. Allowing lock attempts to give up helps prevent deadlocks.
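
Here's a minimal sketch of my own (not part of the companion code) of that middle ground, using timed_mutex::try_lock_for to give up after a bounded wait:

#include <chrono>
#include <iostream>
#include <mutex>
using namespace std;

timed_mutex tmx;

void guarded_work()
{
  // Wait up to 100 milliseconds for the mutex, then desist instead of blocking forever.
  if (tmx.try_lock_for(chrono::milliseconds(100)))
  {
    // ... do something in the critical region.
    tmx.unlock();
  }
  else
  {
    cout << "Couldn't acquire the lock in time." << endl;
  }
}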

A locked mutex must be explicitly unlocked for others to lock it. Failing to do so could lead to indeterminate application behavior, which is error-prone in a way similar to forgetting to release dynamic memory. Forgetting to release a lock is actually much worse, because it might mean that the application can't function properly anymore if other code keeps waiting on that lock. Fortunately, C++11 also comes with lock classes. A lock acts on a mutex, and its destructor makes sure to release the mutex if it's locked.

The following code (available in the Mutex project in the code download) defines a critical region around a mutex mx:

mutex mx;
void funcA();
void funcB();
int main()
{
  thread th(funcA);
  funcB();
  th.join();
}

This mutex is used to guarantee that the two functions, funcA and funcB, can run in parallel without overlapping in the critical region.

The function funcA will wait, if necessary, to enter the critical region. To make it do so, you just need the simplest locking mechanism, lock_guard:

void funcA()
{
  for (int i = 0; i<3; ++i)
  {
    this_thread::sleep_for(chrono::seconds(1));
    cout << this_thread::get_id() << ": locking with wait... " << endl;
    lock_guard<mutex> lg(mx);
    ... // Do something in the critical region.
    cout << this_thread::get_id() << ": releasing lock." << endl;
  }
}

The way it’s defined, funcA should access the critical region three times. The function funcB, instead, will attempt to lock; if the mutex is already locked, funcB will just wait for a second before attempting again to get access to the critical region. The mechanism it uses is unique_lock with the try_to_lock_t policy, as shown in Figure 4.

Figure 4 Lock with Wait

void funcB()
{
  int successful_attempts = 0;
  for (int i = 0; i<5; ++i)
  {
    unique_lock<mutex> ul(mx, try_to_lock_t());
    if (ul)
    {
      ++successful_attempts;
      cout << this_thread::get_id() << ": lock attempt successful." <<
        endl;
      ... // Do something in the critical region
      cout << this_thread::get_id() << ": releasing lock." << endl;
    } else {
      cout << this_thread::get_id() <<
        ": lock attempt unsuccessful. Hibernating..." << endl;
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
  cout << this_thread::get_id() << ": " << successful_attempts
    << " successful attempts." << endl;
}

The way it’s defined, funcB will try up to five times to enter the critical region. Figure 5 shows the result of the execution. Out of the five attempts, funcB could only come to the critical region twice.

Figure 5 Executing the Sample Project Mutex

funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.

Condition Variables

The header <condition_variable> comes with the last facility covered in this article, one that's fundamental for cases in which coordination between threads is tied to events.

In the following example, available in project CondVar in the code download, a producer function pushes elements in a queue:

mutex mq;              // Protects the shared queue.
mutex m;               // Used only for the condition-variable wait in the consumer (Figure 6).
condition_variable cv;
queue<int> q;
void producer()
{
  for (int i = 0;i<3;++i)
  {
    ... // Produce element
    cout << "Producer: element " << i << " queued." << endl;
    mq.lock();      q.push(i);  mq.unlock();
    cv.notify_all();
  }
}

The standard queue isn’t thread-safe, so you must make sure that nobody else is using it (that is, the consumer isn’t popping any element) when queuing.

The consumer function attempts to fetch elements from the queue when available, or it just waits for a while on the condition variable before attempting again; after two consecutive failed attempts, the consumer ends (see Figure 6).

Figure 6 Waking Up Threads Through Conditional Variables

void consumer()
{
  unique_lock<mutex> l(m);
  int failed_attempts = 0;
  while (true)
  {
    mq.lock();
    if (q.size())
    {
      int elem = q.front();
      q.pop();
      mq.unlock();
      failed_attempts = 0;
      cout << "Consumer: fetching " << elem << " from queue." << endl;
      ... // Consume elem
    } else {
      mq.unlock();
      if (++failed_attempts>1)
      {
        cout << "Consumer: too many failed attempts -> Exiting." << endl;
        break;
      } else {
        cout << "Consumer: queue not ready -> going to sleep." << endl;
        cv.wait_for(l, chrono::seconds(5));
      }
    }
  }
}

The producer wakes the consumer via notify_all every time a new element is available. That way, the consumer avoids sleeping for the entire interval when elements are ready.

Figure 7 shows the result of my run.

Figure 7 Synchronization with Condition Variables

Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: too many failed attempts -> Exiting.

A Holistic View

To recap, this article has shown a conceptual panorama of mechanisms introduced in C++11 to allow parallel execution in an era where multicore environments are mainstream.

Asynchronous tasks enable a lightweight programming model to parallelize execution. The outcomes of each task can be retrieved through an associated future.

Threads offer more granularity than tasks—although they’re heavier—together with mechanisms for keeping separated copies of static variables and transporting exceptions between threads.

As parallel threads act on common data, C++11 provides resources to avoid race conditions. Atomic types provide a reliable way to ensure that data is modified by only one thread at a time.

Mutexes help us define critical regions throughout the code—regions to which threads are prevented access simultaneously. Locks wrap mutexes, tying the unlocking of the latter to the lifecycle of the former.

Finally, condition variables grant more efficiency to thread synchronization, as some threads can wait for events notified by other threads.

This article hasn’t covered all the many ways to configure and use each of these features, but the reader now has a holistic vision of them and is ready to dig deeper.


Diego Dagum is a software developer with more than 20 years of experience. He’s currently a Visual C++ community program manager with Microsoft.

Thanks to the following technical experts for reviewing this article: David Cravey, Alon Fliess, Fabio Galuppo and Marc Gregoire