MSDN Magazin > Home > Ausgaben > 2008 > February >  .NET-Themen: Datenstrompipeline
.NET-Themen
Datenstrompipeline
Stephen Toub


F: In meiner Anwendung verschlüssele und komprimiere ich ziemlich viele Daten. Da dies rechenintensive Vorgänge sind, erwartete ich, 100 Prozent CPU-Auslastung im Task-Manager zu sehen, aber ich habe bemerkt, dass auf meinem Dual-Core-Computer nur etwa 50 Prozent erreicht werden. Dies liegt vermutlich daran, dass nur ein Kern verwendet wird, was bedauerlich ist, da der Prozess erhebliche Zeit in Anspruch nimmt. Gibt es eine Möglichkeit, für diesen Verschlüsselungs- und Komprimierungsprozess beide Prozessoren zu verwenden? Ich verwende CryptoStream und GZipStream von Microsoft® .NET Framework.
F: In meiner Anwendung verschlüssele und komprimiere ich ziemlich viele Daten. Da dies rechenintensive Vorgänge sind, erwartete ich, 100 Prozent CPU-Auslastung im Task-Manager zu sehen, aber ich habe bemerkt, dass auf meinem Dual-Core-Computer nur etwa 50 Prozent erreicht werden. Dies liegt vermutlich daran, dass nur ein Kern verwendet wird, was bedauerlich ist, da der Prozess erhebliche Zeit in Anspruch nimmt. Gibt es eine Möglichkeit, für diesen Verschlüsselungs- und Komprimierungsprozess beide Prozessoren zu verwenden? Ich verwende CryptoStream und GZipStream von Microsoft® .NET Framework.
A: Nur um sicherzugehen: Führen Sie den Komprimierungsvorgang vor dem Verschlüsselungsvorgang aus? Wenn nicht, sollten Sie dies möglichst ändern. Gute Verschlüsselung generiert relativ schlecht komprimierbare Daten. Wenn Sie die Reihenfolge der Vorgänge ändern, um erst zu komprimieren und dann zu verschlüsseln, dürften Sie nicht nur eine kleinere Datei erhalten, sondern die Verschlüsselung wird wahrscheinlich auch weniger Zeit erfordern, da weniger Daten betroffen sind. Als Test habe ich den Text von „Krieg und Frieden“ über das Gutenberg Project (www.gutenberg.org) heruntergeladen und beide Reihenfolgen ausprobiert. Das Verschlüsseln (mit RijndaelManaged und der Standardschlüsselgröße) und anschließende Komprimieren führte zu einem Datenstrom, der um 250 Prozent größer war als der durch anfängliches Komprimieren und anschließendes Verschlüsseln generierte. Zudem nahm der Prozess 50 Prozent mehr Zeit in Anspruch.
A: Nur um sicherzugehen: Führen Sie den Komprimierungsvorgang vor dem Verschlüsselungsvorgang aus? Wenn nicht, sollten Sie dies möglichst ändern. Gute Verschlüsselung generiert relativ schlecht komprimierbare Daten. Wenn Sie die Reihenfolge der Vorgänge ändern, um erst zu komprimieren und dann zu verschlüsseln, dürften Sie nicht nur eine kleinere Datei erhalten, sondern die Verschlüsselung wird wahrscheinlich auch weniger Zeit erfordern, da weniger Daten betroffen sind. Als Test habe ich den Text von „Krieg und Frieden“ über das Gutenberg Project (www.gutenberg.org) heruntergeladen und beide Reihenfolgen ausprobiert. Das Verschlüsseln (mit RijndaelManaged und der Standardschlüsselgröße) und anschließende Komprimieren führte zu einem Datenstrom, der um 250 Prozent größer war als der durch anfängliches Komprimieren und anschließendes Verschlüsseln generierte. Zudem nahm der Prozess 50 Prozent mehr Zeit in Anspruch.
Nun zu Ihrer eigentlichen Frage: Ja, es gibt mehrere mögliche Ansätze. Der erste besteht darin, die eigentlichen Komprimierungs- und Verschlüsselungsvorgänge zu parallelisieren. Wahrscheinlich möchten (und sollten) Sie die Funktionalität in GZipStream und CryptoStream nicht erneut implementieren. Bis eine Parallelisierung durch das .NET Framework-Team erfolgt, benötigen Sie also eine alternative Lösung.
Wenn für Sie das Ausgabeformat keine Rolle spielt (Sie benötigen zum Beispiel Komprimierung, aber es muss nicht der gzip-Standard eingehalten werden), könnten Sie Ihre Eingabe aufteilen und jeden Abschnitt parallel verarbeiten. Auf Ihrem Dual-Core-Computer könnten Sie zum Beispiel das Eingabebytearray, das derzeit an Ihren GZipStream übergeben wird, halbieren und eine Hälfte mit einem GZipStream und die andere Hälfte mit einem anderen GZipStream verarbeiten. Anschließend können Sie diese nacheinander in Ihrer Ausgabedatei speichern.
Beachten Sie aber, dass Sie der Ausgabe wahrscheinlich einige Headerinformationen hinzufügen müssen, damit der Dekomprimierungsprozess korrekt bestimmen kann, wo ein GZipStream endet und der nächste beginnt. GZipStream puffert Daten beim Lesen des Eingabedatenstroms. Deshalb werden u. U. mehr Daten aus dem Eingabedatenstrom genutzt, als tatsächlich benötigt werden, und Sie müssen die Position im Eingabedatenstrom zurücksetzen, nachdem der erste GZipStream seine Dekomprimierung abgeschlossen hat.
Einer der Vorteile der Aufteilung besteht darin, dass der Prozess relativ gut für mehr als zwei Kerne skaliert werden kann, da Sie einfach so viele Abschnitte erstellen können, wie für die optimale Auslastung Ihrer Prozessoren erforderlich sind. Aber mit nur zwei Kernen und zwei Vorgängen (Komprimierung und Verschlüsselung) besteht die bessere Lösung für Sie jetzt wahrscheinlich darin, eine parallele Datenstrompipeline zu erstellen. Sie können einen Vorgang auf dem einen Prozessor ausführen lassen, während gleichzeitig der andere Vorgang auf dem anderen Prozessor ausgeführt wird. Selbstverständlich können die Prozessoren nicht gleichzeitig die gleichen Daten bearbeiten, da Sie zwei Ausgaben erhalten würden (eine komprimierte und eine verschlüsselte), was für Sie in Anbetracht Ihres Ziels (eine Ausgabe zu erhalten, die sowohl komprimiert als auch verschlüsselt ist) relativ nutzlos wäre. Stattdessen können Sie den Vorgang nachahmen, den Sie wahrscheinlich bereits heute ausführen: Mithilfe des Decorator-Musters übergeben Sie die Ausgabe des einen Datenstroms als Eingabe an den nächsten Datenstrom (siehe Abbildung 1):
Abbildung 1 Übertragen von Daten über Datenströme 
using (CryptoStream encrypt = new CryptoStream(
        output, transform, CryptoStreamMode.Write))
