NET の問題

ThreadPool へのラウンドロビン アクセス

Stephen Toub

 

Q 現在、Microsoft .NET Framework ThreadPool を使用しており、解決方法のわからない状況に陥りました。まず、キューに登録された作業項目の大きなバッチから開始し、最初のバッチの処理が開始した後に 2 番目の作業項目 (より小さいバッチ) が到着します。最初は、大きいバッチ内の作業のいくつかが ThreadPool 内のすべてのワーカー スレッドにディスパッチされます。しかし、最初に到着したという理由で最初のバッチにすべての処理能力が向けられるのではなく、2 番目のバッチが到着したときに各バッチが均等に処理されるようにして、公平な分散が行われることを望んでいます。

 

バッチの 1 つが終了したら、引き続き処理が必要なバッチがすべてのワーカー スレッドで処理されることを望んでいます。このようなバッチ処理機能を ThreadPool の上に階層化するためにできることはありますか。

A 過去のコラムで、既存の .NET ThreadPool の上にさまざまな種類の機能を階層化する方法を示しました。MSDN Magazine の 2004 年 10 月号では、キューに登録された作業項目を待機するためのサポートを ThreadPool に追加する方法を示しました (「ThreadPoolWait と HandleLeakTracker」を参照)。2004 年 11 月号では、作業項目の優先順位のサポートを追加する方法を示しました (「ThreadPoolPriority と MethodImplAttribute」を参照)。また、2006 年 3 月号では、キャンセル サポートの追加方法を示しました (「中止可能なスレッド プール」を参照)。将来、2009 年 1 月号を見返せば、私の示した ThreadPool の上にラウンドロビン スケジューリング サポートを追加する方法を確認できるでしょう。

質問者が解決したいと思っている問題では、まず、ThreadPool が作業をディスパッチする方法を理解する必要があります。内部的には、キュー登録された作業項目のキューが管理されます。プール内のスレッドが作業を実行可能になると、そのスレッドは作業キューに戻って次の項目を取得します。この処理が行われる順序は文書化されておらず、この順序に依存してはならないことは明らかです (将来のバージョンで変更される可能性があるため)。

現在は、非常に単純な方法で実装されています。つまり、先入れ先出し (FIFO) キューです。このため、キューに登録する最初の作業が、スレッドによって最初に取り出される作業になります。質問者のバッチ処理シナリオでは、最初のバッチのすべての作業が、2 番目のバッチのすべての作業よりも前にキューに登録されます。したがって、最初のバッチのすべての作業がディスパッチされてから、2 番目のバッチの作業がディスパッチされます。一部のシナリオでは、この動作が正常で最適です。質問者のシナリオでは、より詳細な制御が必要です。

ThreadPool から制御を取得する最も簡単な方法の 1 つは、ユーザーが実際に実行を望んでいるデリゲートを独自のデリゲートで置換することです。例として、キュー登録された作業からスローされる処理不能例外をすべてキャッチし、それぞれに対してイベントを発生させる必要がある場合を考えてみましょう。そのためには、図 1 に示すようなコードを作成する必要があります。次に、ThreadPool.QueueUserWorkItem を使用する代わりに、ExceptionThreadPool.QueueUserWorkItem を使用します。依然として作業は ThreadPool によって実行されますが、プールのスレッドが実際に実行するのは、ユーザーが提供したデリゲートではなくキューに登録されたデリゲートです。デリゲートを起動すると、ユーザーが提供したデリゲートが起動され、例外がキャッチされ、ターゲット イベントが発生します。

図 1 ThreadPool の shim の作成

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

この手法は強力ですが、コストがかかることに注意してください。追加のデリゲートの割り当てや追加のデリゲートの呼び出しなどが必要になります。そのコストが高すぎるかどうかを判断できるのは読者とそのシナリオだけですが、この種の階層化は、通常は独自のスレッド プールをゼロから作成するよりもコスト効率が高くなります。

もちろんこれは非常に単純な例ですが、より複雑な作業を行うこともできます。前に述べた優先順位プールの例では、ユーザーが提供するデリゲートを独自のデータ構造に格納します。続いて、戻ってきた代替デリゲートをプールにキュー登録し、実行する適切なデリゲートをこれらのデータ構造で検索します。このときに、より高い優先順位を持つデリゲートを最初に実行することが望まれます。同様の手法を使用して、質問者のバッチ処理の状況に対処できます。

