MSDN Magazin > Home > Ausgaben > 2007 > May >  Tiefe Einblicke in CLR: 9 wiederverwendbare par...
Tiefe Einblicke in CLR
9 wiederverwendbare parallele Datenstrukturen und Algorithmen
Joe Duffy

Codedownload verfügbar unter: CLRInsideOut2007_05.exe (156 KB)
Browse the Code Online
In diesem Artikel wird weniger die Mechanik eines Common Language Runtime-Features (CLR) besprochen, sondern vielmehr die effiziente Nutzung vorhandener Ressourcen. Die Auswahl der richtigen Datenstrukturen und Algorithmen ist selbstverständlich eine der gängigsten aber auch wichtigsten Entscheidungen, die ein Programmierer treffen muss. Die falsche Auswahl kann den Unterschied zwischen Erfolg und Misserfolg ausmachen oder, wie in den häufigsten Fällen, zwischen guter und, sagen wir, furchtbarer Leistung. Wenn berücksichtigt wird, dass parallele Programmierung häufig zur Leistungsverbesserung eingesetzt wird und dass sie normalerweise schwieriger ist als serielle Programmierung, ist die Auswahl noch grundlegender für Ihren Erfolg.
In diesem Artikel werden neun wiederverwendbare Datenstrukturen und Algorithmen genauer betrachtet, die in vielen parallelen Programmen verwendet werden, und die Sie einfach an Ihre eigene .NET-Software anpassen können. Jedes Beispiel enthält vollständig funktionierenden Code, der jedoch nicht unbedingt komplett geprüft, abgestimmt und bewährt ist. Die Liste ist keineswegs geschlossen, aber es sind viele häufig verwendete Muster repräsentiert. Sie werden feststellen, dass viele der Beispiele aufeinander aufbauen.
Eines muss gleich zu Beginn erwähnt werden. Das Microsoft® .NET Framework hat mehrere gleichzeitige Stammfunktionen. Es wird zwar gezeigt, wie Sie Ihre eigenen Stammfunktionen erstellen können; für die meisten Situationen jedoch sind die vorhandenen völlig ausreichend. Damit soll nur demonstriert werden, dass es manchmal von Vorteil ist, Alternativen in Betracht zu ziehen. Außerdem vertiefen Sie Ihr Wissen über parallele Programmierung im Allgemeinen, wenn Sie diese Verfahren in Aktion sehen. Eine grundlegende Kenntnis der vorhandenen Stammfunktionen wird jedoch vorausgesetzt. Eine umfassende Übersicht finden Sie in Vance Morrisons „Was jeder Entwickler über Multithreadanwendungen wissen muss“ in der Ausgabe des MSDN®-Magazins von August 2005.
Sehen Sie sich die Verfahren an.

Countdownsperre
Semaphoren sind aus vielen Gründen eine der bekannteren Datenstrukturen in der gleichzeitigen Programmierung. Mit der wichtigste Grund ist, dass sie eine lange Geschichte in der Informatik haben, die bis zum Entwurf von Betriebssystemen in den 1960er Jahren zurückgeht. Eine Semaphore ist ganz einfach eine Datenstruktur mit einem Count-Feld. Sie unterstützt zwei Vorgänge: Put und Take (häufig auch P und V genannt). Ein Put-Vorgang erhöht den Zähler der Semaphore um eins, und ein Take-Vorgang verringert ihn um eins. Wenn der Zähler der Semaphore auf Null steht, wird jeder folgende Versuch eines Take-Vorgangs geblockt (der Vorgang wartet), bis ein gleichzeitiger Put-Vorgang den Zähler auf einen Wert setzt, der nicht Null ist. Beide sind atomare, sicher gleichzeitig ausführbare Operationen, die gewährleisten, dass gleichzeitige Put- und Take-Vorgänge mit Bezug aufeinander serialisiert werden. Windows® verfügt über erstklassige Kernel- und Win32®-Unterstützung für Semaphorenobjekte (siehe CreateSemaphore und verwandte APIs), und sie treten im .NET Framework in der Klasse System.Threading.Semaphore auf. Kritische Bereiche, die von Mutex und Monitor unterstützt werden, sind oft durch eine besondere Semaphore mit einem Zählerstand gekennzeichnet, der zwischen 0 und 1 wechselt – anders ausgedrückt, eine binäre Semaphore.
Eine Art „umgekehrte Semaphore“ ist auch oft nützlich. Manchmal ist eine Datenstruktur erforderlich, die das Warten auf den Zählerstand Null der Datenstruktur unterstützt. Verzweigungs-/Verbindungs-Parallelität – Ein einziger „Master“-Thread steuert die Ausführung von n „untergeordneten“ Threads und wartet dann auf deren Beendigung. Dies ist in datenparalleler Programmierung sehr verbreitet, und eine umgekehrte Semaphore ist in diesen Fällen sehr nützlich. Meistens soll verhindert werden, dass erwachende Threads den Zählerstand verändern. In diesem Fall nennen wir die Struktur Countdownsperre, um anzugeben, dass der Zählerstand zurückgeht, und dass die Sperre bei Erreichen des signalisierten Zustands signalisiert bleibt (eine Eigenschaft, die mit vielen Sperren in Verbindung gebracht wird). Leider unterstützen weder Windows noch .NET Framework eine solche Datenstruktur. Zum Glück ist es aber einfach, eine solche zu erstellen.
Um eine Countdownsperre zu erstellen, müssen Sie lediglich den Zähler bei n initialisieren und von jeder untergeordneten Aufgabe atomar um eins verringern lassen, wenn die Aufgabe beendet wird. Dies geschieht zum Beispiel, indem Sie das Heruntersetzen mit einer Sperre oder einem Aufruf von Interlocked.Decrement umgeben. Anstatt dann eine Take-Operation durchzuführen, könnte ein Thread dekrementieren und darauf warten, dass der Zähler Null wird. Wenn er erwacht, wird er feststellen, dass bei der Sperre n Signale registriert wurden. Statt sich in dieser Bedingung zu drehen, wobei (count != 0), ist es in der Regel eine gute Idee, den wartenden Thread blockieren zu lassen. In diesem Fall müssen Sie ein Ereignis verwenden. Abbildung 1 ist ein Beispiel für einen einfachen Typ eines CountdownLatch.
public class CountdownLatch {
    private int m_remain;
    private EventWaitHandle m_event;

    public CountdownLatch(int count) {
        m_remain = count;
        m_event = new ManualResetEvent(false);
    }

    public void Signal() {
        // The last thread to signal also sets the event.
        if (Interlocked.Decrement(ref m_remain) == 0)
            m_event.Set();
    }

