.NET-Themen

Roundrobin-Zugriff auf den ThreadPool

Stephen Toub

 

F Ich verwende derzeit den Microsoft .NET Framework-ThreadPool und bin auf ein Problem gestoßen, das ich nicht lösen kann. Ich beginne mit einem großen Batch an Arbeitsaufgaben, die in eine Warteschlange eingereiht werden, und ein zweiter (kleinerer) Batch geht ein, nachdem die Verarbeitung des ersten Batches begonnen hat. Anfänglich wird ein Teil der Arbeit in dem großen Batch an alle Arbeitsthreads im ThreadPool verteilt. Aber wenn der zweite Batch eingeht, möchte ich, dass die Verteilung gerecht ist und jeder Batch gleichermaßen bedient wird, statt dass der erste Batch vorrangig behandelt wird, da er zuerst eingegangen ist.

 

**Wenn ein Batch fertig ist, soll der noch zu verarbeitende Batch die Aufmerksamkeit aller Arbeitsthreads erhalten. Kann ich etwas tun, um eine solche Batchverarbeitungsfunktionalität als Schicht über dem ThreadPool einzurichten?

A In früheren Artikeln wurde gezeigt, wie verschiedene Funktionalitäten als Schicht über dem vorhandenen .NET-ThreadPool eingerichtet werden können. In der Ausgabe des MSDN Magazins vom Oktober 2004 wurde erläutert, wie dem ThreadPool Unterstützung für das Warten auf Arbeitsaufgaben in einer Warteschlange hinzugefügt wird (siehe ThreadPoolWait und HandleLeakTracker). In der Ausgabe vom November 2004 wurde gezeigt, wie Unterstützung für Arbeitsaufgabenprioritäten hinzugefügt wird (siehe ThreadPoolPriority und MethodImplAttribute). In der Ausgabe vom März 2006 ging es um das Hinzufügen von Abbruchunterstützung (siehe Abbrechbarer Threadpool). In der Zukunft werde ich auf die Ausgabe vom Januar 2009 zurückblicken und sagen können, dass ich dort gezeigt habe, wie Roundrobin-Planungsunterstützung über dem ThreadPool hinzugefügt wird.

Bei dem Problem, das hier gelöst werden soll, müssen Sie zuerst einmal verstehen, wie der ThreadPool Arbeit verteilt. Intern verwaltet er eine Reihe von Arbeitsaufgaben, die in eine Warteschlange eingereiht wurden. Wenn ein Thread im Pool für das Ausführen von Arbeit verfügbar ist, geht er zurück zur Arbeitswarteschlange und entnimmt das nächste Element. Die Reihenfolge, in der diese Verarbeitung erfolgt, ist nicht dokumentiert und definitiv unzuverlässig (da sie sich in zukünftigen Versionen ändern könnte und wahrscheinlich ändern wird).

Heute wird sie auf sehr einfache Weise implementiert: Es ist eine FIFO-Warteschlange (first in first out). Die erste Arbeit, die in eine Warteschlange eingereiht wird, ist also die erste Arbeit, die von einem Thread übernommen wird. In Ihrem Batchverarbeitungsszenario bedeutet dies, dass die gesamte Arbeit des ersten Batches vor der gesamten Arbeit des zweiten Batches eingereiht ist. Folglich wird die gesamte Arbeit des ersten Batches vor der des zweiten Batches verteilt. Für einige Szenarios ist dies in Ordnung so und optimal. Für Ihr Szenario benötigen Sie mehr Kontrolle.

Eine der einfachsten Möglichkeiten, diese Kontrolle vom ThreadPool zu erhalten, besteht darin, den eigenen Delegaten gegen den Delegaten auszutauschen, den der Benutzer wirklich ausführen lassen möchte. Sie möchten beispielsweise alle unbehandelten Ausnahmen abfangen, die aus Arbeit in einer Warteschlange stammen, und für jede ein Ereignis auslösen. Dazu könnten Sie Code wie den in Abbildung 1 schreiben. Dann verwenden Sie anstelle von ThreadPool.QueueUserWorkItem einfach ExceptionThreadPool.QueueUserWorkItem. Die Arbeit wird noch immer vom ThreadPool ausgeführt, aber die Threads des Pools führen den von Ihnen in eine Warteschlange eingereihten Delegaten statt den vom Benutzer bereitgestellten aus. Durch Aufrufen Ihres Delegaten wird dann der vom Benutzer bereitgestellte Delegat aufgerufen, der Ausnahmen abfängt und das Zielereignis auslöst.

Abbildung 1 Shimming des ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