少々時間をとって、単一キューではなくバッチごとに 1 つのキューがある場合を想像してみましょう。各バッチは、作業を独自の関連キューに挿入します。次に、プールのスレッドを使用して、すべてのキューの間でラウンドロビンを実行します。実際の ThreadPool にキュー登録したデリゲートがデータ構造に戻り、次に調査するキューから開始して作業を検索します。何か見つかった場合は、そのキューから実行します。見つからない場合は、次のキューの検索に進みます。

この方法では、バッチ間に公平なスケジュールを提供できます。1 つのバッチしかない場合、スレッドは必ずそのキューから作業を取り出します。複数のバッチがある場合は、スレッドはほぼ均等の注意を払って各キューを調べます。図 2 に、このソリューションの概要を示します。

netmatters,fig02.gif

図 2 RoundRobinThreadPool アプローチ

先に進めるには、まず、ユーザーが提供するデリゲートを格納するためのデータ構造が必要です。筆者の表現を図 3 に示します。このデータ構造には 3 つのプロパティが含まれています。最初の 2 つは見慣れたものであり、ユーザーが QueueUserWorkItem に提供する状態であるため、当然これをキャッシュする必要があります。一方、3 つ目のプロパティは見慣れないものかもしれません。.NET では、すべての実行スレッドに System.Threading.ExecutionContext が関連付けられています。これは、現在のユーザー、実行の論理スレッドに関連付けられている状態、コード アクセス セキュリティ情報などの情報を表します。このコンテキストが実行の非同期ポイント間を流れることが重要です。

図 3 作業項目をキャプチャする

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

たとえば、特定の Windows ID を偽装し、ThreadPool.QueueUserWorkItem を呼び出す場合、キューに登録した作業は同じ Windows ID の下で実行する必要があります。そうでない場合、セキュリティ ホールとなる可能性があります。.NET Framework 2.0 以降、このコンテキストはコード内のすべての非同期ポイント間を既定で自動的に流れるようになりました。非同期ポイントとは、ThreadPool.QueueUserWorkItem、新規 Thread の作成、非同期デリゲート呼び出しなどです。

ただし、ここで説明している実装では、異なるルール セットで実行しています。デリゲートがキュー登録される順序を実行順序とは変えることで、この ExecutionContext フローとユーザーが提供した作業項目の間に直接的な対応関係はなくなります。このため、実装では、ユーザーが提供するデリゲートで ExecutionContext を正しくキャッシュしてから、キャプチャしたコンテキストを使用してそのデリゲートを実行する必要があります。

作業項目を入手したら、その項目が保持されるキューを見てみましょう (図 4 に示されています)。RoundRobinThreadPool.Queue データ構造自体はかなり単純です。内部的には、提供されたすべての作業項目を格納するための Queue<WorkItem>、キューに関連付けられている RoundRobinThreadPool インスタンスへの参照、および Dispose メソッドがキューに対して呼び出されたかどうかを示すブール値が含まれます。さらに、ThreadPool と同じシグネチャを持つ QueueUserWorkItem メソッドを提供します。

図 4 ラウンドロビン キュー

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

QueueUserWorkItem が呼び出されると、ユーザーが提供するコールバックと状態が (現在の ExecutionContext と共に) WorkItem に取り込まれます。この作業は汎用キューに格納され、新しい作業の到着が関連プールに通知されます。QueueUserWorkItem は複数のスレッドから同時に呼び出される可能性があり、その不変性が保持されるようにする必要があるため、ロックを使用して作業項目キューを保護していることに注目することが重要です。

また、ロックされているオブジェクトが、プールから取得されるグローバル キューの一覧であることに注意してください。筆者は、実装全体でかなり粒度の粗いロックを使用しています。より効率的な実装では、RoundRobinThreadPool 全体で 1 つではなくキューごとに個別のロックを使用するなど、より粒度の細かいロックを使用する可能性が高くなります。ここでは、実装を容易にし単純化するために、単一ロックを選択しました。

Dispose メソッドは、このキューが不要になったときに使用します。典型的なバッチ処理のシナリオでは、キューが作成され、作業がそのキューに登録されてから、キューが破棄されます。Dispose メソッドでこのキューをプールから単純に削除する場合は、作業項目が処理のためにまだキューにあるときに削除される可能性があります。

このため、Dispose は 2 つの処理を行います。まず、作業項目が残っているかどうかを確認します。残っていない場合、キューはプールを呼び出して自身を削除します。次に、自身を破棄済みとしてマークします。まだ削除されていない破棄済みのプールが見つかった場合にプールがどのように処理するかについては、後で示します。