using (GZipStream compress = new GZipStream(
        encrypt, CompressionMode.Compress, true))
    CopyStream(input, compress);
...
static void CopyStream(Stream input, Stream output){
    byte[] buffer = new byte[0x1000];
    int read;
    while ((read = input.Read(buffer, 0, buffer.Length)) > 0) 
        output.Write(buffer, 0, read);
}
Hier kopiert die CopyStream-Methode vom Eingabedatenstrom in den Komprimierungsdatenstrom. Wenn der Komprimierungsdatenstrom einen Puffer mit Daten komprimiert hat, schreibt er die Daten als Eingabe in den Verschlüsselungsdatenstrom. Hat wiederum der Verschlüsselungsdatenstrom die Verarbeitung eines Puffers mit Daten abgeschlossen, schreibt er sie als Eingabe in den Ausgabedatenstrom. Dies wird Pipeline genannt.
Das Parallelisieren einer Pipeline ist ein grundlegendes Konzept, das in der Praxis oft Anwendung findet. Denken Sie an eine Gruppe von Personen, die Einladungsbriefe verschickt. Eine Person ist für das Falten der Briefe verantwortlich, damit sie in einen Umschlag passen, eine Person hat die Aufgabe, die gefalteten Briefe in Umschläge zu stecken, und eine weitere Person ist für das Zukleben und Frankieren der Umschläge zuständig. Beim ersten Brief ist die Person, die den Brief faltet, die einzige, die etwas tut. Die anderen Personen sind unbeschäftigt. Aber dann reicht die erste Person den gefalteten Brief an die Person weiter, die den gefalteten Brief nun in einen Umschlag steckt. Zu diesem Zeitpunkt ist die erste Person bereits mit dem Falten des nächsten Briefs beschäftigt, aber die dritte Person hat noch nichts zu tun. Wenn die ersten beiden Personen ihre Aufgabe erfüllt haben, wird der gefüllte Umschlag der dritten Person übergeben. Nun klebt die dritte Person den Umschlag zu und frankiert ihn, die zweite steckt einen Brief in einen Umschlag, und die erste faltet einen neuen Brief. Von diesem Zeitpunkt an bis nur noch zwei Briefe übrig sind, sind alle drei Personen vollständig ausgelastet (mit Ausnahme der Zeit, die erforderlich ist, um den Brief bzw. die Umschläge von einer Person zur nächsten weiterzureichen). Obwohl also jede Person immer nur eine Aufgabe ausführt, könnte bei einer entsprechenden Anzahl zu verschickender Briefe die meiste Zeit über aufgrund der Beschäftigung von drei Personen theoretisch dreimal so viel Arbeit erledigt werden.
Der gleiche Prozess wird für die Datenstrompipeline eingesetzt. Statt der Übergabe von gefalteten Briefen oder Umschlägen werden natürlich Puffer mit Daten übergeben, da dies das Medium ist, über das Datenströme kommunizieren. Im Unterschied zum sequenziellen Ansatz jedoch, bei dem ein Datenstrom direkt in einen anderen Datenstrom schreibt, ist direkte synchrone Kommunikation nicht mehr möglich, da jeder der Datenströme parallel in separaten Threads arbeitet. Sie benötigen daher eine andere Methode, um Kommunikation zu ermöglichen. Für diese Aufgabe eignet sich mein Programm BlockingStream (siehe Abbildung 2).
public class BlockingStream : Stream
{
    private object _lockForRead;
    private object _lockForAll;
    private Queue<byte[]> _chunks;
    private byte[] _currentChunk;
    private int _currentChunkPosition;
    private ManualResetEvent _doneWriting;
    private ManualResetEvent _dataAvailable;
    private WaitHandle[] _events;
    private int _doneWritingHandleIndex;
    private volatile bool _illegalToWrite;