Dieses Verfahren ist zwar leistungsfähig, hat aber seinen Preis: Ein zusätzlicher Delegat muss zugeordnet werden, ein zusätzlicher Delegat muss aufgerufen werden und so weiter. Ob dieser Preis untragbar ist, kann nur von Ihnen und Ihren Szenarios entschieden werden, aber diese Art des Schichtens ist in der Regel kostengünstiger, als einen eigenen Threadpool von Grund auf neu zu schreiben.

Dies ist natürlich ein sehr einfaches Beispiel, aber es können kompliziertere Dinge durchgeführt werden. Das Prioritätspoolbeispiel, auf das ich bereits verwiesen habe, speichert die vom Benutzer bereitgestellten Delegaten in seinen eigenen Datenstrukturen. Es reiht dann im Pool einen Ersatzdelegaten ein, der zurückkehrt und diese Datenstrukturen nach dem richtigen Delegaten durchsucht, der ausgeführt werden soll, wobei jene bevorzugt ausgeführt werden, die höhere Prioritäten haben. Sie können ein ähnliches Verfahren einsetzen, um Ihr Problem mit der Batchverarbeitung zu behandeln.

Stellen Sie sich vor, dass Sie keine einzelne Warteschlange haben, sondern eine Warteschlange pro Batch. Jeder Batch würde Arbeit in seine eigene relevante Warteschlange stellen. Sie würden dann die Threads des Pools für die Roundrobin-Funktion zwischen allen Warteschlangen verwenden. Der Delegat, den Sie im echten ThreadPool einreihen, kommt zurück in Ihre Datenstrukturen und sucht, beginnend mit der nächsten zu durchsuchenden Warteschlange, nach Arbeit. Wenn er Arbeit findet, führt er sie aus dieser Warteschlange aus. Wenn nicht, betrachtet er die nächste Warteschlange.

Auf diese Weise können Sie gerechte Planung zwischen den Batches bereitstellen. Wenn nur ein Batch vorhanden ist, entnehmen die Threads Arbeit immer aus dieser einen Warteschlange. Wenn mehrere Batches vorhanden sind, besuchen die Threads die einzelnen Warteschlangen und schenken ihnen in etwa dieselbe Aufmerksamkeit. Abbildung 2 bietet einen Überblick über diese Lösung.

netmatters,fig02.gif

Abbildung 2 RoundRobinThreadPool-Ansatz

Zuerst benötigen Sie eine Datenstruktur, um den vom Benutzer bereitgestellten Delegaten zu speichern. Meine Darstellung wird in Abbildung 3 illustriert. Diese Datenstruktur enthält drei Eigenschaften. Die ersten beiden sollten Ihnen vertraut sein. Es handelt sich um den Zustand, den der Benutzer für QueueUserWorkItem bereitstellt und der natürlich zwischengespeichert werden muss. Die dritte Eigenschaft könnte Ihnen jedoch unbekannt sein. Mit jedem Ausführungsthread in .NET ist ein System.Threading.ExecutionContext verbunden, der Informationen wie beispielsweise den aktuellen Benutzer, den Zustand, der mit dem logischen Ausführungsthread verbunden ist, Sicherheitsinformationen für den Codezugriffs und so weiter repräsentiert. Es ist wichtig, dass dieser Kontext über asynchrone Ausführungspunkte übertragen wird.

Abbildung 3 Erfassen einer Arbeitsaufgabe

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

Wenn Sie beispielsweise eine bestimmte Windows-Identität annehmen und ThreadPool.QueueUserWorkItem aufrufen, sollte die von Ihnen in eine Warteschlange eingereihte Arbeit unter derselben Windows-Identität ausgeführt werden. Wenn dies nicht der Fall ist, besteht möglicherweise eine Sicherheitslücke. Seit .NET Framework 2.0 wird dieser Kontext automatisch standardmäßig über alle asynchronen Punkte in Ihrem Code übertragen: ThreadPool.QueueUserWorkItem, Erstellen eines neuen Threads, asynchroner Delegatenaufruf und so weiter.

Bei der hier erörterten Implementierung richten Sie sich jedoch nach anderen Spielregeln. Bei Änderung der Reihenfolge, in der Delegaten in eine Warteschlange eingereiht sind, gegenüber der Reihenfolge, in der sie ausgeführt werden, besteht keine direkte Entsprechung mehr zwischen diesem ExecutionContext-Fluss und den Arbeitsaufgaben, die der Benutzer bereitgestellt hat. Daher muss Ihre Implementierung den ExecutionContext mit dem vom Benutzer bereitgestellten Delegaten richtig zwischenspeichern und dann den erfassten Kontext zum Ausführen dieses Delegaten verwenden.