    public void Wait() {
        m_event.WaitOne();
    }
}
Dies ist erstaunlich einfach, aber es kann knifflig sein, das richtig hinzubekommen. Es werden einige Beispiele dafür angeführt, wie Sie diese Datenstruktur später verwenden könnten. Beachten Sie, dass die hier gezeigte grundlegende Implementierung Raum für verschiedene Verbesserungen lässt, einschließlich: dem Hinzufügen einer gewissen Wartezeit für den Spin, bevor WaitOne für das Ereignis aufgerufen wird; dem langsamen Zuweisen des Ereignisses, statt diesen Vorgang im Konstruktor durchzuführen (falls das Spinning ausreicht, um das Blockieren vollständig zu vermeiden, wie später in diesem Artikel mit ThinEvent demonstriert wird); dem Hinzufügen von Möglichkeiten zum Zurücksetzen; und das Bereitstellen einer Dispose-Methode, sodass interne Ereignisobjekte geschlossen werden können, wenn sie nicht mehr benötigt werden. Diese Punkte werden dem Leser für praktische Übungen überlassen.

Wiederverwendbare Wartezeit beim Spinning
Obwohl Blockieren im Allgemeinen Busy Waiting vorgezogen wird, gibt es Umstände, unter denen Sie für einige Zeit eher das Spinning einsetzen würden, bevor der echte Wartezustand aktiviert wird. Der Grund für den Nutzen dieser Vorgehensweise ist subtil, und die meisten Personen vermeiden Wartezeiten beim Spinning, da es nach völlig vergeblicher Arbeit aussieht. Wenn ein Kontextwechsel (der jedes Mal auftritt, wenn ein Thread auf ein Kernelereignis wartet) mehrere Tausend Zyklen kostet (dies wird „c“ genannt), so wie es in Windows der Fall ist, und wenn die Bedingung, auf die der Thread wartet, in weniger als 2c Zykluszeit auftritt (1c für das Warten an sich und 1c für das Erwachen), könnte das Spinning den durch das Warten verursachten Aufwand und die Wartezeiten verringern und den Durchsatz und die Skalierbarkeit Ihres Algorithmus erhöhen.
Wenn Sie sich für eine Wartezeit des Spinnings entscheiden, ist Vorsicht angebracht. Es gibt viele Probleme, die auftreten können: Sicherstellen, dass Sie Thread.Spin-Wait innerhalb der Spin-Schleife aufrufen, um die Hardwareverfügbarkeit für andere Hardwarethreads auf Intel-Computern mit Hyperthreading zu verbessern; von Zeit zu Zeit Thread.Sleep mit dem Argument 1 anstatt 0 aufrufen, um eine Umkehrung der Prioritäten zu vermeiden; ein leichtes Backoff verwenden, um eine zufällige Anordnung einzuführen, durch die der Ort verbessert wird (unter der Annahme, dass der Aufrufer den Freigabestatus kontinuierlich wieder liest) und möglicherweise ein Livelock verhindert wird; und natürlich, immer nachgeben auf einem Computer mit einer einzelnen CPU, da das Spinning in solchen Umgebungen komplette Verschwendung ist).
Die Klasse SpinWait ist als Werttyp definiert, so dass sie mit wenig Aufwand zugeordnet werden kann (siehe Abbildung 2). Dieser Algorithmus kann jetzt dazu verwendet werden, das Blockieren im vorher dargestellten Algorithmus CountdownLatch zu vermeiden:
public struct SpinWait {
    private int m_count;
    private static readonly bool s_isSingleProc =
        (Environment.ProcessorCount == 1);
    private const int s_yieldFrequency = 4000;
    private const int s_yieldOneFrequency = 3*s_yieldFrequency;

    public int Spin() {
        int oldCount = m_count;

        // On a single-CPU machine, we ensure our counter is always
        // a multiple of ‘s_yieldFrequency’, so we yield every time.
        // Else, we just increment by one.
        m_count += (s_isSingleProc ? s_yieldFrequency : 1);

        // If not a multiple of ‘s_yieldFrequency’ spin (w/ backoff).
        int countModFrequency = m_count % s_yieldFrequency;
        if (countModFrequency > 0)
            Thread.SpinWait((int)(1 + (countModFrequency * 0.05f)));
        else
            Thread.Sleep(m_count <= s_yieldOneFrequency ? 0 : 1);

        return oldCount;
    }

    private void Yield() {
        Thread.Sleep(m_count < s_yieldOneFrequency ? 0 : 1);
    }
}
private const int s_spinCount = 4000;
public void Wait() {
    SpinWait s = new SpinWait();
    while (m_remain > 0) {
        if (s.Spin() >= s_spinCount) m_event.WaitOne();
    }
}
Die Auswahl von Yield-Frequenz und Spinninganzahl ist zugegebenermaßen willkürlich. Wie die Spinninganzahl für kritische Abschnitte in Win32 sollten diese Zahlen aufgrund von Tests und Experimenten gewählt werden, und die richtigen Zahlen unterscheiden sich vermutlich je nach System. Die MSDN-Dokumentation empfiehlt zum Beispiel eine Spinninganzahl von 4.000 für kritische Abschnitte, die auf Erfahrungen der Teams für das Microsoft Media Center und den Windows-Kernel basiert, aber Ihre eigene Zahl wird vermutlich davon abweichen. Die perfekte Zahl hängt von vielen Faktoren ab, einschließlich der Anzahl der Threads, die zu einer gegebenen Zeit auf ein Ereignis warten, der Häufigkeit, mit der Ereignisse stattfinden, und so weiter. In den meisten Fällen werden Sie die expliziten Yield-Funktionen beseitigen wollen, indem Sie stattdessen auf ein Ereignis warten, wie in dem Beispiel mit der Sperre beschrieben.
Sie können sogar einen Zähler wählen, der sich dynamisch anpasst: Beginnen Sie zum Beispiel mit einer mittleren Anzahl von Spins und erhöhen Sie jedes Mal, wenn das Spinning nicht erfolgreich ist, den Zähler um eins. Wenn der Zähler ein voreingestelltes Maximum erreicht, stoppen Sie das Spinning ganz und geben Sie sofort WaitOne aus. Die Logik ist wie folgt: Sie sind bereit, die vorausbestimmte maximale Anzahl an Zyklen zu nutzen, aber nicht mehr. Wenn Sie befinden, dass dieses Maximum nicht hoch genug ist, um den Kontextwechsel zu verhindern, dann ist es insgesamt betrachtet weniger aufwändig, den Kontextwechsel sofort durchzuführen. Sie können darauf hoffen, dass die Spinninganzahl mit der Zeit einen stabilen Wert erreicht.

Barrieren
Eine Barriere, auch Rendezvouspunkt genannt, ist eine gleichzeitige Stammfunktion, mit der Threads sich untereinander koordinieren können, ohne dass dazu ein weiterer „Master“-Thread nötig ist. Jeder der Threads signalisiert und wartet, atomar, wenn die Barriere erreicht wird. Alle Threads dürfen erst dann fortfahren, wenn alle n die Barriere erreicht haben. Dies kann für zusammenarbeitende Algorithmen verwendet werden, die häufig in wissenschaftlichen, mathematischen und grafischen Bereichen vorkommen. Die Verwendung von Barrieren ist in vielen Berechnungen sinnvoll; tatsächlich sind sie sogar im CLR Garbage Collector enthalten. Barrieren unterbrechen eine größere Berechnung einfach in kleinere, kooperative Phasen, zum Beispiel:
const int P = ...;
Barrier barrier = new Barrier(P);
Data[] partitions = new Data[P];

