C++

Nouvelles fonctionnalités de simultanéité dans Visual C++ 11

Diego Dagum

Télécharger l'exemple de code

La dernière itération C++ (connue sous le nom de C++11) et approuvée par l'ISO (Organisation internationale de normalisation) l'an dernier, officialise un nouvel ensemble de bibliothèques et quelques mots réservés pour traiter la simultanéité. De nombreux développeurs ont utilisé la simultanéité dans C++ auparavant, mais toujours via une bibliothèque tierce, exposant souvent directement les API du système d'exploitation.

Herb Sutter avait annoncé la fin de la « gratuité des performances » en décembre 2004. En effet les fabricants de processeurs ne pouvaient plus livrer de processeurs plus rapides pour des raisons de consommation électrique physique et d'augmentation thermique. Cela a abouti à l'ère du multicœur grand public, une nouvelle réalité à laquelle le standard C++ a dû s'adapter en faisant un grand pas.

Le reste de cet article est divisé en deux sections principales et en sous-sections, plus petites. La première section principale (commençant par Exécution parallèle) traite des technologies qui permettent aux applications d'exécuter des activités indépendantes ou semi-indépendantes en parallèle. La deuxième section principale (commençant par Synchronisation de l'exécution en simultané), explore les mécanismes de synchronisation des activités gérant les données, afin d'éviter les conditions de concurrence.

Cet article est basé sur les fonctionnalités figurant dans la prochaine version de Visual C++ (appelée pour l'instant Visual C++ 11). Certaines d'entre elles sont déjà disponibles dans la version actuelle, Visual C++ 2010. Cet article ne constitue pas un guide pour créer des algorithmes parallèles, ni une documentation exhaustive à propos de toutes les options disponibles, mais il donne une solide présentation des nouvelles fonctionnalités de simultanéité de C++11.

Exécution parallèle

Lorsque vous concevez des processus et créez des algorithmes sur les données, la tendance naturelle est de les spécifier dans une suite d'étapes. Tant que les performances se trouvent dans des limites raisonnables, il s'agit du schéma le plus recommandé, car il est généralement plus facile de comprendre un besoin pour des bases de code faciles à gérer.

Lorsque les performances deviennent un facteur à risque, une première tentative classique pour résoudre la situation consiste à optimiser l'algorithme séquentiel afin de réduire les cycles de processeur consommés. Cette opération peut s'effectuer jusqu'à parvenir à un stade ou aucune optimisation supplémentaire n'est disponible, ou est difficile à obtenir. Il est alors temps de diviser les séries séquentielles d'étapes en occurrences simultanées.

Dans la première section, vous allez découvrir ce qui suit :

  • Tâches asynchrones : portions plus petites de l'algorithme original uniquement liées par les données qu'elles produisent ou consomment.
  • Threads : unités d'exécution administrées par l'environnement d’exécution. Elles sont liées aux tâches dans le sens où celles-ci s'exécutent sur des threads.
  • Threads internes : variables liées à la thread, exceptions propagées à partir des threads, etc.

Tâches asynchrones

Dans le code associé à cet article, vous trouverez un projet intitulé Cas séquentiel, comme illustré à la figure 1.

Figure 1 Code de cas séquentiel

int a, b, c;
int calculateA()
{
  return a+a*b;
}
int calculateB()
{
  return a*(a+a*(a+1));
}
int calculateC()
{
  return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
  getUserData(); // initializes a and b
  c = calculateA() * (calculateB() + calculateC());
  showResult();
}

La fonction principale demande certaines données à l'utilisateur, puis soumet ces données à trois fonctions : calculateA, calculateB et calculateC. Les résultats sont ensuite combinés afin de fournir des informations de sortie pour l'utilisateur.

Les fonctions de calcul de la documentation associée sont codées de telle façon qu'un délai aléatoire d'une à trois secondes est introduit dans chacune. Sachant que ces étapes sont exécutées de manière séquentielle, cela aboutit à un temps d'exécution global (une fois les données d'entrées saisies) de neuf secondes dans le pire scénario. Vous pouvez tester ce code en appuyant sur F5 et en exécutant l'exemple.

J'ai donc besoin de réviser la séquence d'exécution et de trouver les étapes à être effectuées simultanément. Ces fonctions étant indépendantes, je peux les exécuter en parallèle à l'aide de la fonction async :