    public BlockingStream()
    {
        _chunks = new Queue<byte[]>();
        _doneWriting = new ManualResetEvent(false);
        _dataAvailable = new ManualResetEvent(false);
        _events = new WaitHandle[] { _dataAvailable, _doneWriting };
        _doneWritingHandleIndex = 1;
        _lockForRead = new object();
        _lockForAll = new object();
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return !_illegalToWrite; } }

    public override void Flush() { }
    public override long Length { 
        get { throw new NotSupportedException(); } }
    public override long Position { 
        get { throw new NotSupportedException(); } 
        set { throw new NotSupportedException(); } }
    public override long Seek(long offset, SeekOrigin origin) { 
        throw new NotSupportedException(); }
    public override void SetLength(long value) { 
        throw new NotSupportedException(); }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0 || offset >= buffer.Length) 
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0 || offset + count > buffer.Length) 
            throw new ArgumentOutOfRangeException("count");
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);

        if (count == 0) return 0;

        while (true)
        {
            int handleIndex = WaitHandle.WaitAny(_events);
            lock (_lockForRead)
            {
                lock (_lockForAll)
                {
                    if (_currentChunk == null)
                    {
                        if (_chunks.Count == 0)
                        {
                            if (handleIndex == _doneWritingHandleIndex) 
                                return 0;
                            else continue;
                        }
                        _currentChunk = _chunks.Dequeue();
                        _currentChunkPosition = 0;
                    }
                }

                int bytesAvailable = 
                    _currentChunk.Length - _currentChunkPosition;
                int bytesToCopy;
                if (bytesAvailable > count)
                {
                    bytesToCopy = count;
                    Buffer.BlockCopy(_currentChunk, _currentChunkPosition, 
                        buffer, offset, count);
                    _currentChunkPosition += count;
                }
                else
                {
                    bytesToCopy = bytesAvailable;
                    Buffer.BlockCopy(_currentChunk, _currentChunkPosition, 
                        buffer, offset, bytesToCopy);
                    _currentChunk = null;
                    _currentChunkPosition = 0;
                    lock (_lockForAll)
                    {
                        if (_chunks.Count == 0) _dataAvailable.Reset();
                    }
                }
                return bytesToCopy;
            }
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0 || offset >= buffer.Length) 
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0 || offset + count > buffer.Length) 
            throw new ArgumentOutOfRangeException("count");
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);

        if (count == 0) return;

        byte[] chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        lock (_lockForAll)
        {
            if (_illegalToWrite) 
                throw new InvalidOperationException(
                    "Writing has already been completed.");
            _chunks.Enqueue(chunk);
            _dataAvailable.Set();
        }
    }

    public void SetEndOfStream()
    {
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);
        lock (_lockForAll)
        {
            _illegalToWrite = true;
            _doneWriting.Set();
        }
    }

    public override void Close()
    {
        base.Close();
        if (_dataAvailable != null)
        {
            _dataAvailable.Close();
            _dataAvailable = null;
        }
        if (_doneWriting != null)
        {
            _doneWriting.Close();
            _doneWriting = null;
        }
    }
}