// Running on ‘P’ separate threads in parallel:
public void Body(int myIndex) {
    FillMyPartition(partitions[myIndex]);
    barrier.Await();
    ReadOtherPartition(partitions[P – myIndex - 1]);
    barrier.Await();
    // ...
}
Sie werden schnell feststellen, dass in dieser Situation auch eine Countdownsperre verwendet werden könnte. Anstatt Await aufzurufen, könnte jeder Thread Signal direkt gefolgt von Wait aufrufen; alle Threads werden freigegeben, sobald alle die Barriere erreicht haben. Es gibt jedoch ein Problem: Die vorher beschriebene Sperre unterstützt die mehrfache Verwendung desselben Objekts nicht, was aber bei einer Barriere eine nützliche Eigenschaft ist, die unterstützt werden sollte. Für das obige Beispiel ist es tatsächlich erforderlich. Sie können dies mit getrennten Barrierenobjekten erreichen, aber das wäre Verschwendung; es ist nicht nötig, mehr als eine Barriere einzusetzen, da alle Threads sich jeweils nur in einer Phase befinden.
Um dieses Problem zu lösen, können Sie mit dem gleichen einfachen Countdownsperrenalgorithmus beginnen, um den Zähler zu herunterzusetzen, das Ereignis zu signalisieren, zu warten usw., und ihn auf die Unterstützung der Wiederverwendung erweitern. Dafür benötigen Sie eine Barriere, die die Wahrnehmung umkehrt, wofür sie zwischen „geraden“ und „ungeraden“ Phasen wechselt. Sie verwenden getrennte Ereignisse für die alternativen Phasen. In Abbildung 3 ist eine Beispielimplementierung einer solchen Barrierendatenstruktur dargestellt.
using System;
using System.Threading;

public class Barrier {
    private volatile int m_count;
    private int m_originalCount;
    private EventWaitHandle m_oddEvent;
    private EventWaitHandle m_evenEvent;
    private volatile bool m_sense = false; // false==even, true==odd.

    public Barrier(int count) {
        m_count = count;
        m_originalCount = count;
        m_oddEvent = new ManualResetEvent(false);
        m_evenEvent = new ManualResetEvent(false);
    }

    public void Await() {
        bool sense = m_sense;

        // The last thread to signal also sets the event.
        if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
            m_count = m_originalCount;
            m_sense = !sense; // Reverse the sense.
            if (sense == true) { // odd
                m_evenEvent.Reset();
                m_oddEvent.Set();
             } else { // even
                m_oddEvent.Reset();
                m_evenEvent.Set();

            }
        } else {
            if (sense == true) m_oddEvent.WaitOne();
            else               m_evenEvent.WaitOne();
        }

    }
}
Der Grund für die Verwendung von zwei Ereignissen ist feinsinnig. Ein Ansatz wäre, in Await ein Set direkt gefolgt von einem Reset durchzuführen, aber das ist gefährlich und führt aus zwei Gründen zu Deadlocks. Erstens könnte ein anderer Thread m_count heruntergesetzt haben, aber noch nicht den Aufruf von WaitOne an das Ereignis erreicht haben, während der Thread in schneller Folge Set und dann Reset aufruft. Zweitens können die in CLR immer verwendeten warnpflichtigen Wartevorgänge die Wartezeit unterbrechen, obwohl der wartende Thread den Aufruf von WaitOne erreicht hat, und damit temporär einen wartenden Thread aus der Warteschlange entfernen, so dass ein asynchroner Prozessaufruf (APC) ausgeführt werden kann. Der wartende Thread wird das Ereignis nie in einem festgelegten Zustand sehen. Beide Fälle führen zu verpassten Ereignissen und wahrscheinlichen Deadlocks. Mit getrennten Ereignissen für gerade und ungerade Phasen vermeiden Sie dies.
Sie könnten der Barriere auch Spinning hinzufügen, wie schon beim CountdownLatch. Dabei werden Sie jedoch auf ein Problem stoßen: Normalerweise würde der Thread das Spinning beibehalten, bis m_count einen Wert von 0 erreicht hat. Bei der obigen Implementierung wird jedoch m_count nie 0 erreichen, bevor der letzte Thread m_count auf m_originalCount zurücksetzt. Dieser naive Ansatz beim Spinning würde dazu führen, dass einer oder mehrere Threads (ewig) das Spinning beibehalten und alle anderen Threads (ewig) vor der nächsten Phase blockiert sind. Die Lösung ist einfach. Sie führen das Spinning durch und warten dabei darauf, dass sich die Wahrnehmung ändert, wie in Abbildung 4 gezeigt.
public void Await() {
    bool sense = m_sense;

    // The last thread to signal also sets the event.
    if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
        m_count = m_originalCount;
        m_sense = !sense; // Reverse the sense.
        if (sense == true) { // odd
            m_evenEvent.Set();
            m_oddEvent.Reset();
        } else { // even
            m_oddEvent.Set();
            m_evenEvent.Reset();
        }
    } else {
        SpinWait s = new SpinWait();
        while (sense == m_sense) {
            if (s.Spin() >= s_spinCount) {
                if (sense == true) m_oddEvent.WaitOne();
                else               m_evenEvent.WaitOne();
            }
        }
    }
}
Weil alle Threads Await aus einer vorigen Phase verlassen müssen, bevor eine nachfolgende Phase ausgeführt werden kann, können Sie sicher sein, dass alle Threads entweder darauf warten, dass die Wahrnehmung sich ändert oder dass sie auf unser Ereignis warten und auf diese Weise erweckt werden.