int main(int argc, char *argv[])

{

  getUserData();

  future<int> f1 = async(calculateB), f2 = async(calculateC);

  c = (calculateA() + f1.get()) * f2.get();

  showResult();

}

J'ai présenté deux concepts ici : async et future, tous deux définis dans l'en-tête <future> et l'espace de noms std. Le premier reçoit une fonction, une expression lambda ou un objet de fonction (functor) et renvoie un futur. Vous pouvez concevoir le concept de futur comme l'espace réservé pour un résultat éventuel. Quel résultat ? Celui renvoyé par la fonction appelée de façon asynchrone.

J'aurai besoin à un moment des résultats de ces fonctions d'exécution en parallèle. Le fait d'appeler la méthode get sur chaque futur bloque l'exécution jusqu'à ce que la valeur soit disponible.

Vous pouvez tester et comparer le code révisé avec le cas séquentiel en exécutant le projet AsyncTasks dans l'exemple associé. Le pire retard de cette modification est d'environ trois secondes, contre neuf secondes pour la version séquentielle.

Il s'agit du modèle de programmation léger qui permet au développeur de ne pas à avoir à créer de threads. Vous pouvez toutefois spécifier des stratégies de threading, mais elles ne seront pas abordées ici.

Threads

Le modèle de tâche asynchrone présenté dans la section précédente peut être suffisant dans certains scénarios donnés, mais si vous avez besoin d'une gestion et d'un contrôle plus complets de l'exécution des threads, C++11 est fourni avec des classes de threads, déclarées dans l'en-tête <thread> et qui se trouvent dans l'espace de noms std.

Bien qu'il s'agisse d'un modèle de programmation plus complexe, les threads offrent de meilleures méthodes de synchronisation et de coordination, ce qui leur permet de mener une exécution vers une autre thread et d'attendre pendant une durée déterminée ou jusqu'à ce qu'une thread soit terminée avant de continuer.

Dans l'exemple suivant (disponible dans le projet Threads du code associé), j'ai une fonction lambda, qui à partir d'un argument entier, imprime ses multiples inférieurs à 100 000 sur la console :

auto multiple_finder = [](int n) {

  pour (int i = 0; i < 100000; i++)

    si (i%n==0)

      cout << i << " est un multiple de " << n << endl;

};

int main(int argc, char *argv[])

{

  thread th(multiple_finder, 23456);

  multiple_finder(34567);

  th.join();

}

Comme vous le verrez dans d'autres exemples, le fait d'avoir transmis une fonction lambda à la thread est circonstanciel. Une fonction(ou functor) suffirait également.

Dans la fonction principale, j'exécute cette fonction dans deux threads avec des paramètres différents. Regardez le résultat obtenu (il peut varier entre des exécutions différentes en raison de temporisations) :

0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456

Il est possible que j'implémente l'exemple à propos des tâches asynchrones dans la section précédente avec les threads. J'ai besoin pour cela de présenter le concept de promesse. Une promesse peut être assimilée à un récepteur via lequel un résultat sera supprimé lorsqu'il est disponible. Qu'advient-il du résultat une fois qu'il est supprimé ? Chaque promesse a un futur associé.

Le code indiqué dans la figure 2 (disponible dans le projet Promesses de l'exemple de code) associe trois threads (au lieu de tâches) aux promesses et oblige chacune d'entre elles à appeler une fonction calculate. Comparez ces détails avec la version AsyncTasks plus légère.

Figure 2 Association des futurs aux promesses

typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
  p.set_value(f());
}
int main(int argc, char *argv[])
{
  getUserData();
  promise<int> p1, p2;
  future<int> f1 = p1.get_future(), f2 = p2.get_future();
  thread t1(&func2promise, calculateB, std::ref(p1)),
    t2(&func2promise, calculateC, std::ref(p2));
  c = (calculateA() + f1.get()) * f2.get();
  t1.join(); t2.join();
  showResult();
}

Variables et exceptions liées aux threads

Dans C++, vous pouvez définir des variables globales dont l'étendue couvre toute l'application, y compris les threads. Par rapport aux threads, il existe à présent une manière de définir ces variables globales, de manière à ce que chaque thread conserve sa propre copie. Ce concept est connu sous le nom de mémoire locale de thread et est déclaré comme suit :

thread_local int subtotal = 0;