Nun, da Sie eine Arbeitsaufgabe haben, wollen wir die Warteschlange betrachten, in der sie aufbewahrt wird (dargestellt in Abbildung 4). Die RoundRobinThreadPool.Queue-Datenstruktur selbst ist recht einfach. Intern enthält sie ein Queue<WorkItem> zum Speichern aller Arbeitsaufgaben, die für sie bereitgestellt werden, einen Verweis auf die RoundRobinThreadPool-Instanz, die der Warteschlange zugeordnet ist, und einen booleschen Wert, der anzeigt, ob die Dispose-Methode in der Warteschlange aufgerufen wurde. Sie stellt dann QueueUserWorkItem-Methoden mit derselben Signatur wie jener des ThreadPool bereit.

Abbildung 4 Eine Roundrobin-Warteschlange

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

Wenn QueueUserWorkItem aufgerufen wird, werden der Rückruf und der vom Benutzer bereitgestellte Zustand (zusammen mit dem aktuellen ExecutionContext) in einem WorkItem erfasst. Diese Arbeit wird dann in der generischen Warteschlange gespeichert, und der entsprechende Pool wird benachrichtigt, dass neue Arbeit eingegangen ist. Sie sollten unbedingt beachten, dass eine Sperre zum Schützen der Arbeitsaufgabenwarteschlange verwendet wird, da QueueUserWorkItem gleichzeitig von mehreren Threads aufgerufen werden kann und sichergestellt werden muss, dass Invarianten gehalten werden.

Es gilt auch zu beachten, dass das Objekt, das gesperrt wird, eine globale Warteschlangenliste ist, die aus dem Pool stammt. Ich verwende eine ziemlich grobe Sperre für die gesamte Implementierung. Eine effizientere Implementierung würde wahrscheinlich feiner abgestufte Sperren verwenden, beispielsweise einzelne Sperren pro Warteschlange statt einer Sperre für den ganzen RoundRobinThreadPool. Für eine einfache Implementierung und aus Gründen der Einfachheit habe ich mich für die einzelne Sperre entschieden.

Die Dispose-Methode wird verwendet, wenn diese Warteschlange nicht mehr erforderlich ist. In einem typischen Batchverarbeitungsszenario wird eine Warteschlange erstellt und Arbeit in die Warteschlange eingereiht. Anschließend wird die Warteschlange verworfen. Wenn die Dispose-Methode diese Warteschlange einfach aus dem Pool entfernen würde, wäre es wahrscheinlich, dass sie entfernt würde, während noch immer zu verarbeitende Arbeitsaufgaben in ihr vorhanden sind.

Daher tut Dispose zwei Dinge. Erstens prüft die Methode, ob noch Arbeitsaufgaben verblieben sind. Ist dies nicht der Fall, ruft die Warteschlange den Pool auf, um sich selbst zu entfernen. Zweitens kennzeichnet sie sich als verworfen. Sie werden gleich sehen, wie der Pool eine Situation behandelt, in der er auf einen verworfenen Pool stößt, der nicht entfernt wurde.

Abbildung 5 zeigt die übrige Implementierung der RoundRobinThreadPool-Klasse selbst. Der Pool enthält vier Felder:

  • Eine Liste der einzelnen Warteschlangen, die vom Pool verwaltet werden (sie dient auch als die bereits erwähnte Sperre).
  • Eine Standardwarteschlange für den Pool.
  • Eine Ganzzahl, die die nächste Warteschlange repräsentiert, die nach Arbeit durchsucht werden soll.
  • Den Rückrufdelegaten, der in den ThreadPool eingereiht ist.

Abbildung 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

Bei Initialisieren eines RoundRobinThreadPool wird dieser gesamte Zustand konfiguriert. Insbesondere wird die Standardwarteschlange durch Aufrufen der CreateQueue-Methode initialisiert. Diese CreateQueue-Methode ist dieselbe Methode, die öffentlich verfügbar gemacht wird, damit ein Entwickler dem Pool eine weitere Warteschlange hinzufügen kann (wenn beispielsweise ein neuer Batch Arbeit eingeht, der seine eigene isolierte Warteschlange benötigt). Sie instanziiert einfach eine neue RoundRobinThreadPool.Queue-Instanz (der in Abbildung 3 erläuterte Typ), fügt sie der Warteschlangenliste hinzu und gibt sie zurück.

Für eine größere Benutzerfreundlichkeit macht der RoundRobinThreadPool seine eigenen QueueUserWorkItem-Methoden verfügbar. Diese zielen einfach auf die Standardwarteschlange ab, die beim Instanziieren des Pools erstellt wurde.