Blockierte Warteschlange
In Architekturen mit gemeinsamem Speicher liegt der einzige Synchronisierungspunkt zweier oder mehrerer Aufgaben in einer zentralen Datenstruktur mit gemeinsamer Sammlung. Oft sind eine oder mehrere Aufgaben dafür verantwortlich, „Arbeit“ zu generieren, die von einer oder mehreren anderen Aufgaben verbraucht wird, was als Produzenten-/Verbraucherbeziehung bezeichnet wird. Eine einfache Synchronisierung für solche Datenstrukturen ist normalerweise unkompliziert, mit Monitor oder ReaderWriterLock ist sie schnell erledigt. Aber die Koordination der Aufgaben untereinander wird schwieriger, wenn der Puffer sich leert. Dieses Problem wird normalerweise mit einer blockierten Warteschlange gelöst.
Tatsächlich gibt es mehrere leichte Varianten von blockierten Warteschlangen, von einer einfachen Variante, in der der Verbraucher nur blockiert, wenn die Warteschlange leer ist, bis zu komplizierteren Strukturen, in denen jeder Produzent mit genau einem Verbraucher gepaart ist, also der Produzent geblockt wird, bis ein Verbraucher das eingereihte Element verarbeitet und, ganz ähnlich, der Verbraucher blockiert ist, bis ein Produzent ein Element übergibt. Eine FIFO (First-In-First-Out)-Bestellung kommt häufig vor, ist aber nicht immer notwendig. Puffer können ebenfalls begrenzt werden, wie wir später sehen werden. Wir betrachten hier nur die Verpaarungsvariante, da der begrenzte Puffer, den wir später besprechen, das einfachere Blockieren-wenn-leer-Verhalten beinhaltet.
Um dies zu implementieren, wrappen Sie eine einfache Warteschlange<T> mit zusätzlicher Synchronisierung. Welche Art von Synchronisierung? Wenn ein Thread ein Element einreiht, wartet er auf einen Verbraucher, der das Element entfernt, bevor er zurückkehrt. Wenn ein Thread ein Element entfernt und der Puffer dadurch geleert wird, muss der Thread auf ein neu eingehendes Element warten. Und selbstverständlich muss der Verbraucher nach dem Entfernen des Elements dem Produzenten mitteilen, dass er das Element genommen hat (siehe Abbildung 5).
class Cell<T> {
    internal T m_obj;
    internal Cell(T obj) { m_obj = obj; }
}
 
public class BlockingQueue<T> {
    private Queue<Cell<T>> m_queue = new Queue<Cell<T>>();
    public void Enqueue(T obj) {
        Cell<T> c = new Cell<T>(obj);
        lock (m_queue) {
            m_queue.Enqueue(c);
            Monitor.Pulse(m_queue);
            Monitor.Wait(m_queue);
        }
    }
    public T Dequeue() {
        Cell<T> c;
        lock (m_queue) {
            while (m_queue.Count == 0)
                Monitor.Wait(m_queue);
            c = m_queue.Dequeue();
            Monitor.Pulse(m_queue);
        }
        return c.m_obj;
    }
}
Beachten Sie, dass wir zunächst Pulse und dann Wait bei der Methode Enqueue aufrufen und danach, bei Dequeue, erst Wait und dann Pulse. Aufgrund der Art, wie Monitore implementiert sind (interne Ereignisse werden festgelegt, bevor der Monitor freigegeben wird), kann es zu einem Ping-Pong bei der Threadplanung kommen. Wir könnten die Möglichkeit in Betracht ziehen, stattdessen einen detaillierteren Benachrichtigungsmechanismus zu erstellen, der vielleicht Win32-Ereignisse verwendet. Umfangreiche Win32-Ereignisse können zu diesem Zweck jedoch einen nicht unerheblichen Aufwand verursachen, vor allem bei den Zuordnungskosten und beim Einsatz von Kerneltransitionen, also ist es an der Zeit für Alternativen. Sie könnten sie zusammenlegen, wie der ReaderWriterLock der CLR, oder sie einem Lazy Wait zuordnen, wie es der Typ ThinEvent vornimmt (zeigen wir später). Diese Implementierung hat auch den Nachteil, jedem neuen Element ein Objekt zuzuordnen; ein alternativer Ansatz könnte sein, diese Objekte ebenso zusammenzulegen, aber auch hier entsteht zusätzliche Komplexität.

Begrenzter Puffer
In einigen Arten von Warteschlangen kann ein Problem beim Ressourcenverbrauch auftreten. Wenn die Aufgabe(n) des Produzenten Elemente schneller erstellen, als diese vom Verbraucher verarbeitet werden können, kann das System in einen Zustand unbegrenzter Speichernutzung geraten.
Stellen Sie sich zum Beispiel vor, in einem System reiht ein einziger Produzent 50 Elemente pro Sekunde ein, und der Verbraucher verarbeitet nur 10 Elemente pro Sekunde. Zunächst ist das System nicht ausbalanciert und ist mit einer eins-zu-eins Produzent-zu-Verbraucher-Konfiguration nicht gut skaliert. Nach nur einer Minute türmen sich 2.400 Elemente im Puffer auf. Wenn diese Elemente zum Beispiel jeweils 10 KB nutzen, werden dadurch 24 MB des Speichers nur für den Puffer belegt. Nach einer Stunde wird dies auf über 1 GB angewachsen sein. Die Anzahl von Produzententhreads in ein Verhältnis zur Anzahl zu Verbraucherthreads zu setzen, ist eine Lösung; in diesem Fall wäre es ein Produzent zu fünf Verbrauchern. Aber Ankunftraten schwanken oft, verursachen dadurch zeitweise Unausgewogenheiten und führen zu dramatischen Problemen; ein einfaches, festes Verhältnis wird dieses Problem nicht lösen.
Auf einem Server, auf dem Programme oft lange andauern und gute Betriebszeiten erwartet werden, kann die Möglichkeit zu unbegrenzter Speichernutzung zu totalem Chaos führen, welches möglicherweise zu einer Situation führt, in der der Serverprozess regelmäßig wiederverwendet werden muss.
Mit einem begrenzten Puffer können Sie eine Größe angeben, die der Puffer erreichen darf, bevor der Produzent gezwungen wird, zu blockieren. Das Blockieren des Produzenten gibt dem Verbraucher die Chance, ihn „einzuholen“ (indem dem Verbraucherthread erlaubt wird, ein Planungszeitintervall zu empfangen), während gleichzeitig das Problem der wachsenden Speichernutzung beseitigt wird. Unser Ansatz ist, einfach eine Warteschlange<T> zu wrappen und zwei Wait-Bedingungen und zwei Ereignisbenachrichtigungsbedingungen hinzuzufügen: Ein Produzent wartet, wenn die Warteschlange voll ist (bis sie nicht mehr voll ist), und ein Verbraucher wartet, wenn die Warteschlange leer ist (bis sie nicht mehr leer ist); ein Produzent signalisiert wartenden Verbrauchern, dass er ein Element erstellt hat, und ein Verbraucher signalisiert einem Produzenten, wenn er ein Element akquiriert hat, wie in Abbildung 6 gezeigt.
public class BoundedBuffer<T> {
    private Queue<T> m_queue = new Queue<T>();
    private int m_consumersWaiting;
    private int m_producersWaiting;
    private const int s_maxBufferSize = 128;

    public void Enqueue(T obj) {
        lock (m_queue) {
            while (m_queue.Count == (s_maxBufferSize - 1)) {
                m_producersWaiting++;
                Monitor.Wait(m_queue);
                m_producersWaiting--;
            }
            m_queue.Enqueue(obj);
            if (m_consumersWaiting > 0)
                Monitor.PulseAll(m_queue);
        }
    }

