.NET-Themen

Nach Reihenfolge geordnete Ausführung mit ThreadPool

Stephen Toub

F Viele Komponenten in meinem System müssen ihre Arbeit asynchron ausführen, was mich darauf schließen lässt, dass das Microsoft .NET Framework-ThreadPool die richtige Lösung ist. Allerdings glaube ich, dass ich eine einzigartige Anforderung habe: Jede Komponente muss sicherstellen, dass ihre Arbeitsaufgaben in der richtigen Reihenfolge verarbeitet werden und dass deshalb keine zwei Arbeitsaufgaben gleichzeitig ausgeführt werden. Es können jedoch mehrere Komponenten zur gleichen Zeit nebeneinander ausgeführt werden. Tatsächlich ist dies sogar erwünscht. Können Sie mir etwas empfehlen?

A Diese Problemstellung ist nicht so einzigartig, wie Sie vielleicht denken, da sie in vielen wichtigen Szenarios auftritt, einschließlich derer, die auf Nachrichtenübermittlung basieren. Betrachten Sie eine Pipelineimplementierung, die Vorteile aus der Parallelität zieht, weil mehrere Phasen der Pipeline zur gleichen Zeit aktiv sind.

Sie könnten zum Beispiel eine Pipeline haben, die Daten aus einer Datei einliest, sie komprimiert, verschlüsselt und in eine neue Datei schreibt. Die Komprimierung kann gleichzeitig mit der Verschlüsselung ausgeführt werden, aber nicht mit denselben Daten zur selben Zeit, da die Datenausgabe einer Komponente die Dateneingabe der anderen sein muss. Vielmehr kann die Komprimierungsroutine Daten komprimieren und zur Verarbeitung an die Verschlüsselungsroutine senden. An diesem Punkt kann die Komprimierungsroutine die nächsten Daten verarbeiten.

Da viele Komprimierungs- und Verschlüsselungsalgorithmen einen Zustand beibehalten, der sich darauf auswirkt, wie Daten zukünftig komprimiert und verschlüsselt werden, ist es wichtig, dass diese Reihenfolge beibehalten wird. (Es spielt keine Rolle, dass in diesem Beispiel Dateien verarbeitet werden, und es wäre hilfreich, wenn Sie die Ergebnisse entschlüsseln und dekomprimieren könnten, um das Original mit allen Daten in der richtigen Reihenfolge zurückzuerhalten.)

Es gibt mehrere potenzielle Lösungen. Bei der ersten Lösung wird einfach jeder Komponente ein Thread gewidmet. Dieser DedicatedThread hätte eine FIFO-Warteschlange (first in first out) für Arbeitsaufgaben, die ausgeführt werden müssen, und einen einzelnen Thread, der diese Warteschlange bedient. Wenn die Komponente auszuführende Arbeit hat, stellt sie sie in die Warteschlange. Der Thread wird diese Arbeit schließlich abholen und ausführen. Da es nur einen Thread gibt, wird nur ein Element auf einmal ausgeführt werden. Da eine FIFO-Warteschlange verwendet wird, werden die Arbeitsaufgaben in der Reihenfolge verarbeitet, in der sie generiert wurden.

Ähnlich wie in dem Beispiel, das ich im .NET-Themen-Artikel vom Januar 2008 vorgestellt habe, werde ich eine einfache WorkItem-Klasse verwenden, um die auszuführende Arbeit zu repräsentieren (siehe Abbildung 1). Eine Implementierung von DedicatedThread, die diesen WorkItem-Typ verwendet, ist in Abbildung 2 dargestellt. Der Hauptteil der Implementierung besteht in einer naiven BlockingQueue<T>-Implementierung (.NET Framework 4.0 enthält einen BlockingCollection<T>-Typ, der dieser Art von Implementierung besser entsprechen würde). Der Konstruktor von DedicatedThread erstellt einfach eine BlockingQueue<T>-Instanz, entwickelt einen Thread, der kontinuierlich darauf wartet, dass ein anderes Element in der Warteschlange ankommt, und es anschließend verarbeitet.