Als Nächstes wird die NotifyNewWorkItem-Methode untersucht. Sie werden sich erinnern, dass die Warteschlange nach Speichern der Arbeitsaufgabe diese NotifyNewWorkItem-Methode im Pool aufgerufen hat, nachdem QueueUserWorkItem in einer Warteschlange aufgerufen wurde. Diese Methode delegiert einfach an den echten ThreadPool und sendet einen Delegaten, der zur DequeueAndExecuteWorkItem-Methode zurückkehrt (diese wird gleich näher untersucht). Wie der Name schon sagt, entfernt diese Methode eine Arbeitsaufgabe aus dem Roundrobin-Pool und führt sie aus.

Beachten Sie, dass NotifyNewWorkItem ThreadPool.UnsafeQueueUserWorkItem anstelle von ThreadPool.QueueUserWorkItem aufruft. Das Präfix „Unsafe“ impliziert einfach, dass es sich nicht um fließenden ExecutionContext handelt. Dies birgt einen Leistungsvorteil. Da diese Implementierung bereits ExecutionContext-Fluss manuell behandelt, besteht keine Notwendigkeit, dass der ThreadPool dies ebenfalls versucht.

In DequeueAndExecuteWorkItem liegt der Schlüssel. Diese Methode generiert zuerst eine Reihenfolge, in der die Warteschlangen durchsucht werden. Die Suchreihenfolge geht von der nächsten zu überprüfenden Warteschlange bis zum Ende der Liste und beginnt dann den Kreislauf von neuem, indem sie am Anfang der Liste beginnt und sich zu der Warteschlange vorarbeitet, bei der sie begonnen hat. Um die Implementierung zu vereinfachen, wird die LINQ-Enumerable.Range-Methode zum Generieren von zwei Listen verwendet, die dann mithilfe der LINQ-Enumerable.Concat-Methode verkettet werden.

Wenn die Suchreihenfolge vorhanden ist, geht sie auf die Suche nach Arbeitsaufgaben. Jede Warteschlange wird in der angegebenen Reihenfolge untersucht, und sobald eine Arbeitsaufgabe gefunden wird, wird die Arbeitsaufgabe entfernt, und der nächste Zeiger wird aktualisiert. Die Arbeitsaufgabe wird dann mithilfe der in Abbildung 3 dargestellten Execute-Methode aufgerufen.

Es gibt hier eine besonders interessante Codezeile. Es ist die Überprüfung, um festzustellen, ob die Warteschlange, aus der gerade ein Element abgerufen wurde, verworfen wurde und leer ist. Wenn der Pool eine solche Warteschlange findet, weiß er, dass ihr keinerlei Elemente mehr hinzugefügt werden (da sie verworfen wurde) und sie folglich nicht mehr beibehalten werden muss. An dieser Stelle wird RemoveQueueNeedsLock zum Entfernen der Zielwarteschlange aus der Warteschlangenliste und möglicherweise zum Aktualisieren des nächsten Warteschlangenzeigers verwendet, falls er sich nicht mehr im Bereich befindet.

Beachten Sie, dass diese Methode intern keine Sperre verwendet, sondern auf den freigegebenen Zustand zugreift. Daher habe ich die Methode mit einem „NeedsLock“-Suffix benannt, um mich daran zu erinnern, dass sie aufgerufen werden muss, während die Sperre gehalten wird. Sie werden feststellen, dass beide Aufrufsites für RemoveQueueNeedLock (eine in der Dispose-Methode der Warteschlange und eine in der DequeueAndExecuteWorkItem-Methode des Pools) diese Methode aufrufen, während die Warteschlangensperre gehalten wird.

Nach abgeschlossener Implementierung können Sie dies nun in Ihrem Code testen. In dem folgenden Beispiel habe ich eine einzelne statische Instanz des RoundRobinThreadPool erstellt. Wenn ein Arbeitsbatch zur Verarbeitung eingeht, wird eine neue Warteschlange erstellt, die gesamte Arbeit wird in diese Warteschlange gestellt, und die Warteschlange wird verworfen.

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

Obwohl die gesamte Arbeit des zuerst eingegangenen Batches zuerst geplant wird, erhält der zweite Batch, sobald er eingeht, etwa denselben Anteil der Verarbeitungsressourcen.

Dieser Implementierung könnten weitaus mehr Features hinzugefügt werden, und sie könnte leistungsmäßig wahrscheinlich verbessert werden. Aber mit sehr wenig Code konnten Sie eine Abstraktion erstellen, die den .NET-ThreadPool und alle seine Funktionen nutzt, während Sie gleichzeitig die gewünschte Unterstützung für die batchbasierte Fairness erhalten.

Senden Sie Fragen und Kommentare für Stephen Toub in englischer Sprache an netqa@microsoft.com.

Stephen Toub ist leitender Programmmanager im Parallel Computing Platform-Team von Microsoft. Er schreibt außerdem redaktionelle Beiträge für das MSDN Magazin.