    public T Dequeue() {
        T e;
        lock (m_queue) {
            while (m_queue.Count == 0) {
                m_consumersWaiting++;
                Monitor.Wait(m_queue);
                m_consumersWaiting--;
            }
            e = m_queue.Dequeue();
            if (m_producersWaiting > 0)
                Monitor.PulseAll(m_queue);
        }
        return e;
    }
}
Auch dieser Ansatz ist ein wenig naiv. Wir optimieren jedoch die Aufrufe von PulseAll, da sie nicht gerade günstig sind, indem wir zwei Zähler einsetzen, m_consumersWaiting und m_producersWaiting, und nur signalisieren, wenn der jeweilige Wert nicht Null ist. Es gibt weitere Möglichkeiten zur Verbesserung. Ein einzelnes Ereignis wie dieses freizugeben, könnte zum Beispiel zu viele Threads erwecken: Wenn ein Verbraucher die Größe der Warteschlange auf 0 verringert, und es sowohl wartende Produzenten als auch wartende Verbraucher gibt, wollen Sie natürlich (zunächst) nur die Produzenten erwecken. Diese Implementierung bedient alle Wartenden in FIFO-Reihenfolge. Das bedeutet, dass es nötig sein könnte, vor der Ausführung von Produzenten zuerst die Verbraucher zu erwecken, nur damit diese herausfinden, dass die Warteschlange leer ist und folglich wieder warten. Zum Glück ist es recht selten, dass Produzenten und Verbraucher gleichzeitig warten, aber es kann bei engen Grenzen regelmäßig vorkommen.

Thin Event
Win32-Ereignisse haben einen großen Vorteil gegenüber Monitor.Wait, Pulse und PulseAll: Sie „bleiben kleben“. Das bedeutet, sobald ein Ereignis signalisiert wurde, wird jeder nachfolgende Wartevorgang sofort freigegeben, auch wenn die Threads erst nach dem Signal in den Wartezustand übergegangen sind. Ohne dieses Feature müssen Sie oft entweder ineffizienten Code schreiben, in dem alle Wartevorgänge und Signale streng innerhalb eines kritischen Bereichs geschehen – das ist ineffizient, da der Windows-Planer die Priorität eines erweckten Threads immer erhöht, wodurch jedes Mal ein Kontextwechsel für den erwachenden Thread auftritt, nur damit dieser sofort wieder auf den kritischen Bereich warten muss – oder Sie müssen schwierigen Code verwenden, der nahezu alle Fälle abdeckt.
Anstelle dieser beiden Ansätze können Sie ein „Thin Event“ verwenden, eine wiederverwendbare Datenstruktur, die kurz ein Spinning durchführt, bevor sie blockiert, ein Win32-Ereignis nur wenn nötig einem Lazy Wait zuweist, und Ihnen sonst ein ereignisähnliches, manuell rücksetzbares Verhalten bietet. Anders ausgedrückt enthält ein Thin Event den schwierigen Code, der alle Fälle abdeckt, sodass Sie ihn nicht in Ihrer gesamten Codebasis verstreuen müssen. Dieses Beispiel basiert auf einigen Garantien des Speichermodells, die im Artikel von Vance Morrison beschrieben werden, und sollte mit äußerster Sorgfalt verwendet werden (siehe Abbildung 7).
public struct ThinEvent {
    private int m_state; // 0 means unset, 1 means set.
    private EventWaitHandle m_eventObj;
    private const int s_spinCount = 4000;

    public void Set() {
        m_state = 1;
        Thread.MemoryBarrier(); // required.
        if (m_eventObj != null)
            m_eventObj.Set();
    }

    public void Reset() {
        m_state = 0;
        if (m_eventObj != null)
            m_eventObj.Reset();
    }

    public void Wait() {
        SpinWait s = new SpinWait();
        while (m_state == 0) {
            if (s.Spin() >= s_spinCount) {
                if (m_eventObj == null) {
                    ManualResetEvent newEvent =
                        new ManualResetEvent(m_state == 1);
                    if (Interlocked.CompareExchange<EventWaitHandle>(
                            ref m_eventObj, newEvent, null) == null) {
                       // If someone set the flag before seeing the new
                       // event obj, we must ensure it’s been set.
                       if (m_state == 1)
                           m_eventObj.Set();
                    } else {
                        // Lost the race w/ another thread. Just use
                        // its event.
                        newEvent.Close();
                    }
                }
                m_eventObj.WaitOne();
            }
        }
    }
}
Dies spiegelt im Grunde den Ereigniszustand einer m_state-Variable wider, in dem der Wert 0 nicht festgelegt und der Wert 1 festgelegt bedeutet. Auf ein festgelegtes Ereignis zu warten ist jetzt wirklich günstig; wenn m_state im Eingang zur Warteroutine 1 ist, kehren wir sofort zurück, und es wird kein Kernelübergang nötig. Die Schwierigkeit entsteht dann, wenn ein Thread wartet, bevor das Ereignis festgelegt wurde. Der erste wartende Thread muss ein neues Ereignisobjekt zuweisen und es mit dem Feld m_eventObj vergleichen und ggf. tauschen; wenn der CAS fehlschlägt, also ein anderer Wartender das Ereignis initialisiert hat, kann es einfach wiederverwendet werden; andernfalls muss erneut geprüft werden, ob m_state sich geändert hat, seitdem es zuletzt geprüft wurde. Ansonsten könnte m_state 1 werden und m_eventObj nicht signalisiert werden, was zu einem Deadlock beim Aufruf von WaitOne führen würde. Der Thread, der Set aufruft, muss zuerst m_state festlegen und dann, wenn er ein m_eventObj ungleich Null beobachtet, Set darauf aufrufen. Es sind zwei Speicherbarrieren erforderlich: Das zweite Lesen von m_state darf nicht nach vorne verschoben werden, was durch die Verwendung von Interlocked.CompareExchange zum Festlegen von m_eventObj sichergestellt wird; und das Lesen von m_eventObj in Set darf nicht vor das Schreiben in m_eventObj verschoben werden (eine überraschenderweise auf einigen Intel- und AMD-Prozessoren sowie im CLR 2.0-Speichermodell erlaubte Transformation, ohne den ausdrücklichen Aufruf von Thread.MemoryBarrier). Das Zurücksetzen des Ereignisses ist normalerweise bei Parallelität nicht sicher, so dass eine zusätzliche Synchronisierung durch den Aufrufer notwendig ist.
Sie können dies nun einfach anderweitig einsetzen, zum Beispiel in den obigen Beispielen zu CountdownLatch und zur Warteschlange, normalerweise mit einem deutlichen Leistungszuwachs, besonders, wenn Sie das Spinning intelligent verwenden.
Oben wurde eine schwierige Implementierung dargestellt. Beachten Sie, dass Sie sowohl automatisch als auch manuell rücksetzbare Typen mit einem Single Flag und Monitoren viel einfacher als in diesem Beispiel implementieren können (aber nicht immer so effizient).