Si la déclaration est effectuée dans l’étendue d'une fonction, la visibilité de la variable sera réduite à cette fonction, mais chaque thread conservera sa propre copie statique. C’est à dire que les valeurs de la variable par thread sont conservées entre les appels de fonction.

Même si thread_local n'est pas disponible dans Visual C++ 11, une simulation est possible avec une extension Microsoft non-standard:

#define  thread_local __declspec(thread)

Que se passerait-il si une exception était lancée à l'intérieur d'une thread ? Il peut arriver que l'exception puisse être interceptée et gérée dans la pile d'appel à l'intérieur de la thread. Mais si la thread ne gère pas l'exception, vous devrez transporter cette dernière vers la thread déclencheur. C++11 contient ces mécanismes.

Dans la figure 3 (disponible dans le code associé au projet ThreadInternals), il existe une fonction sum_until_element_with_threshold, qui traverse un vecteur jusqu'à trouver un élément spécifique, en additionnant les éléments tout du long. Si la somme dépasse un seuil, une exception est générée.

Figure 3 Mémoire locale de thread et exceptions de thread

thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
  unsigned threshold, exception_ptr& pExc)
{
  try{
    find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
      bool ret = (i!=element);
      sum_total+= i;
      if (sum_total>threshold)
        throw runtime_error("Sum exceeded threshold.");
      return ret;
    });
    cout << "(Thread #" << this_thread::get_id() << ") " <<
      "Sum of elements until " << element << " is found: " << sum_total << endl;
  } catch (...) {
    pExc = current_exception();
  }
}

Le cas échéant, l'exception est capturée via current_exception dans exception_ptr.

La fonction principale déclenche une thread sur sum_until_element_with_threshold, tout en appelant cette même fonction avec un paramètre différent. Une fois les deux appels terminés (celui de la thread principale et celui à partir duquel la thread est déclenchée), leur exception_ptrs respective est analysée :

const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  exception_ptr pExc1, pExc2;
  scramble_vector(1000);
  thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
  sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
  th.join();
  dealWithExceptionIfAny(pExc1);
  dealWithExceptionIfAny(pExc2);
}

Si l'une de ces exception_ptrs est déjà initialisée (signe qu'une exception s'est produite), leurs exceptions sont redéclenchées avec rethrow_exception:

void dealWithExceptionIfAny(exception_ptr pExc)
{
  try
  {
    if (!(pExc==exception_ptr()))
      rethrow_exception(pExc);
    } catch (const exception& exc) {
      cout << "(Main thread) Exception received from thread: " <<
        exc.what() << endl;
  }
}

Il s'agit du résultat de votre exécution, car la somme de la deuxième thread a dépassé son seuil :

(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.

Synchronisation de l'exécution en simultané

Il serait souhaitable que toutes les applications puissent être divisées en ensembles complètement indépendants de tâches asynchrones. En pratique, cela n'est presque jamais possible. En effet, il y a au moins des dépendances sur les données que toutes les parties gèrent simultanément. Cette section présente les nouvelles technologies C++11 permettant d'éviter les conditions de concurrence.

Vous allez découvrir :

  • Les types atomiques : similaires aux types de données primitives, mais permettant des modifications sécurisées au niveau de threads.
  • Mutex et verrous : éléments qui nous permettent de définir des zones critiques sécurisées au niveau de threads.
  • Variables conditionnelles : façon d'empêcher l'exécution des threads jusqu'à ce que certains critères soient remplis.

Types atomiques

L'en-tête <atomic> présente une série de types primitifs (atomic_char, atomic_int, etc.) implémentés en termes d'opérations imbriquées. Ainsi, ces types sont équivalents à leurs homonymes sans atomic_ prefix, à ceci près que leurs opérateurs d'assignation (==, ++, --, +=, *= etc.) sont protégés contre les conditions de concurrence. Il n'est donc pas possible qu'au milieu d'une affectation à ces types de données, une autre thread intervienne et modifie les valeurs avant que nous ayons terminé.

Dans l'exemple suivant, il y a deux threads parallèles (l'une étant la principale) à la recherche de différents éléments au sein du même vecteur :

atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  total_iterations = 0;
  scramble_vector(1000);
  thread th(find_element, 0);
  find_element(100);
  th.join();
  cout << total_iterations << " total iterations." << endl;
 }