Das Konzept von BlockingStream unterscheidet sich ein wenig von den meisten anderen Datenströmen in .NET Framework. Wie andere Datenströme wird es von System.IO.Stream abgeleitet und setzt dessen gesamte abstrakte Methoden und Eigenschaften außer Kraft. Was jedoch die Position und die Threadsicherheit betrifft, weist das Programm ein Verhalten auf, das ein wenig unorthodox ist. Die meisten Datenströme in .NET Framework sind nicht threadsicher: Mehrere Threads können nicht gleichzeitig sicher auf eine Instanz des Datenstroms zugreifen, und die meisten Datenströme behalten eine einzige Position bei, an der der nächste Lese- oder Schreibvorgang stattfindet. BlockingStream hingegen ist threadsicher und hat in gewisser Weise zwei Positionen inne, obwohl keine der beiden als numerischer Wert für den Benutzer des Typs verfügbar gemacht wird.
BlockingStream funktioniert dank einer internen Warteschlange von Datenpuffern, die in BlockingStream geschrieben werden. Wenn Daten in den Datenstrom geschrieben werden, wird der geschriebene Puffer in die Warteschlange eingereiht. Wenn Daten aus dem Datenstrom gelesen werden, wird ein Puffer in FIFO-Reihenfolge (first in, first out) aus der Warteschlange entfernt, und die Daten in ihm werden an den Aufrufer zurückgegeben. In dieser Hinsicht gibt es eine Position im Datenstrom, bei der der nächste Schreibvorgang erfolgt, sowie eine Position, an der der nächste Lesevorgang stattfindet.
Beachten Sie jedoch, dass nicht jede Leseanforderung unbedingt die Warteschlange ändert. Wenn ein Leser eine Menge von Daten anfordert, die mindestens so groß wie der nächste Puffer in der Warteschlange ist, reicht dieser eine Puffer für die Leseanforderung aus (Read ist erfolgreich, selbst wenn es weniger Daten zurückgibt, als angefordert wurden). Wenn der Benutzer jedoch eine Datenmenge anfordert, die kleiner als der nächste Puffer ist, werden die übrigen Daten des Puffers separat gespeichert (nicht in der Warteschlange), damit sie verwendet werden können, um die nächste Read-Anforderung zu bedienen (siehe _currentChunk in Abbildung 2). Wenn die nächste Read-Anforderung mit dem Rest der vorherigen Read-Anforderung vollständig bedient werden kann, wird die Warteschlange nicht durch die Leseanforderung geändert.
BlockingStream verwendet ein ManualResetEvent, um Verbrauchern zu signalisieren, ob Daten verfügbar sind. Wenn ein Thread versucht, Daten aus BlockingStream zu lesen, und keine Daten für das Lesen verfügbar sind, wird der Thread durch dieses fehlende ManualResetEvent blockiert, bis Daten geschrieben wurden. Wenn Daten geschrieben werden, aktiviert der Schreiber mithilfe des Ereignisses einen Verbraucher und teilt ihm mit, dass Daten zum Lesen verfügbar sind. Wenn es mehrere Leser gibt, die auf Daten warten, werden alle aktiviert, was zusätzliche Logik in der Read-Methode erforderlich macht.
Die Read-Methode wird als eine einzige große Schleife implementiert, die durchlaufen wird, bis entweder genügend Daten verfügbar sind, um die Leseanforderung zu erfüllen, oder bis ein Schreiber SetEndOfStream aufgerufen hat, das den Datenstrom darüber informiert, dass keine weiteren Daten zur Verfügung stehen. In diesem Fall wäre es sinnlos, auf Daten wartende Leser weiterhin zu blockieren, da sie äußerst lange warten würden. Jede dieser Bedingungen wird durch ein ManualResetEvent repräsentiert, und zwar das zuvor erwähnte sowie ein anderes, das angibt, ob das Schreiben abgeschlossen ist. Wenn Read in die Schleife eintritt, wartet es zunächst darauf, dass eines dieser Ereignisse festgelegt wird (mittels WaitHandle.WaitAny).
BlockingStream ist für die Datenstrompipeline nützlich, weil es das gleiche Datenstrom-zu-Datenstrom-Muster ermöglicht, das in der sequenziellen Version implizit verwendet wird, nun aber threadübergreifende Implementierungen unterstützt. Das Ziel besteht darin, Methoden schreiben zu können, die jeden der erforderlichen datenstrombasierten Vorgänge repräsentieren (siehe Abbildung 3).
static void Compress(Stream input, Stream output){
    using (GZipStream compressor = new GZipStream(
           output, CompressionMode.Compress, true))
        CopyStream(input, compressor);
}