Abbildung 1 Erfassen einer Arbeitsaufgabe

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

Abbildung 2 DedicatedThread-Implementierung

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

Dies bietet die grundlegende Funktionalität für Ihr Szenario und erfüllt möglicherweise Ihre Anforderungen, aber es gibt einige nicht unerhebliche Nachteile. Zunächst wird für jede Komponente ein Thread reserviert. Bei einer oder zwei Komponenten ist dies wahrscheinlich kein Problem. Aber bei vielen Komponenten könnte dies zu einem beträchtlichen Anstieg der Anzahl der Threads führen. Dies kann gravierende Leistungseinbußen verursachen.

Diese spezielle Implementierung ist außerdem nicht sehr stabil. Was geschieht, wenn Sie beispielsweise eine Komponente entfernen möchten. Wie teilen Sie dem Thread mit, dass er die Blockierung aufheben soll? Was geschieht, wenn von einer Arbeitsaufgabe eine Ausnahme ausgegeben wird?

Nebenbei ist anzumerken, dass diese Lösung derjenigen ähnelt, die Windows in einem typischen Nachrichtensystem verwendet. Das Nachrichtensystem ist eine Schleife, die auf ankommende Nachrichten wartet, sie verteilt (verarbeitet) und dann zurückkehrt und auf weitere Nachrichten wartet. Die Nachrichten für ein bestimmtes Fenster werden von einem einzelnen Thread verarbeitet. Die Ähnlichkeiten sind im Code in Abbildung 3 zu sehen, der ein sehr ähnliches Verhalten aufweisen sollte, wie der Code in Abbildung 2. Es wird ein neuer Thread entwickelt, der ein Control erstellt, sicherstellt, dass sein Handle initialisiert wurde, und mithilfe von Application.Run eine Nachrichtenschleife ausführt. Um eine Arbeitsaufgabe in die Warteschlange dieses Threads einzufügen, verwenden Sie einfach die BeginInvoke-Methode von Control. Beachten Sie, dass ich diesen Ansatz nicht empfehle, sondern vielmehr darauf hinweise, dass im Wesentlichen das gleiche Grundkonzept wie bei der DedicatedThread-Lösung, die bereits gezeigt wurde, verwendet wird.

Abbildung 3 Ähnlichkeiten mit einer Benutzeroberflächen-Nachrichtenschleife

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

Eine zweite Lösung beinhaltet die Verwendung von ThreadPool für die Ausführung. Statt einen neuen, benutzerdefinierten Thread für jede Komponente zu entwickeln, die eine private Warteschlange bedient, verwenden wir nur eine Warteschlange pro Komponente, sodass niemals zwei Elemente aus derselben Warteschlange gleichzeitig bereitgestellt werden. Dies bringt den Vorteil mit sich, dass ThreadPool selbst kontrollieren kann, wie viele Threads notwendig sind, um ihre Injektion und ihre Deaktivierung zu behandeln, um Zuverlässigkeitsprobleme zu beheben und um Sie davon abzubringen, neue Threads zu entwickeln, was selten die richtige Vorgehensweise ist.

Eine Implementierung dieser Lösung ist in Abbildung 4 dargestellt. Die FifoExecution-Klasse hat nur zwei Felder: eine Warteschlange mit Arbeitsaufgaben, die verarbeitet werden müssen, und einen booleschen Wert, der anzeigt, ob an ThreadPool eine Anforderung zum Verarbeiten von Arbeitsaufgaben gesendet wurde. Beide Felder werden von einer Sperre in der Arbeitsaufgabenliste geschützt. Die übrige Implementierung besteht lediglich aus zwei Methoden.

Abbildung 4 Implementieren von FifoExecution

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

