C++

Visual C++ 11 の新しい同時実行機能

Diego Dagum

コード サンプルのダウンロード

最新の C++ イテレーションは C++11 として知られ、この 1 年の間に国際標準化機構 (ISO) によって承認されました。C++ 11 は、ライブラリの新しいセットといくつかの予約語を定式化して、同時実行に対処しています。以前も多くの開発者が C++ で同時実行を扱っていましたが、常にサードパーティのライブラリを利用していました。こうしたサード パーティのライブラリの多くは、OS の API を直接公開しています。

Herb Sutter は 2004 年 12 月に、CPU の各メーカーが物理的な電力消費量や熱量の増加を理由に、高速 CPU の出荷をやめたことから、「無料でパフォーマンスを得られる」時代は終わった」と述べました。この時点から、現在主流であるマルチコアの時代、つまり、C++ が標準として適応するために重大な飛躍を求められる新たな時代への移行が始まりました。

今回の記事は、2 つのメイン セクションといくつかの短いサブセクションで構成しています。最初のメイン セクションの「並列実行」では、アプリケーションが独立アクティビティまたは準独立アクティビティを並列に実行できるようにする技術を説明します。2 番目のメイン セクションの「同時実行を同期する」では、こうしたアクティビティがデータを処理する方法を同期して、競合状態を避けるしくみを詳しく説明します。

この記事は、次期バージョンの Visual C++ (現在のところは Visual C++ 11) に含まれる機能に基づいて構成しています。この機能の一部は、現バージョンの Visual C++ 2010 でも使用できます。この記事は並列アルゴリズムをモデル化する手引書でも、使用可能なすべてのオプションに関する包括的な資料でもありませんが、新しい C++11 の同時実行機能の純粋な入門書として使用できます。

並列実行

データを扱うプロセスをモデル化し、アルゴリズムを設計する際は、通常、プロセスやアルゴリズムを複数のステップのシーケンスとして指定します。パフォーマンスが容認可能な範囲に収まれば、通常、簡単に理解できるため、最も推奨できるスキーマで、コード ベースをメインテナンス可能にするにはこうした考え方が必要です。

パフォーマンスが問題になるときは、その状況を克服する最初の手段として、このシーケンシャル アルゴリズムを最適化し、使用する CPU サイクル数を減らす手法を取ります。この手法は、これ以上の最適化を行えない状況、または最適化が困難になる状況に達するまで実行します。こうした状況に達したら、一連のシーケンシャル ステップを、同時に実行できるアクティビティに分割する作業に着手することになります。

ここでは、次のことを習得します。

  • 非同期タスク: 最初のアルゴリズムの小さな部分を、各部が生成または使用するデータだけでリンクします。
  • スレッド: ランタイム環境が管理する実行単位です。タスクはスレッドで実行されるという点で、スレッドとタスクは関連します。
  • スレッドの内部構造: スレッドに限定される変数、スレッドから伝達される例外などです。

非同期タスク

この記事の付属コードに Sequential Case というプロジェクトがあります (図 1 参照)。

図 1 Sequential Case コード

int a, b, c;
int calculateA()
{
  return a+a*b;
}
int calculateB()
{
  return a*(a+a*(a+1));
}
int calculateC()
{
  return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
  getUserData(); // initializes a and b
  c = calculateA() * (calculateB() + calculateC());
  showResult();
}

main 関数ではユーザーにデータの入力を求め、calculateA、calculateB、および calculateC という 3 つの関数にそのデータを渡します。結果を後でまとめ、ユーザーに出力情報を表示します。

付属資料の計算関数では、1 ~ 3 秒のランダムな遅延を各関数に導入するという方法でコーディングしています。これらのステップがシーケンシャルに実行されるとすると、最悪の場合、実行時間はデータが入力されてから合計 9 秒になります。F5 を押してサンプルを実行して、このコードを試してみることができます。

そこで、実行シーケンスを修正し、同時に実行できるステップを見つけます。各関数は独立しているため、次のように async 関数を使用して並列に実行できます。

int main(int argc, char *argv[])