Lorsque tous les éléments sont trouvés, un message provenant de la thread est imprimé, indiquant la position du vecteur (ou l'itération) où ils ont été trouvés :

void find_element(unsigned element)
{
  unsigned iterations = 0;
  find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
    ++iterations;
    return (i==element);
  });
  total_iterations+= iterations;
  cout << "Thread #" << this_thread::get_id() << ": found after " <<
    iterations << " iterations." << endl;
}

Il existe également une variable courante, total_iterations, qui est mise à jour avec le nombre composé d'itérations appliquées par les deux threads. total_iterations doit donc être atomique pour ne pas que les deux threads soient mises à jour en même temps. Dans l'exemple précédent, même si vous n'avez pas besoin d'imprimer une partie du nombre d'itérations dans find_element, vous devez quand même accumuler des itérations dans cette variable locale au lieu de total_iterations, pour éviter une contention sur la variable atomique.

Vous trouverez l'exemple précédent dans le projet Atomics du code associé téléchargé. Je l'ai exécuté et obtenu ce qui suit :

Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.

Exclusion mutuelle et verrous

La section précédente illustre un cas particulier d'exclusion mutuelle pour la rédaction de l'accès sur les types primitifs. L'en-tête <mutex> définit une série de classes verrouillables pour définir des zones critiques. De cette façon, vous pouvez définir un mutex pour établir une zone critique dans toute une série de fonctions ou de méthodes, afin qu'une seule thread à la fois puisse accéder à n'importe quel membre de cette série en parvenant à verrouiller son mutex.

Une thread tentant de verrouiller un mutex peut soit rester bloquée jusqu'à ce que ce mutex soit disponible, soit échouer dans cette tentative. Entre ces deux extrêmes, la classe alternative timed_mutex class peut rester bloquée pendant un petit intervalle de temps avant d'échouer. Le fait de laisser les tentatives de blocage abandonner permet d'empêcher les blocages.

Un mutex verrouillé doit être explicitement déverrouillé pour que d'autres puissent le verrouiller. Si cette opération n'est pas effectuée, cela peut entraîner un comportement d'application indéterminé, ce qui peut être source d'erreurs, comme oublier de libérer de la mémoire dynamique. Le fait d'oublier de libérer un verrou est plus ennuyeux. En effet, l'application risque de ne plus fonctionner correctement si un autre code reste attendre sur ce verrou. Heureusement, C++11 est également fourni avec des classes de verrouillage. Un verrou agit comme un mutex, mais son destructeur s'assure de le libérer s'il est verrouillé.

Le code suivant (disponible dans le projet Mutex dans le code téléchargé) définit une zone critique autour d'un mutex mx :

mutex mx;
void funcA();
void funcB();
int main()
{
  thread th(funcA)
  funcB();
  th.join();
}

Ce mutex est utilisé pour garantir que deux functions (funcA et funcB) puissent s'exécuter en parallèle sans passer ensemble dans la zone critique.

La fonction funcA patiente, si nécessaire, pour passer dans la zone critique. Pour ce faire, vous avez simplement besoin du mécanisme de verrouillage le plus simple lock_guard :

void funcA()
{
  for (int i = 0; i<3; ++i)
  {
    this_thread::sleep_for(chrono::seconds(1));
    cout << this_thread::get_id() << ": locking with wait... " << endl;
    lock_guard<mutex> lg(mx);
    ... // Do something in the critical region.
    cout << this_thread::get_id() << ": releasing lock." << endl;
  }
}

Ce mécanisme est défini de telle façon que funcA doit accéder trois fois à la zone critique. La fonction funcB, en revanche, va essayer de verrouiller, mais si le mutex est déjà verrouillé, funcB attend simplement pendant une seconde avant d'essayer à nouveau d'obtenir l'accès à la zone critique. Le mécanisme utilisé est unique_lock avec la stratégie try_to_lock_t, comme indiqué à la figure 4.

Figure 4 Verrouillage avec attente