static void Encrypt(Stream input, Stream output) {
    RijndaelManaged rijndael = new RijndaelManaged();
    ... // setup crypto keys
    using (ICryptoTransform transform = rijndael.CreateEncryptor())
    using (CryptoStream encryptor = new CryptoStream(
            output, transform, CryptoStreamMode.Write))
        CopyStream(input, encryptor);
}

Jede dieser Methoden kann in separaten Threads ausgeführt werden, wobei die Ausgabe des einen in Form eines BlockingStream als Eingabe für den anderen dient. Anfänglich wird der zweite Vorgang blockiert, während er auf sein Eingabe-BlockingStream wartet. Sobald der erste Vorgang Daten in den Datenstrom schreibt, wird der zweite Vorgang aktiviert, ruft diese Daten ab und verarbeitet sie. Ich habe diese Funktionalität in die StreamPipeline-Klasse codiert, die in Abbildung 4 dargestellt ist.
public class StreamPipeline : IDisposable
{
    private Action<Stream, Stream>[] _filters;
    private List<BlockingStream> _blockingStreams;

    public StreamPipeline(params Action<Stream, Stream>[] filters)
    {
        if (filters == null) throw new ArgumentNullException("filters");
        if (filters.Length == 0 || Array.IndexOf(filters, null) >= 0) 
            throw new ArgumentException("filters");

        _filters = filters;

        _blockingStreams = new List<BlockingStream>(_filters.Length - 1);
        for (int i = 0; i < filters.Length-1; i++)
        {
            _blockingStreams.Add(new BlockingStream());
        }
    }

    public void Run(Stream input, Stream output)
    {
        if (_blockingStreams == null) 
            throw new ObjectDisposedException(GetType().Name);
        if (input == null) throw new ArgumentNullException("input");
        if (!input.CanRead) throw new ArgumentException("input");
        if (output == null) throw new ArgumentNullException("output");
        if (!output.CanWrite) throw new ArgumentException("output");

        ThreadStart lastStage = null;
        for (int i = 0; i < _filters.Length; i++)
        {
            Stream stageInput = i == 0 ? input : _blockingStreams[i - 1];
            Stream stageOutput = 
                i == _filters.Length - 1 ? output : _blockingStreams[i];
            Action<Stream, Stream> filter = _filters[i];
            ThreadStart stage = delegate
            {
                filter(stageInput, stageOutput);
                if (stageOutput is BlockingStream) 
                    ((BlockingStream)stageOutput).SetEndOfStream();
            };
            if (i < _filters.Length - 1)
            {
                Thread t = new Thread(stage);
                t.IsBackground = true;
                t.Start();
            }
            else lastStage = stage;
        }
        lastStage();
    }