{

  getUserData();

  future<int> f1 = async(calculateB), f2 = async(calculateC);

  c = (calculateA() + f1.get()) * f2.get();

  showResult();

}

ここでは async と future という 2 つの概念を取り入れました。どちらも <future> ヘッダーで定義し、std 名前空間に含まれています。async は、関数、ラムダ、または関数オブジェクト (ファンクタ) を受け取り、future を返します。future は、最終結果のプレースホルダーと考えることができます。つまり、async という関数が返す結果のプレースホルダーです。

どこかの時点で、並列実行関数の結果が必要になります。各 future の get メソッドを呼び出すと、値が取得可能になるまで実行がブロックされます。

付属のサンプルで AsyncTasks プロジェクトを実行し、修正済みコードをテストして、シーケンシャルに実行した場合と比較できます。最悪の場合の実行時間は、シーケンシャル バージョンでは 9 秒でしたが、修正済みコードでは約 3 秒です。

この軽量プログラミング モデルにより、開発者はスレッドを作成する必要がなくなります。もちろんスレッドのポリシーを指定することもできますが、ここでは触れません。

スレッド

前のセクションで示した非同期タスク モデルでも十分な状況もありますが、スレッドの実行をさらに細かく処理して制御する必要がある場合は、C++11 の thread クラスを使用します。このクラスは、<thread> ヘッダーで宣言され、std 名前空間に含まれています。

プログラミング モデルは複雑になりますが、スレッドにより同期と調整に対応する優れたメソッドが提供されるため、実行を別のスレッドに移して、指定時間待機するか、別のスレッドが完了してから実行を継続できます。

次の例 (付属コードの Threads プロジェクト) では、整数の引数を渡すと、その数の倍数で 100,000 未満の値をコンソールに出力するラムダ関数を使用しています。

auto multiple_finder = [](int n) {

  for (int i = 0; i < 100000; i++)

    if (i%n==0)

      cout << i << " is a multiple of " << n << endl;

};

int main(int argc, char *argv[])

{

  thread th(multiple_finder, 23456);

  multiple_finder(34567);

  th.join();

}

後の例からもわかるように、ラムダ関数をスレッドに渡しているのはこの状況に応じたためで、関数やファンクタを渡してもかまいません。

main 関数では、この関数に異なるパラメーターを渡して 2 つのスレッドで実行します。次の結果を見てください (実行タイミングが異なれば、結果も変わる可能性があります)。

0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456

前のセクションの非同期タスクの例は、スレッドを使用して実装することもできます。このためには、promise の考え方を導入する必要があります。promise は、使用可能になったときに結果をドロップするシンクと考えることができます。ドロップすると、その結果はどこに出力されるでしょう。各 promise には関連付けられた future があります。

図 2 に示すコード (サンプル コードの Promises プロジェクト) では、3 つのスレッド (タスクではない) を promise に関連付け、各スレッドから計算関数を呼び出させます。この内容を、先ほどの軽量 AsyncTasks バージョンと比べてみてください。

図 2 Future と Promise を関連付ける

typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
  p.set_value(f());
}
int main(int argc, char *argv[])
{
  getUserData();
  promise<int> p1, p2;
  future<int> f1 = p1.get_future(), f2 = p2.get_future();
  thread t1(&func2promise, calculateB, std::ref(p1)),
    t2(&func2promise, calculateC, std::ref(p2));
  c = (calculateA() + f1.get()) * f2.get();
  t1.join(); t2.join();
  showResult();
}

スレッド限定の変数と例外

C++ では、スレッドを含め、アプリケーション全体にスコープが限定されるグローバル変数を定義できます。ただし、スレッドに関しては、このようなグローバル変数を、すべてのスレッドが独自のコピーを保持するように定義できるようになりました。この考え方は、スレッド ローカル ストレージとして知られ、次のように宣言します。

thread_local int subtotal = 0;

関数のスコープ内で宣言を行うと、その変数へのアクセスはその関数に限定されますが、各スレッドは独自の静的コピーを保持し続けます。つまり、関数が複数回呼び出される場合、スレッド単位に変数の値が保持されます。