LIFO-Stapel ohne Sperre
Das Erstellen einer threadsicheren Sammlung mit Sperren ist recht einfach, auch wenn Grenzen gesetzt und komplizierte Vorgänge blockiert werden (wie wir oben gesehen haben). Wenn jedoch die gesamte Koordination über eine einfache Last-In-First-Out-Stapeldatenstruktur (LIFO) stattfindet, könnte es weit aufwändiger sein als nötig, eine Sperre zu verwenden. Der kritische Bereich eines Threads, die Zeit, während eine Sperre gehalten wird, hat einen Anfang und ein Ende; dazwischen liegt Zeit für viele Anweisungen. Das Halten der Sperre hindert andere Threads daran, gleichzeitig zu schreiben und zu lesen. Dadurch wird eine Serialisierung erreicht, die wir selbstverständlich wollen, aber sie ist viel stärker als nötig: Elemente werden einem Stapel hinzugefügt und wieder davon entfernt. Beides kann auch mit normalem Lesen und einem einzigen Schreiben durch Vergleichen-und-Tauschen erreicht werden. Diese Tatsache kann ausgenutzt werden, um einen skalierbareren Stapel ohne Sperre zu erstellen, der Threads nicht zu unnötigem Warten zwingt.
Der Algorithmus funktioniert folgendermaßen. Sie verwenden eine verlinkte Liste, um den Stapel zu repräsentieren; ihr Kopf ist die Spitze des Stapels, der im Feld m_head gespeichert ist. Beim Übertragen eines neuen Elements auf den Stapel erstellen Sie einen neuen Knoten mit dem Wert, der auf den Stapel übertragen wird, lesen das Feld m_head lokal, speichern es in das Feld m_next des neuen Knotens und führen dann einen atomaren Interlocked.CompareExchange durch, um den derzeitigen Kopf des Stapels zu ersetzen. Wenn der Kopf an irgendeinem Punkt in dieser Sequenz geändert wird (seit er das erste Mal gelesen wurde), schlägt CompareExchange fehl, und der Thread muss eine Schleife durchlaufen und die ganze Sequenz erneut versuchen. Entfernen ist ähnlich einfach. Sie lesen m_head und versuchen, es durch den Verweis m_next unserer lokalen Version zu ersetzen. Schlägt dies fehl, versuchen Sie es einfach weiter, wie in Abbildung 8 dargestellt. Win32 bietet eine analoge Datenstruktur namens SList, die mit einem ähnlichen Algorithmus erstellt wurde.
public class LockFreeStack<T> {
    private volatile StackNode<T> m_head;

    public void Push(T item) {
        StackNode<T> node = new StackNode<T>(item);
        StackNode<T> head;
        do {
            head = m_head;
            node.m_next = head;
        } while (m_head != head || Interlocked.CompareExchange(
                ref m_head, node, head) != head);
    }

    public T Pop() {
        StackNode<T> head;
        SpinWait s = new SpinWait();

        while (true) {
            StackNode<T> next;
            do {
                head = m_head;
                if (head == null) goto emptySpin;
                next = head.m_next;
            } while (m_head != head || Interlocked.CompareExchange(
                    ref m_head, next, head) != head);
            break;

        emptySpin:
            s.Spin();
        }

        return head.m_value;
    }
}

class StackNode<T> {
    internal T m_value;
    internal StackNode<T> m_next;
    internal StackNode(T val) { m_value = val; }
}

Beachten Sie, dass dies eine Form von optimistischer Gleichzeitigkeitssteuerung ist: Statt anderen Threads den Datenzugriff zu blockieren, fahren Sie einfach in der Hoffnung fort, das Rennen zu „gewinnen“. Wenn sich diese Hoffnung als falsch erweist, können Liveness-Probleme wie Livelocks auftreten. Diese Entwurfsentscheidung impliziert auch, dass Sie keine zuverlässige FIFO-Planung erreichen können. Alle Threads im System werden sich wahrscheinlich in Vorwärtsrichtung bewegen. Und eigentlich bewegt sich unser System als Ganzes deterministisch in Vorwärtsrichtung, weil ein fehlgeschlagener Thread immer bedeutet, dass mindestens ein anderer Thread Fortschritte gemacht hat (eine Voraussetzung für die Definition von Sperrenfreiheit). Manchmal ist es nützlich, ein exponentielles Backoff zu verwenden, wenn ein CompareExchange versagt, um massive Speicherkonflikte auf m_head zu vermeiden.
Auch sind wir im Fall eines leeren Stapels einem recht naiven Ansatz gefolgt. Das Spinning dauert einfach ewig, während auf ein neu übertragenes Element gewartet wird. Es ist einfach, Pop in eine nicht wartende Methode TryPop umzuschreiben, und etwas schwieriger, Ereignisse zum Warten zu nutzen. Beides sind wichtige Features und dienen als Übung für den motivierten Leser.
Wir erhalten eine Objektzuordnung bei jeder Übertragung, was uns die Sorge um sogenannte ABA-Probleme erspart. ABA tritt auf, wenn Knoten, die aus der Liste entfernt wurden, intern wiederverwendet werden. Entwickler versuchen manchmal, Knoten zusammenzulegen, um die Anzahl von Objektzuordnungen zu verringern, aber dies ist problematisch: Es kann vorkommen, dass ein atomarer Vorgang fälschlicherweise gelingt, obwohl es einige intervenierende Schreibvorgänge an m_head gegeben hat. (Beispiel: Knoten A wird von Thread 1 gelesen, dann von Thread 2 entfernt und in den Pool gestellt; B wird von Thread 2 als neuer Kopf übertragen, dann kehrt A vom Pool zu zurück Thread 2 und wird übertragen; Thread 1 führt dann erfolgreich einen CompareExchange durch, obwohl A am Kopf sich nun von dem A, das von Thread 1 gelesen wurde, unterscheidet.) Ein ähnliches Problem tritt auf, wenn Sie versuchen, diesen Algorithmus in systemeigenem C/C++ zu schreiben. Da die Speicherzuordnung Adressen wiederverwenden kann, sobald sie freigegeben sind, kann ein Knoten entfernt und freigegeben werden, und seine Adresse kann anschließend einer neuen Knotenzuordnung ausgegeben werden, wodurch das gleiche Problem verursacht wird. ABA wird hier nicht weiter erörtert, dies ist bereits an anderer Stelle ausführlich geschehen.
Zu guter Letzt ist es möglich, eine FIFO-Warteschlange mit ähnlichen, sperrenfreien Verfahren zu schreiben. Dies ist interessant, da gleichzeitig übertragende und entfernende Threads nicht notwendigerweise in Konflikt miteinander geraten, im Gegensatz zum oben beschriebenen LockFreeStack (sperrenfreien Stapel), in dem die übertragenden und entfernenden Threads um dasselbe Feld m_head kämpfen. Der Algorithmus ist recht kompliziert, doch wenn Sie neugierig sind, ermutige ich Sie, sich den Artikel „Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms“ von Maged M. Michael und Michael L. Scott von 1996 anzusehen.