    public void Dispose()
    {
        if (_blockingStreams != null)
        {
            foreach (BlockingStream stream in _blockingStreams) 
                stream.Dispose();
            _blockingStreams = null;
        }
    }
}

StreamPipeline ist sehr einfach zu verwenden. Der Konstruktor akzeptiert ein Parameterarray von Delegaten, von denen jeder einen Eingabedatenstrom und einen Ausgabedatenstrom erwartet. Sie werden Filter genannt. Der Konstruktor erstellt und speichert dann ein BlockingStream, das zwischen jedem Paar von Filtern verwendet wird. Beachten Sie, dass die bereits vorgestellten Compress- und die Encrypt-Methoden der erforderlichen Delegatsignatur entsprechen. Daher können sie als Filter verwendet werden (siehe Abbildung 5).
Abbildung 5 StreamPipeline in Aktion (Klicken Sie zum Vergrößern auf das Bild)
Die Run-Methode akzeptiert den Eingabe- und Ausgabedatenstrom, der sich am Anfang und am Ende der Pipeline befinden sollte. In diesem Beispiel könnte dies ein FileStream der Daten auf dem Datenträger sein, die komprimiert und verschlüsselt werden sollen, sowie ein Ausgabe-FileStream, um die sich ergebenden Daten wieder auf dem Datenträger zu speichern:
using (FileStream input = File.OpenRead("inputData.bin"))
using (FileStream output = File.OpenWrite("outputData.bin"))
using (StreamPipeline pipeline = new StreamPipeline(Compress, Encrypt))
{
    pipeline.Run(input, output);
}
Intern generiert die Run-Methode für jede Phase in der Pipeline einen Delegaten, der den geeigneten Filter ausführt, der mit entsprechenden Eingabe- und Ausgabedatenströmen bereitgestellt wird. Sie erstellt neue Threads zum Ausführen der Arbeitsaufgaben, mit Ausnahme der letzten, die sie im Thread durch Aufrufen von Run ausführt. Der aktuelle Thread müsste für das Abschließen der Arbeit ohnehin blockiert werden. Deshalb ist es sinnvoll, den Thread wiederzuverwenden, um einen Teil der Verarbeitung durchzuführen, statt seine Ressourcen zu verschwenden.
Beachten Sie meine Entscheidung, ThreadPool in diesem Fall nicht zu verwenden. Normalerweise ist mein erster Gedanke, Arbeitsaufgaben in die ThreadPool-Warteschlange einzureihen, statt neue Threads zu starten. In diesem Fall kann die so eingereihte Arbeit jedoch blockiert werden, da darauf gewartet wird, dass andere eingereihte Arbeitsaufgaben Vorgänge ausführen. Dies kann zu einem Deadlock führen. (Unter msdn.microsoft.com/msdnmag/issues/04/12/NETMatters finden Sie ein Beispiel für dieses Problem, mit dem Entwickler oft bei .NET Framework 1.x konfrontiert waren.) Diese Situation kann durch Erstellen benutzerdefinierter Threads für jeden Vorgang vermieden werden.
Diese Entscheidung hat selbstverständlich ihre Nachteile. Zum Beispiel ist es aufwändig, Threads zu erstellen und zu entfernen. Wenn die Filter also nur eine kurze Lebensdauer haben, könnte der Aufwand andere Aspekte der Berechnung überschatten. Wenn Sie geplant haben, eine Reihe verschiedener Eingabe- und Ausgabedatenströme durch das gleiche StreamPipeline zu leiten, dürfte es von Vorteil sein, die relevanten Teile der Klasse neu zu schreiben, um die Threads aktiv zu halten und bei jeder Ausführung wiederzuverwenden. In Anbetracht der Einschränkungen Ihrer eigenen Anwendungen kann es außerdem angebrachter sein, ThreadPool zu verwenden.
Das StreamPipeline ist nun erstellt. Die Hauptfrage lautet jetzt: Wie bewährt es sich? Der Zweck dieser Übung ist schließlich, beide Kerne des Computers zu verwenden, um die Rechenzeit zu verringern. In den Tests auf meinem Dual-Core-Computer wurde unter Verwendung der oben erwähnten „Krieg und Frieden“-Daten diese sequenzielle Pipelineimplementierung um 20–25 Prozent schneller als die sequenzielle Version ausgeführt. Enttäuscht? Die Frage, die Sie sich jetzt hoffentlich stellen, lautet: Weshalb wurden die Berechnungen nicht um 100 Prozent schneller ausgeführt? Stimmt etwas mit der Implementierung nicht? Schließlich wird sie jetzt auf zwei Prozessoren ausgeführt. Sollte sie demzufolge nicht auch zweimal so schnell ausgeführt werden?
Ich habe Sie mit meinem Briefbeispiel am Anfang dieses Artikels ein wenig in die Irre geführt. Ich weiß nicht, wie es Ihnen geht, aber es dauert für mich länger, einen Brief in Dritteln zu falten, als den gefalteten Brief in einen Umschlag zu stecken. So gesehen ist es in der menschlichen Pipeline wahrscheinlich, dass die zweite Person, die die Briefe in Umschläge steckt, nach jedem Brief untätig herumsitzen wird, da sie auf den nächsten gefalteten Brief vom Vorgänger in der Pipeline wartet. Deshalb lässt sich die Lösung des Problems nicht linear zur Anzahl der beteiligten Personen skalieren.
Das Gleiche gilt für das Komprimierungs- und Verschlüsselungsbeispiel. Ich habe gemessen, wie viel Zeit in der sequenziellen Implementierung für die Komprimierung bzw. die Verschlüsselung benötigt wurde. Was würden Sie schätzen? Nur 20 Prozent der Zeit wurde für die Verschlüsselung verwendet. Folglich erreicht die Pipelineimplementierung beinahe den maximalen Geschwindigkeitszuwachs, der in dieser Situation möglich ist.
Das soll nicht heißen, dass diese Implementierung optimal ist. Es gibt mehrere mögliche Leistungsprobleme, die zu Tage treten könnten, wenn die Anzahl der Prozessoren erhöht wird. Das offenkundigste Problem besteht darin, dass Sie mindestens einen Filter pro Prozessor benötigen, um die gesamte potentielle Rechenleistung des Computers auch nur theoretisch auszunutzen. Andere mögliche Probleme betreffen die Tatsache, dass mehrere Sperren innerhalb der BlockingStream-Implementierung verwendet werden. (Eine davon ist nicht erforderlich, wenn es zu einem bestimmten Zeitpunkt immer nur einen Leser gibt, der die Instanz verwendet. Dies ist bei der Verwendung von BlockingStream durch StreamPipeline der Fall.) In Anbetracht der relativ kleinen Menge von Code, die geschrieben werden musste, kann jedoch sogar ein 20- bis 25-prozentiger Leistungsgewinn, wie hier mit der Komprimierung und Verschlüsselung vorgeführt, ziemlich wertvoll sein.
Beachten Sie auch, dass Pipelines sehr wirksam sein können, wenn Phasen selbst Daten parallel verarbeiten können. In obigen Briefbeispiel würde sich z. B. der Durchsatz des ganzen Systems erhöhen, wenn mehrere Personen Briefe falten würden (oder wenn die zweite Person der ersten beim Falten helfen würde, wenn sie gerade keine Briefe in Umschläge stecken muss). Im Komprimierungs-/Verschlüsselungsbeispiel entspricht dies dem bereits erwähnten Aufteilungsansatz, sodass die Abschnitte in jener Phase der Pipeline parallel komprimiert und dann in der folgenden Phase parallel verschlüsselt werden und so weiter. Je feiner die Parallelität in der Anwendung abgestimmt ist, desto größer die Wahrscheinlichkeit, dass alle Prozessoren im Computer helfen können, schneller zum Ziel zu gelangen, insbesondere in Anbetracht des technologischen Fortschritts und der zunehmenden Zahl von Prozessoren in durchschnittlichen PCs.

Senden Sie Fragen und Kommentare für Stephen Toub in englischer Sprache an netqa@microsoft.com.


Stephen Toub ist leitender Programmmanager im Parallel Computing Platform-Team von Microsoft. Er schreibt außerdem redaktionelle Beiträge für das MSDN Magazin.

Page view tracker