Visual C++ 11 では thread_local を使用できませんが、非標準の Microsoft 拡張機能でシミュレーションを行うことができます。

#define  thread_local __declspec(thread)

スレッド内で例外がスローされるとどうなるでしょう。おそらく、スレッド内の呼び出しスタックで例外がキャッチされ、ハンドルされます。しかし、スレッドが例外をハンドルしない場合は、その例外を呼び出し側のスレッドに送る方法が必要です。C++11 にはこれを行うメカニズムが備わっています。

図 3 (付属コードの ThreadInternals プロジェクト) の関数 sum_until_element_with_threshold では、特定の要素が見つかるまでベクトルを調べ、見つかるまでのすべての要素を合計します。合計がしきい値を超えると、例外がスローされます。

図 3 スレッド ローカル ストレージとスレッドの例外

thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
  unsigned threshold, exception_ptr& pExc)
{
  try{
    find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
      bool ret = (i!=element);
      sum_total+= i;
      if (sum_total>threshold)
        throw runtime_error("Sum exceeded threshold.");
      return ret;
    });
    cout << "(Thread #" << this_thread::get_id() << ") " <<
      "Sum of elements until " << element << " is found: " << sum_total << endl;
  } catch (...) {
    pExc = current_exception();
  }
}

例外が発生すると、current_exception から exception_ptr に例外がキャプチャされます。

main 関数では、sum_until_element_with_threshold をスレッド指定で呼び出すと同時に、別のパラメーターでも呼び出しています。(main スレッドで実行された関数と、スレッドを指定して実行した関数の) 両方の呼び出しが完了すると、それぞれの exception_ptrs を分析します。

const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  exception_ptr pExc1, pExc2;
  scramble_vector(1000);
  thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
  sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
  th.join();
  dealWithExceptionIfAny(pExc1);
  dealWithExceptionIfAny(pExc2);
}

これらの exception_ptrs のいずれか初期化されると (これはなんらかの例外が生じたサインです)、それらの例外を rethrow_exception を使ってトリガーします。

void dealWithExceptionIfAny(exception_ptr pExc)
{
  try
  {
    if (!(pExc==exception_ptr()))
      rethrow_exception(pExc);
    } catch (const exception& exc) {
      cout << "(Main thread) Exception received from thread: " <<
        exc.what() << endl;
  }
}

以下は、2 番目のスレッドでの合計がしきい値を超えたときの実行結果です。

(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.

同時実行を同期する

すべてのアプリケーションを 100 パーセント独立した非同期タスクのセットに分割できれば理想的です。しかし実際には、すべての関係タスクが同時に処理するデータには少なくとも依存関係が生じるため、これはほぼ不可能です。ここでは、競合状態を回避する新しい C++11 のテクノロジを紹介します。

ここでは、次のことを習得します。

  • アトミック型: プリミティブ データ型に似ていますが、スレッドセーフで変更が可能です。
  • ミューテックスとロック: スレッド セーフのクリティカル領域を定義できるようにする要素です。
  • 条件変数: いくつか基準が満たされるまでスレッドの実行をフリーズする手段を提供します。

アトミック型

<atomic> ヘッダーは、インターロック操作で実装する、atomic_char、atomic_int などの一連のプリミティブ型を導入します。これらの型は atomic_ というプレフィクスが付かない型と同じですが、すべての代入演算子 (==、++、--、+=、*= など) が競合状態から保護される点が異なります。つまり、これらのデータ型への代入中に別のスレッドが割り込んで、代入を完了する前に値が変更されることはありません。

次の例では、2 つの並列スレッド (1 つは main) が同じベクトル内で異なる要素を探しています。

atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  total_iterations = 0;
  scramble_vector(1000);
  thread th(find_element, 0);
  find_element(100);
  th.join();
  cout << total_iterations << " total iterations." << endl;
 }

各要素が見つかると、スレッドからのメッセージを出力し、その要素が見つかったベクトル (イテレーション) 内の位置を示します。