void funcB()
{
  int successful_attempts = 0;
  for (int i = 0; i<5; ++i)
  {
    unique_lock<mutex> ul(mx, try_to_lock_t());
    if (ul)
    {
      ++successful_attempts;
      cout << this_thread::get_id() << ": lock attempt successful." <<
        endl;
      ... // Do something in the critical region
      cout << this_thread::get_id() << ": releasing lock." << endl;
    } else {
      cout << this_thread::get_id() <<
        ": lock attempt unsuccessful. Hibernating..." << endl;
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
  cout << this_thread::get_id() << ": " << successful_attempts
    << " successful attempts." << endl;
}

Ce mécanisme est défini de telle façon que funcB tente d'accéder cinq fois au maximum à la zone critique. La figure 5 illustre le résultat de l'exécution. En dehors des cinq tentatives, funcB peut uniquement accéder à la zone critique deux fois.

Figure 5 Exécution de l'exemple de projet Mutex

funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.

Variables conditionnelles

L'en-tête <condition_variable> est fourni avec la dernière structure abordée dans cet article, indispensable pour les cas où la coordination entre les threads est liée aux événements.

Dans l'exemple suivant (disponible dans le projet CondVar dans le code téléchargé) une fonction producer place les éléments dans une file d'attente :

mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
  for (int i = 0;i<3;++i)
  {
    ... // Produce element
    cout << "Producer: element " << i << " queued." << endl;
    mq.lock();      q.push(i);  mq.unlock();
    cv.notify_all();
  }
}

La file d'attente standard n'est pas sécurisée au niveau des threads, vous devez donc vous assurer que personne d'autre ne l'utilise (que le consommateur n'affiche aucun élément).

La fonction consommateur tente d'extraire des éléments de la file d'attente lorsqu'ils sont disponibles, sinon elle attend simplement la variable conditionnelle avant d'essayer à nouveau. Après deux échecs consécutifs, cette fonction se termine (voir la figure 6).

Figure 6 Réveil de threads via des variables conditionnelles

void consumer()
{
  unique_lock<mutex> l(m);
  int failed_attempts = 0;
  while (true)
  {
    mq.lock();
    if (q.size())
    {
      int elem = q.front();
      q.pop();
      mq.unlock();
      failed_attempts = 0;
      cout << "Consumer: fetching " << elem << " from queue." << endl;
      ... // Consume elem
    } else {
      mq.unlock();
      if (++failed_attempts>1)
      {
        cout << "Consumer: too many failed attempts -> Exiting." << endl;
        break;
      } else {
        cout << "Consumer: queue not ready -> going to sleep." << endl;
        cv.wait_for(l, chrono::seconds(5));
      }
    }
  }
}

Le consommateur doit être réveillé via notify_all par le producteur à chaque fois qu'un nouvel élément est disponible. De cette façon, le producteur évite de laisser le consommateur en veille pendant tout l'intervalle si des éléments sont prêts.

La figure 7 illustre le résultat de mon exécution.

Figure 7 Synchronisation avec des variables conditionnelles

Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.

Une vue holistique

Pour résumer, cet article a présenté un panorama conceptuel des mécanismes introduits dans C++11 afin d'autoriser l'exécution parallèle dans une ère où les environnements multiprocesseurs sont monnaie courante.

Les tâches asynchrones permettent à un modèle de programmation léger de paralléliser l'exécution. Le résultat de chaque tâche peut être récupéré via un futur associé.

Les threads offrent plus de granularité que les tâches (même si elles sont plus lourdes), avec des mécanismes pour conserver des copies distinctes des variables statiques et transporter des exceptions entre les threads.

Comme les threads parallèles agissent sur des données communes, C++11 fournit des ressources pour éviter les conditions de concurrence. Les types atomiques offrent une manière sûre de garantir que les données sont modifiées par une seule thread à la fois.

Les mutex nous aident à définir des zones critiques dans l'ensemble du code (zones dans lesquelles les threads ont interdiction d'accéder simultanément). Les verrous encapsulent les mutex, en liant le déverrouillage du dernier au cycle de vie du premier.

Enfin, les variables conditionnelles optimisent l'efficacité de la synchronisation des threads. En effet, certaines threads peuvent attendre des événements notifiés par d'autres threads.

Cet article n'a pas abordé toutes les manières de configurer et d'utiliser chacune de ces fonctionnalités, mais le lecteur en a désormais une vision holistique et est prêt à aller plus loin.

Diego Dagum est développeur de logiciels depuis plus de 20 ans. Il est actuellement responsable du programme communautaire Visual C++ chez Microsoft.

Je remercie les experts techniques suivants d'avoir relu cet article : David Cravey, Alon Fliess, Fabio Galuppo et Marc Gregoire