Die erste Methode ist QueueUserWorkItem mit einer Signatur, die derjenigen entspricht, die durch ThreadPool verfügbar gemacht wird. (ThreadPool bietet auch eine praktische Überladung, die nur ein WaitCallback akzeptiert, eine Überladung, die Sie bei Bedarf hinzufügen können.) Zuerst erstellt die Methode ein zu speicherndes WorkItem und aktiviert anschließend die Sperre. (Beim Erstellen von WorkItem gibt es keinen Zugriff auf einen freigegebenen Zustand. Folglich wird das Erfassen des Elements, um die Sperre so klein wie möglich zu halten, vor dem Aktivieren der Sperre durchgeführt.) Sobald die Sperre aktiviert ist, wird die erstellte Arbeitsaufgabe in die Arbeitsaufgaben-Warteschlange eingereiht.

Die Methode prüft dann, ob eine Anforderung zum Verarbeiten in die Warteschlange eingereihter Arbeitsaufgaben an ThreadPool gesendet wurde, und wenn dies nicht der Fall ist, sendet sie eine solche Anforderung (und vermerkt es für die Zukunft). Diese Anforderung an ThreadPool bedeutet, dass zur Ausführung der ProcessQueuedItems-Methode der Thread von ThreadPool verwendet wird.

Sobald ProcessQueuedItems von einem ThreadPool-Thread aufgerufen wird, tritt es in eine Schleife ein. In dieser Schleife aktiviert es die Sperre, und während die Sperre aktiviert bleibt, überprüft es, ob weitere Arbeitsaufgaben zum Verarbeiten anstehen. Wenn keine weiteren Aufgaben anstehen, setzt es das Anforderungskennzeichen zurück (sodass zukünftige in die Warteschlange eingereihte Elemente die Verarbeitung wieder vom Pool anfordern) und wird beendet. Wenn Arbeitsaufgaben zum Verarbeiten anstehen, nimmt es die nächste, hebt die Sperre auf, führt die Verarbeitung aus und beginnt wieder von vorn, wobei es solange ausgeführt wird, bis es in der Warteschlange keine Elemente mehr gibt.

Dies ist eine einfache, aber dennoch leistungsfähige Implementierung. Eine Komponente kann jetzt eine FifoExecution-Instanz erstellen und sie zum Planen von Arbeitsaufgaben verwenden. Pro FifoExecution-Instanz kann nur eine in die Warteschlange eingereihte Arbeitsaufgabe auf einmal ausgeführt werden. Arbeitsaufgaben werden in der Reihenfolge ausgeführt, in der sie eingereiht wurden. Außerdem können Arbeitsaufgaben aus unterschiedlichen FifoExecution-Instanzen gleichzeitig ausgeführt werden. Das Beste daran ist, dass Sie sich nicht mehr um die Threadverwaltung kümmern müssen – die gesamte harte (aber sehr wichtige) Arbeit der Threadverwaltung wird ThreadPool überlassen.

Im extremen Fall, dass der Pool durch jede Komponente voll mit Arbeit ausgelastet wird, wird ThreadPool sehr wahrscheinlich auf einen Thread pro Komponente aufgestockt, so wie in der ursprünglichen DedicatedThread-Implementierung. Dies wird jedoch nur geschehen, wenn es von ThreadPool als angemessen erachtet wird. Wenn Komponenten den Pool nicht auslasten, sind viel weniger Threads erforderlich.

Es gibt zusätzliche Vorteile, wie z. B. jenen, ThreadPool hinsichtlich der Ausnahmen das Richtige tun zu lassen. Was geschieht in der DedicatedThread-Implementierung, wenn die Verarbeitung eines Elements eine Ausnahme auslöst? Der Thread abstürzt ab, aber in Abhängigkeit von der Konfiguration der Anwendung wird der Prozess möglicherweise nicht abgebrochen. In diesem Fall werden Arbeitsaufgaben beginnen, sich bei DedicatedThread einzureihen, aber keine von ihnen wird jemals verarbeitet. Bei FifoExecution wird ThreadPool am Ende mehrere Threads hinzufügen, um diejenigen zu kompensieren, die verschwunden sind.