void find_element(unsigned element)
{
  unsigned iterations = 0;
  find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
    ++iterations;
    return (i==element);
  });
  total_iterations+= iterations;
  cout << "Thread #" << this_thread::get_id() << ": found after " <<
    iterations << " iterations." << endl;
}

共通の変数 total_iterations もあります。この変数は、両方のスレッドによって適用されるイテレーション数の複合数を更新します。このため、total_iterations は、両方のスレッドがこの変数を同時に更新しないようにアトミックでなければなりません。上記の例では、find_element にあるイテレーションの部分的な数を出力する必要がない場合でも、アトミック変数に競合が生じないように、total_iterations ではなく、ローカル変数にイテレーション数を累積しています。

このサンプルは、付属のダウンロードコードの Atomics プロジェクトにあります。これを実行すると、以下のようになります。

 

Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.

ミューテックス (相互排他) とロック

前のセクションでは、プリミティブ型へのアクセスを行う場合に相互に排他的な特殊なケースを説明しました。<mutex> ヘッダーは、一連のロック可能なクラスを定義することで、クリティカル領域を定義します。このようにして、一連の関数やメソッド全体を含むクリティカル領域を構築するようにミューテックスを定義できます。このミューテックスを適切にロックすることで、ミューテックス内の一連の関数やメソッドのどのメンバーにも同時に 1 つのスレッドしかアクセスできないようになります。

ミューテックスのロックを試みるスレッドは、ミューテックスが使用可能になるまでブロックすることも、単に試行に失敗するようにもできます。これら 2 つの両極端な方法の中間として、代替の timed_mutex クラスを使用して短期間ブロックしてから失敗するようにすることも可能です。ロックの試行を取りやめることができるようにすることで、デッドロックを回避できます。

ロックしたミューテックスは、他のスレッドがそのミューテックスをロックできるように、明示的にロックを解除できる必要があります。これができなければ、アプリケーションの動作が不確定になる可能性があり、動的メモリの解放を忘れた場合と同様にエラーにつながることがあります。実際には、ロックの解除を忘れる方がより悪い状況を引き起こします。他のコードがそのロックの解除を待ち続けた場合、それ以降アプリケーションが正常に機能しなくなる可能性があります。さいわい、C++11 にはロック関連のクラスも用意されています。ロックはミューテックスで機能しますが、ロックされている場合はデストラクターで必ず解除されます。

次のコード (コード ダウンロードの Mutex プロジェクト) は、ミューテックス mx の周囲にクリティカル領域を定義します。

mutex mx;
void funcA();
void funcB();
int main()
{
  thread th(funcA)
  funcB();
  th.join();
}

このミューテックスを使用することで、funcA と funcB という 2 つの関数がこのクリティカル領域に同時に入ることなく、確実に並列実行されるようになります。

関数 funcA は、必要に応じてクリティカル領域に入るのを待機します。このように設定するには、次のように最も簡単なロック メカニズムとして lock_guard を使用するだけです。

void funcA()
{
  for (int i = 0; i<3; ++i)
  {
    this_thread::sleep_for(chrono::seconds(1));
    cout << this_thread::get_id() << ": locking with wait... " << endl;
    lock_guard<mutex> lg(mx);
    ... // Do something in the critical region.
    cout << this_thread::get_id() << ": releasing lock." << endl;
  }
}

この定義方法では、funcA はクリティカル領域に 3 回アクセスします。関数 funcB はロックを試みますが、ミューテックスが既にロックされている場合、funcB は短時間待機するだけで、クリティカル領域へのアクセスを再度試みます。ここで使用するメカニズムが、ポリシー try_to_lock_t を指定した unique_lock です (図 4 参照)。

図 4 待機を伴うロック