Aufspalten von Schleifen
Bei der Aufspaltung von Schleifen werden der Eingabebereich bzw. die Daten für eine Schleife partitioniert, und dann wird jede Partition einem getrennten Thread zugewiesen, um Gleichzeitigkeit zu erreichen. Es ist das wichtigste Verfahren zum Erreichen von Parallelität bei einigen Programmiermodellen wie OpenMP (siehe -Artikel http://msdn.microsoft.com/msdnmag/issues/05/10/OpenMP von Kang Su Gatlin) und wird in Anlehnung an die Sprache des hoch leistungsfähigen FORTRAN oft als parallele Forall-Schleife bezeichnet. Gleichgültig, ob der Bereich nur aus einem Satz Indizes besteht:
for (int i = 0; i < c; i++) { ... } 
oder aus einer Reihe Daten:
foreach (T e in list) { ... }
es können Partitionierungstechniken entwickelt werden, die eine aufgespaltete Schleife erstellen.
Es gibt viele spezifische Partitionierungstechniken für bestimmte Datenstrukturen, die Sie hier anwenden können; eindeutig zu viele, um alle in diesem Artikel aufzuführen. Wir konzentrieren uns daher auf ein verbreitetes Verfahren, bei der jeder Partition unabhängige Elementbereiche aus dem Array zugewiesen werden. Es wird einfach ein Schritt berechnet, der sich ungefähr ergibt, wenn die Anzahl Elemente durch die Anzahl Partitionen geteilt wird. Dieses Verhältnis wird dann zur Berechnung zusammenhängender Bereichen verwendet (siehe Abbildung 9). Dies ergibt eine gute räumliche Lokalität, wenn die Eingabe ein Array von Werttypen ist, obwohl andere Ansätze sicher ebenso gültig, nützlich und manchmal notwendig sind.
public static void ForAll(int from, int to, Action<int> a, int p) {
    ForAll<int>(null, from, to, null, a, p);
}
public static void ForAll<T>(IList<T> data, Action<T> a, int p) {
    ForAll<T>(data, 0, data.Count, a, null, p);
}

private static void ForAll<T>(IList<T> data, int from, int to,
        Action<T> a0, Action<int> a1, int p) {
    int size = from - to;
    int stride = (size + p - 1) / p;
    CountdownLatch latch = new CountdownLatch(p);

    for (int i = 0; i < p; i++) {
        int idx = i;
        ThreadPool.QueueUserWorkItem(delegate {
            int end = Math.Min(size, stride * (idx + 1));
            for (int j = stride * idx; j < end; j++) {
                if (data != null) a0(data[j]);
                else              a1(j);
            }

            latch.Signal();
        });
    }

    latch.Wait();
}
Wir bieten hier zwei öffentliche Versionen von ForAll an: Eine, die einen Bereich von Zahlen annimmt, und eine weitere, die eine IList<T> annimmt, genau wie eine foreach-Schleife in C#. Beide leiten an dasselbe Helferoverload weiter, das entweder eine Aktion initiiert, die das Element von der Liste beim gegebenen Index übergibt oder den Index selbst übergibt. Sie könnten das erste Overload verwenden, wo Sie üblicherweise eine normale For-Schleife einsetzen würden. Aus diesem Code
for (int i = 0; i < 10; i++) { S; }
wird zum Beispiel:
Parallel.ForAll(0, 10, delegate(int i){ S; },
    Environment.ProcessorCount);
Und Sie können das zweite verwenden, wo Sie normalerweise eine C#-foreach-Schleife einsetzen würden, so dass aus
List<T> list = ...;
foreach (T e in list) { S; }
zum Beispiel das hier wird:
Parallel.ForAll(list, delegate(T e) { S; }, 
    Environment.ProcessorCount);
Sie müssen verhindern, dass Anweisungen in S an freigegebenen Speicher geschrieben werden, denn sonst müssen Sie eine korrekte Synchronisierung für die parallelen Versionen hinzufügen. Es können selbstverständlich Versionen geschrieben werden, die jede IEnumerable<T> berücksichtigen, den Iterationsbereich anders partitionieren und so weiter (dies alles wurde hier aus Platzgründen ausgelassen). Und in diesem Beispiel wird der aufrufende Thread für die Dauer der n Unteraufgaben „verschwendet“. Ein besserer Ansatz wäre, den aufrufenden Thread zu verwenden, um eine der Aufgaben selber auszuführen, und sich bei Vollendung den anderen anzuschließen. Das Ausdehnen der ForAll Methode darauf ist banal.

Parallele Reduktionen
Es gibt eine Kategorie von Vorgängen, die mit einer Reduktion durchgeführt werden können (auch als Fold oder Aggregation bekannt), und in der viele Werte so kombiniert werden, dass eine einzige Ausgabe entsteht. Eine allgemeine Reduktion funktioniert folgendermaßen. Sie nehmen einen binären Operator – also eine Funktion mit zwei Argumenten – und berechnen sie über einen Vektor oder einen Satz Elemente der Größen n, von links nach rechts. Für j = 0 bis n - 1 rufen Sie den binären Operator auf und übergeben als Eingabe zur j-ten Iteration die Ausgabe der Initiierung des Operators auf Element j - 1 als erstes Argument und das j-te Element selbst als zweites Argument. Ein besonderer Ausgangswert wird für das erste Argument bis zum nullten Element verwendet, da es keinen vorherigen Wert gibt, der verwendet werden kann. Danach wird mit einer letzten, optionalen Ergebnisauswahl ein Zwischenwert in ein Endergebnis umgewandelt.
Hier ein Beispiel dafür. Wenn der binäre Operator + ist und die Eingabe aus einem Vektor von 5 Elementen besteht {1, 2, 3, 4, 5}, dann sieht die erweiterte Berechnung wie folgt aus: (((( 1 + 2) + 3) + 4) + 5). Wenn Sie diese Ausweitung in ein Formular konvertieren, das eine Funktion aufruft, sieht sie so aus (wenn 0 als Ausgangswert angenommen wird): +(+(+(+(+(0, 1), 2), 3), 4), 5) Anders ausgedrückt, berechnen Sie einfach die Summe aller Zahlen der Eingabe. Das wird Summenreduktion genannt. Eine einfache Übersetzung dieses verallgemeinerten Algorithmus in einen seriellen Algorithmus könnte so aussehen:
delegate T Func<T>(T arg0, T arg1);

T Reduce<T>(T[] input, T seed, Func<T> r) {
    T result = seed;
    foreach (T e in input)
        result = r(result, e);
    return result;
}
Wenn Sie ihn aufrufen, können Sie nun einfach (in C# 3.0) einen Satz Zahlen so aufsummieren.
int[] nums = ... some set of numbers ...;
int sum = Reduce(nums, 0, (x,y) => x + y;);
All dies ist recht abstrakt, doch zusätzlich zu einer Summe können noch viele weitere Operationen mit einer Reduktion ausgedrückt werden, wie Sie in Abbildung 10 sehen können.

  Ausgangswert Binärer Operator Ereignisauswahl
Count 0 (a, b) => a + 1 Nicht zutreffend
Summe 0 (a, b) => a + b Nicht zutreffend
Min. NaN (a, b) => a < b ? a : b Nicht zutreffend
Max. NaN (a, b) => a > b ? a : b Nicht zutreffend
Durchschnitt { 0, 0 } (a, b) => neu { a[0] + b, a[1] + 1 } (a) => a[0] / a[1]
Wird mit dem Zahlenarray von oben weitergearbeitet, kann Reduktionsroutine dazu genutzt werden, das Minimum und das Maximum des Arrays zu finden:
int min   = Reduce(nums, int.MaxValue, (x,y) => x < y ? x : y;);
int max   = Reduce(nums, int.MinValue, (x,y) => x > y ? x : y;);
(Count wird ausgelassen, da die Teilergebnisse summiert werden müssen, wofür zwei getrennte Binäroperatoren erforderlich sind; und Average wird ausgelassen, da dafür zusätzliche Schritte nötig sind.)
Um die Eingabedaten aufzuteilen und die Reduktion parallel durchzuführen, können Sie ein ähnliches Verfahren verwenden wie das, das für die Aufspaltung einer Schleife erörtert wurde. Jede Partition berechnet ihren eigenen Zwischenwert. Die Zwischenwerte werden dann mithilfe desselben Operators der für die Zwischenwerte genutzt wurde, zu einem einzigen Endwert kombiniert. Weshalb ist das möglich? Weil alle oben genannten Vorgänge assoziativ sind. Erinnern Sie sich an Grundschulmathematik: ein assoziativer binärer Operator + bedeutet, dass (a + b) + c = a + (b + c), dass also die Reihenfolge der Rechenvorgänge für das Ergebnis unerheblich ist.
Denken Sie zum Beispiel an die Summenreduktion. Wenn Sie die Eingabedaten {1, 2, 3, 4} in zwei Partitionen partitionieren, {1, 2} und {3, 4}, dann ist das Ergebnis der Addition der jeweiligen unabhängigen Summen dasselbe, da + assoziativ ist. (1 + 2) + (3 + 4) = ((((1 + 2) + 3) + 4). Tatsächlich ergibt jede Partitionierung der Eingabe in unabhängige Einheiten korrekte Ergebnisse. Abbildung 11 zeigt eine verallgemeinerte Reduktionsmethode, die den Ausgangswert und den Binäroperator als Argumente verwendet, und dann eine Schrittpartitionierung durchführt wie oben beschrieben.
public delegate T Func<T>(T arg0, T arg1);

public static T Reduce<T>(IList<T> data, int p, T seed, Func<T> r){
    T[] partial = new T[p];
    int stride = (data.Count + p - 1) / p;
      CountdownLatch latch = new CountdownLatch(p);

    for (int i = 0; i < p; i++) {
        int idx = i;
          ThreadPool.QueueUserWorkItem(delegate {
            // Do the ‘ith’ intermediate reduction in parallel.
            partial[idx] = seed;
            int end = Math.Min(data.Count, stride * (idx + 1));
            for (int j = stride * idx; j < end; j++)
                partial[idx] = r(partial[idx], data[j]);
            latch.Signal();
        });
    }

    latch.Wait();

    // Do the final reduction on the master thread.
     T final = seed;
    for (int i = 0; i < p; i++)
        final = r(final, partial[i]);

    return final;
}
Der Masterthread zweigt einen Satz von p Workerthreads ab, von denen jeder ein Zwischenergebnis berechnet, indem er eine Reduktion über seine eigene Datenpartition durchführt und den Wert am zugehörigen Platz im Array für Zwischenwerte ablegt. Der Masterthread wartet dann auf die Beendigung aller untergeordneten Elemente (mit dem am Anfang beschriebenen CountdownLatch) und führt einen letzten Reduktionsschritt durch, in dem die Teilergebnisse jedes untergeordneten Elements als Eingabe verwendet werden. In der Literatur sind Strukturreduktionen recht verbreitet. Dabei wird in jedem Knoten der Struktur eine partielle Reduktion über eine bestimmte Anzahl von Zwischenergebnissen durchgeführt. Während dies theoretisch zu einem skalierbareren Algorithmus führt, ist es in Wirklichkeit so, dass durch die Anzahl der Threads, mit der Sie wahrscheinlich arbeiten und den entstehenden Synchronisierungskosten auf Windows die gezeigte serielle Reduktion für viele Werte von p und Binäroperatoren eine bessere Leistung erzielt.
Es gibt hier noch viele Verbesserungsmöglichkeiten – zum Beispiel, den aufrufenden Thread für eine der Partitionen wieder zu verwenden – aber dennoch sollte das Beispiel den Gesichtspunkt ganz gut illustrieren. Um Vorgänge wie Average zu unterstützen, bei denen die Reduktion der Zwischenstufe sich von der Reduktion der Endstufe unterscheidet, und für die eine Auswahlroutine für das Endergebnis erforderlich ist, brauchen wir eine leicht abgewandelte API. Dies ist eine recht einfache Übung.

Schlussbemerkung
In diesem Artikel wurden einige untergeordnete parallele Datenstrukturen und Algorithmen vorgestellt, mit denen Sie verwalteten Code schreiben können, der Multiprozessoren und Mehrfachkernarchitekturen nutzt. Wie beim Programmieren neigen Abstraktionen dazu, sich in Schichten zusammenzuklumpen, wobei das für die Leistung wichtigste Element ganz unten bleibt. Wir können sicher sagen, dass viele der Verfahren in diesem Artikel ganz unten stehen und als Basis für Abstraktionen auf höherer Ebene und anwendungsspezifischen parallelen Code dienen. Obwohl die Auswahl der richtigen Datenstrukturen und der grundlegenden Algorithmen nur ein Schritt in einem längeren Prozess ist, hoffe ich, dass Sie ein tieferes Verständnis von parallelen Programmierverfahren gewonnen haben, das für Ihren Erfolg in diesem neuen Bereich der Programmierung grundlegend ist.

Senden Sie Ihre Fragen und Kommentare an clrinout@microsoft.com.


Joe Duffy arbeitet bei Microsoft an parallelen Programmiermodellen und paralleler Infrastruktur und schreibt regelmäßig in seinem Blog unter www.bluebytesoftware.com/Blog. Einige der Codebeispiele dieser Kolumne basieren auf seinem demnächst erscheinenden Buch Concurrent Programming on Windows, das 2007 bei Addison Wesley veröffentlicht wird.

Page view tracker