In Abbildung 5 ist eine einfache Demoanwendung dargestellt, die die FifoExecution-Klasse verwendet. Diese Anwendung hat drei Phasen in einer Pipeline. Jede Phase schreibt die ID der aktuellen Information aus, die gerade verarbeitet wird (was einfach die Schleifeniteration ist). Anschließend wird ein Teil der Arbeit ausgeführt (hier durch ein Thread.SpinWait repräsentiert), und die Daten werden an die nächste Phase (wieder nur die Schleifeniteration) weitergegeben. Bei jedem Schritt werden Informationen mit einer anderen Anzahl von Registerkarten ausgegeben, damit die Ergebnisse einfach separat eingesehen werden können. Wie Sie bei den in Abbildung 6 gezeigten Ausgaben beobachten können, behält die Arbeit in jeder Phase (Spalte) die richtige Reihenfolge.

Abbildung 5 Vorführung von FifoExecution

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

Abbildung 6 Ausgabe aus der Demoanwendung

Es ist auch interessant zu sehen, dass es zwischen den Phasen der Pipelines an Fairness mangelt. Sie können zum Beispiel sehen, dass Phase 1 in Abbildung 6 bereits bei Iteration 21 ist, während Phase 2 immer noch bei 13 und Phase 3 bei 9 ist. Dies ist größtenteils auf meine Implementierung von ProcessQueuedItems zurückzuführen. Die Beispielanwendung verschiebt sehr schnell 100 Arbeitsaufgaben in Phase 1, und folglich wird der Thread aus dem Pool, der Phase 1 bedient, wahrscheinlich in der ProcessQueuedItems-Schleife sitzen und nicht zurückkehren, bis es in Phase 1 keine Arbeit mehr gibt. Dies in unfair gegenüber den anderen Phasen. Wenn Sie ein ähnliches Verhalten in Ihrer Anwendung beobachten und dies zu Problemen führt, können Sie die Fairness zwischen den Phasen durch Ändern der Implementierung von ProcessQueuedItems erhöhen und etwa wie folgt gestalten:

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

Selbst wenn es nun mehr zu verarbeitende Elemente gibt, wird ProcessQueuedItems nicht in der Schleife hängenbleiben, sondern sich rekursiv in ThreadPool und folglich hinter Elementen anderer Phasen einreihen. Mit dieser Änderung sieht die Ausgabe der Anwendung in Abbildung 5 nun wie diejenige in Abbildung 7 aus. In dieser neuen Ausgabe können Sie sehen, dass Phase 2 und Phase 3 von der Planung tatsächlich mit mehr Fairness als bisher behandelt werden (es gibt immer noch eine Verzögerung zwischen den Phasen, aber da es eine Pipeline ist, ist dies zu erwarten).

Abbildung 7 Neue Ausgabe mit Planung mit mehr Fairness

Selbstverständlich ist diese erhöhte Fairness mit Kosten verbunden. Jede Arbeitsaufgabe durchläuft eine zusätzliche Runde durch den Planer. Sie müssen entscheiden, ob dies ein Kompromiss ist, den Sie für Ihre Anwendung annehmen möchten. Wenn beispielsweise die Arbeit, die Sie in Ihren Arbeitsaufgaben ausführen, nennenswert ist, sollte dieser Aufwand vernachlässigbar sein.

Dies ist nur ein weiteres Beispiel für die Möglichkeit, basierend auf ThreadPool Systeme zu erstellen, die Funktionalität hinzufügen, ohne selbst benutzerdefinierte Threadpools erstellen zu müssen. Weitere Beispiele finden Sie in früheren Ausgaben der .NET-Themen-Rubrik des MSDN Magazins.

Senden Sie Fragen und Kommentare in englischer Sprache an netqa@microsoft.com.

Stephen Toub ist leitender Programmmanager im Parallel Computing Platform-Team von Microsoft. Er schreibt außerdem redaktionelle Beiträge für das MSDN Magazin.