void funcB()
{
  int successful_attempts = 0;
  for (int i = 0; i<5; ++i)
  {
    unique_lock<mutex> ul(mx, try_to_lock_t());
    if (ul)
    {
      ++successful_attempts;
      cout << this_thread::get_id() << ": lock attempt successful." <<
        endl;
      ... // Do something in the critical region
      cout << this_thread::get_id() << ": releasing lock." << endl;
    } else {
      cout << this_thread::get_id() <<
        ": lock attempt unsuccessful. Hibernating..." << endl;
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
  cout << this_thread::get_id() << ": " << successful_attempts
    << " successful attempts." << endl;
}

この定義方法では、funcB はクリティカル領域へのアクセスを最大 5 回まで試行します。図 5 はこの実行結果を示しています。5 回の試行中、funcB がクリティカル領域にアクセスできたのは 2 回だけです。

図 5 サンプル プロジェクト Mutex の実行結果

funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful. Hibernating ...
funcB: lock attempt unsuccessful. Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.

条件変数

今回説明する最後の機能にヘッダー <condition_variable> があります。これは、スレッド間の調整をイベントに結び付ける場合に不可欠です。

次の例 (コード ダウンロードの CondVar プロジェクト) では、producer 関数が要素をキューにプッシュします。

mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
  for (int i = 0;i<3;++i)
  {
    ... // Produce element
    cout << "Producer: element " << i << " queued." << endl;
    mq.lock();      q.push(i);  mq.unlock();
    cv.notify_all();
  }
}

標準のキューはスレッドセーフではないため、キューに登録するときは、他のユーザーがキューを使用しない (つまり、consumer がどの要素もポップしない) ようにする必要があります。

consumer 関数は、キューを使用できるときにキューから要素のフェッチを試みるか、条件変数に基づいてしばらく待機してから再度試行します。2 回続けて試行に失敗すると終了します (図 6 参照)。

図 6 条件変数によるスレッドの再開

void consumer()
{
  unique_lock<mutex> l(m);
  int failed_attempts = 0;
  while (true)
  {
    mq.lock();
    if (q.size())
    {
      int elem = q.front();
      q.pop();
      mq.unlock();
      failed_attempts = 0;
      cout << "Consumer: fetching " << elem << " from queue." << endl;
      ... // Consume elem
    } else {
      mq.unlock();
      if (++failed_attempts>1)
      {
        cout << "Consumer: too many failed attempts -> Exiting." << endl;
        break;
      } else {
        cout << "Consumer: queue not ready -> going to sleep." << endl;
        cv.wait_for(l, chrono::seconds(5));
      }
    }
  }
}

consumer は、新しい要素が使用可能になると毎回、producer にから notify_all を使用して再開されます。その結果、producer は要素の準備が整った場合に、consumer がインターバルの間ずっとスリープ状態にならないようにします。

図 7 はこの実行結果を示します。

図 7 条件変数による同期

Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.

まとめ

要約すると、今回はマルチコア環境が主流になる時代に即して並列実行を行えるように C++11 で導入されたメカニズムの考え方を示しました。

非同期タスクにより、簡単なプログラミング モデルで並列実行が可能になります。各タスクの結果は、関連付けられた future によって取得できます。

タスクよりスレッドの方が負荷が高くなりますが、静的変数のコピーを独自に保持して、スレッド間で例外を転送するメカニズムを使用することで、スレッドはタスクより細かく制御できます。

並列スレッドは共通のデータで機能するため、C++11 では競合状態を回避するためのリソースを提供します。アトミック型により、1 つのスレッドしか、データを同時に変更できないようにする信頼性の高い方法を使用できるようになります。

ミューテックスは、スレッドからの同時アクセスを防ぐクリティカル領域をコード全体にわたって定義するのに役立ちます。ロックはミューテックスをラップし、後者のロック解除を前者のライフサイクルに結びつけます。

最後に、条件変数により、他のスレッドが通知するまでイベントを待機できるため、スレッドの同期を効率良く行うことができます。

今回の記事でこれらの各機能を構成および使用するための多くの方法をすべて説明したわけではありませんが、紹介した機能の概要を踏まえ、より深く理解するための準備を整えることができればさいわいです。

Diego Dagum は、20 年を超える経験を持つソフトウェア開発者です。現在は Microsoft の Visual C++ コミュニティ プログラム マネージャーです。

この記事のレビューに協力してくれた技術スタッフの David CraveyAlon FliessFabio Galuppo、および Marc Gregoire に心より感謝いたします。