図 5 に、RoundRobinThreadPool クラス自身の実装の残りの部分を示します。プールには 4 つのフィールドが含まれます。

  • プールによって管理される個々のキューの一覧 (これは前に説明したロックとしても機能します)。
  • プールの既定のキュー。
  • 次に作業を検索するキューを表す整数。
  • ThreadPool に実際にキュー登録されるコールバック デリゲート。

図 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

RoundRobinThreadPool が初期化されると、この状態がすべて構成されます。特に、既定のキューは CreateQueue メソッドを呼び出すことで初期化されます。この CreateQueue メソッドは、(独自の分離されたキューを必要とする作業の新しいバッチが到着したときなどに) 開発者がプールに別のキューを追加できるようにするために一般公開されているのと同じメソッドです。単純に新しい RoundRobinThreadPool.Queue インスタンスをインスタンス化し (図 3 で検証した型)、それをキューの一覧に追加し、返します。

使いやすくするために、RoundRobinThreadPool は独自の QueueUserWorkItem メソッドを公開しています。これらは単純に、プールがインスタンス化されたときに作成される既定のキューをターゲットとします。

次に調べるのは、NotifyNewWorkItem メソッドです。QueueUserWorkItem がキューで呼び出されたとき、作業項目を格納した後で、キューがプールでこの NotifyNewWorkItem メソッドを呼び出したことを思い出してください。このメソッドは、単純に実際の ThreadPool をデリゲートし、DequeueAndExecuteWorkItem メソッド (これについては簡単に検証します) に戻るデリゲートを送信します。その名前が示すように、このメソッドはラウンドロビン プールから作業項目をデキューし、実行します。

NotifyNewWorkItem は、ThreadPool.QueueUserWorkItem ではなく ThreadPool.UnsafeQueueUserWorkItem を呼び出していることに注意してください。"Unsafe" プレフィックスは、流れている ExecutionContext ではないことを意味しているだけです。そうしないことで、パフォーマンスの利点があります。また、この実装は ExecutionContext フローを既に手動で処理しているため、ThreadPool でも処理を試行する必要はありません。

DequeueAndExecuteWorkItem では、実際に魔法のような処理が行われます。このメソッドは、キューを検索する順序を最初に生成します。検索順序は、次に調べるキューから一覧の最後までの順であり、さらに再び循環して一覧の先頭から開始し、開始したキューで終わります。実装を簡略化するために、LINQ Enumerable.Range メソッドは、2 つの一覧を生成するために使用されます。これらは、LINQ Enumerable.Concat メソッドを使用して連結されます。

検索順序が決定されたら、作業項目を探します。各キューは指定された順序で調べられ、作業項目が見つかるとすぐにその作業項目が削除され、次のポインタが更新されます。作業項目は、図 3 に示した Execute メソッドを使用して呼び出されます。

ここに特に興味深いコード行がありますが、これは項目が取得されたキューが破棄され空になっているかどうかを確認するためのものです。プールは、このようなキューを検出した場合に、それ以上項目は追加されず (破棄されたため)、それ以上保持する必要はないと判断します。その時点で、RemoveQueueNeedsLock を使用してキュー一覧からターゲット キューが削除され、次のキュー ポインタが範囲外となった場合にそれを更新する可能性があります。

このメソッドではロックを内部的に使用しませんが、共有状態にアクセスします。そのため、このメソッド名には "NeedsLock" サフィックスを付けて、ロックが保持されている間に呼び出す必要があることを思い出せるようにしました。RemoveQueueNeedLock への両方の呼び出しサイト (キューの Dispose メソッドと、プールの DequeueAndExecuteWorkItem メソッド) は、キュー ロックを保持しているときにこれを呼び出します。

実装が完了したら、これをコードでテストできます。次の例では、RoundRobinThreadPool の静的インスタンスを 1 つ作成しました。処理する作業のバッチが到着すると、新しいキューが作成され、すべての作業がこのキューに登録されて、キューが破棄されます。

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

最初に到着するバッチのすべての作業が最初にスケジュールされても、2 つ目のバッチが到着するとすぐに、処理リソースのほぼ等しい共有が開始します。

この実装に追加できる便利な機能は他にもあり、パフォーマンスの観点で実装を改善できる可能性があります。ただし、非常にわずかなコードで、.NET ThreadPool とそれが提供するすべての機能を利用する抽象化を作成でき、探し求めていたバッチベースの公平性を引き続きサポートできます。

ご意見やご質問は、Stephen (netqa@microsoft.com) まで英語でお送りください。

Stephen Toub は、マイクロソフトの並列コンピューティング プラットフォーム チームのシニア プログラム マネージャです。また、MSDN Magazine の寄稿編集者でもあります。