MSDN Magazin > Home > Ausgaben > 2008 > March >  .NET-Themen: Asynchrone Datenstromverarbeitung
.NET-Themen
Asynchrone Datenstromverarbeitung
Stephen Toub


F: Ich habe gehört, dass es für die Entwicklung asynchroner Implementierungen mit C#-Iteratoren einige hilfreiche Tricks gibt. (In einer vorangegangenen Ausgabe des MSDN® Magazins ist Jeffrey Richter auf die Vorzüge dieser Vorgehensweisen eingegangen.) Mein Versuch, etwas Ähnliches ohne Codetransformation zu erreichen, hat sich jedoch als äußerst schwierig erwiesen. Beispielsweise wollte ich von einem zu einem anderen Datenstrom kopieren und dabei lediglich die asynchronen Methoden auf dem Datenstrom verwenden. Ich weiß, wie die asynchronen Anforderungen einmal ausgegeben werden (um einmal zu lesen und einmal zu schreiben), doch wie kann ich diese Anforderungen kontinuierlich durchlaufen lassen, um den gesamten Datenstrom zu verarbeiten? Ist das überhaupt möglich?
F: Ich habe gehört, dass es für die Entwicklung asynchroner Implementierungen mit C#-Iteratoren einige hilfreiche Tricks gibt. (In einer vorangegangenen Ausgabe des MSDN® Magazins ist Jeffrey Richter auf die Vorzüge dieser Vorgehensweisen eingegangen.) Mein Versuch, etwas Ähnliches ohne Codetransformation zu erreichen, hat sich jedoch als äußerst schwierig erwiesen. Beispielsweise wollte ich von einem zu einem anderen Datenstrom kopieren und dabei lediglich die asynchronen Methoden auf dem Datenstrom verwenden. Ich weiß, wie die asynchronen Anforderungen einmal ausgegeben werden (um einmal zu lesen und einmal zu schreiben), doch wie kann ich diese Anforderungen kontinuierlich durchlaufen lassen, um den gesamten Datenstrom zu verarbeiten? Ist das überhaupt möglich?

A: Ja, das ist möglich. Ich werde dies anhand Ihres Beispiels sowohl mit C# als auch mit Visual Basic® aufzeigen (Sie sollten dabei jedoch beachten, dass Visual Basic keine Compilerunterstützung für Iteratoren bietet). Abbildung 1 zeigt eine synchrone Implementierung dessen, was asynchron implementiert werden soll und mit folgendem Code aufgerufen werden kann:
A: Ja, das ist möglich. Ich werde dies anhand Ihres Beispiels sowohl mit C# als auch mit Visual Basic® aufzeigen (Sie sollten dabei jedoch beachten, dass Visual Basic keine Compilerunterstützung für Iteratoren bietet). Abbildung 1 zeigt eine synchrone Implementierung dessen, was asynchron implementiert werden soll und mit folgendem Code aufgerufen werden kann:
public static void CopyStreamToStream(
        Stream source, Stream destination,
        Action<Stream, Stream, Exception> completed) 
{
    byte[] buffer = new byte[0x1000];
    int read;
    try 
    {
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0) 
        {
            destination.Write(buffer, 0, read);
        }
        if (completed != null) completed(source, destination, null);
    }
    catch (Exception exc) 
    {
        if (completed != null) completed(source, destination, exc);
    }
}

FileStream input = ...;
FileStream output = ...;
CopyStreamToStream(input, output, (src,dst,exc) => {
    src.Close();
    dst.Close();
});
Bei der synchronen Implementierung blockiert CopyStreamToStream, bis der Datenstrom kopiert und der abgeschlossene Rückruf ausgeführt wurde. Dies kann auf verschiedene Art und Weise problemlos als asynchroner Aufruf implementiert werden. Eine Möglichkeit ist die explizite Einreihung einer Arbeitsaufgabe zu ThreadPool, die diese Methode aufruft:
ThreadPool.QueueUserWorkItem(delegate
{
    CopyStreamToStream(input, output, (src,dst,exc) =>
    {
        src.Close();
        dst.Close();
    });
});
Eine andere besteht in der Verwendung eines asynchronen Delegataufrufs, der im Prinzip die gleiche Wirkung hat:
delegate void CopyStreamToStreamHandler(
    Stream source, Stream destination,
    Action<Stream, Stream, Exception> completed);
...
CopyStreamToStreamHandler handler = CopyStreamToStream;
handler.BeginInvoke(source, destination, (src,dst,exc) =>
{
    src.Close();
    dst.Close();
}, iar => handler.EndInvoke(iar), null);
Aber keiner dieser Ansätze ist auf die Art asynchron, die Sie sich wünschen. In Bezug auf den Thread, der die Ausführung startet, sind zwar beide asynchron (das heißt, der aufrufende Thread ist nicht blockiert, während er auf den Abschluss wartet), doch in beiden Fällen wird die CopyStreamToStream-Methode nur auf einem Thread aus dem Pool ausgeführt, und alle von ihm ausgegebenen Lese- und Schreibvorgänge sind in Bezug auf den verbleibenden Methodenaufruf synchron.
Wenn CopyStreamToStream auf diese Art 100-mal asynchron gestartet würde, wären 100 Threads erforderlich (möglicherweise wiederverwendet), um diese Kopien auszuführen. Das bedeutet nicht nur enormen Aufwand, sondern angesichts der im Threadpool vorhandenen Anzahl von Threads und der Geschwindigkeit, mit der die Threadzahl erhöht wird, würde eine große Zahl der Kopien wahrscheinlich erst dann ausgeführt, wenn andere beendet sind. Dadurch erhöht sich die Gesamtausführungszeit.
Sie sind auf der Suche nach einer wahren asynchronen Implementierung von CopyStreamToStream, die keine Lese- und Schreibaufrufe an die Datenströme ausgibt, sondern BeginRead und BeginWrite mit entsprechend angepassten Aufrufen für EndRead und EndWrite ausgibt.
Wie Sie in der synchronen Implementierung sehen, müssen Sie eine Leseanforderung an den Quelldatenstrom ausgeben, die abgerufenen Daten an den Zieldatenstrom schreiben und das Ganze wiederholen. Bei der asynchronen Implementierung wird anstatt einer Schleife, wie in der synchronen Implementierung, das fortlaufende Weitergeben von Formaten verwendet, um all das zu verpacken, was nach der asynchronen Anforderung kommt, und es als Closure zur Ausführung weiterzugeben, wenn die asynchrone Anforderung beendet ist. Zunächst wird dazu eine asynchrone Leseanweisung ausgegeben:
byte[] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length, readResult =>
{
    int read = source.EndRead(readResult);
    ... // process read results in buffer here
}, null);
Die BeginRead-Methode akzeptiert dieselben drei Parameter, die von der Lesemethode akzeptiert werden: den Zielpuffer, in den die Daten platziert werden sollen, den Index im Puffer, der den Ort angibt, an dem die Daten platziert werden, und die Menge der zu lesenden Daten. Die BeginRead-Methode erfordert auch zwei zusätzliche Parameter: Der erste ist der Delegat, der aufgerufen wird, wenn die Leseanforderung abgeschlossen ist, und der zweite ist ein Zustand, der im Rückruf verfügbar sein soll. Innerhalb des Rückrufs wird ein EndRead-Aufruf für die Quelle ausgegeben, indem IAsyncResult des BeginRead-Aufrufs zur Verfügung gestellt wird, wodurch das Ergebnis des Lesevorgangs abgerufen werden kann. (EndRead generiert außerdem Ausnahmen, die möglicherweise infolge des Lesevorgangs aufgetreten sind.) Danach sollen diese Daten an den Zieldatenstrom geschrieben werden:
int read = source.EndRead(readResult);
if (read > 0)
{
    destination.BeginWrite(buffer, 0, read, writeResult =>
    {
        destination.EndWrite(writeResult);
        ...
    }, null);
}
Anstatt den Schreibvorgang auf dem Zieldatenstrom aufzurufen, wird auch hier BeginWrite aufgerufen, und es werden die gleichen Parameter übergeben, die Sie an den Schreibvorgang übergeben hätten, zusammen mit dem Delegaten, der ausgeführt werden soll, wenn die Schreibanforderung beendet ist (sowie ein Statusargument, das Sie ignorieren). So weit, so gut. Sie haben eine Leseanforderung asynchron ausgegeben, und der Thread, für den Sie die Anforderung ausgegeben haben, blockiert nicht, während er auf die Beendigung der Anforderung wartet. Wenn sie abgeschlossen ist, geben Sie eine Schreibanforderung asynchron aus, und der Rückruf von der Leseanforderung blockiert nicht, während er darauf wartet, dass die Anforderung beendet wird.
Aber jetzt sind Sie in einer Klemme. Codieren Sie im BeginWrite-Rückruf einen anderen Aufruf an BeginRead, um mehr vom Quelldatenstrom zu lesen? Wenn ja, was übergeben Sie als Rückruf für diesen Lesevorgang? Es kann nicht jedes Schreib-/Lesepaar codiert werden, da möglicherweise eine unendliche Anzahl vorliegt, wodurch die Codedatei sehr lang würde.
Stattdessen besteht ein guter Ansatz darin, den Aufruf an EndRead in einer Methode unterzubringen, die durch einen AsyncCallback-Delegaten aufgerufen werden kann. Diese Methode dient als Rückruf für BeginRead. Nach Beendigung des vorherigen Schreibvorgangs können Sie im BeginWrite-Rückruf die Methode aufrufen, die die nächste BeginRead-Anforderung ausgibt. Zum Starten geben Sie einen einzelnen Aufruf an BeginRead aus, indem Sie den gerade beschriebenen Rückrufdelegaten übergeben. Dadurch ergibt sich folgende Implementierung:
AsyncCallback rc = readResult =>
{
    int read = source.EndRead(readResult);
    if (read > 0)
    {
        destination.BeginWrite(buffer, 0, read, writeResult =>
        {
            destination.EndWrite(writeResult);
            source.BeginRead(buffer, 0, buffer.Length, rc, null);
        }
    }
}
source.BeginRead(buffer, 0, buffer.Length, rc, null);
Alles scheint in Ordnung zu sein, bis Sie mit der Kompilierung beginnen. An diesem Punkt wird vom C#-Compiler folgende Meldung ausgegeben:
error CS0165: Use of unassigned local variable 'rc'
Sie versuchen also, die Variable „rc“ innerhalb des Rückrufs zu verwenden, und initialisieren die Variable „rc“ zum Rückruf. Für den C#-Compiler ist „rc“ zu diesem Zeitpunkt jedoch noch undefiniert. (Weitere Informationen finden Sie in Eric Lipperts Blogbeitrag unter go.microsoft.com/fwlink/?LinkId=109263.) Die einfache Lösung besteht darin, zunächst die Variable „rc“ zu deklarieren und mit Null zu initialisieren. Weisen Sie „rc“ den Rückruf dann als separate Anweisung zu:
AsyncCallback rc = null;
rc = readResult =>
{
    ...
};
Die komplette Implementierung ist in Abbildung 2 dargestellt. Sie werden die zusätzliche Funktionalität bemerken, die neben der Transformation von Read/Write zu BeginRead/BeginWrite hinzugefügt wurde. Zuerst wurde der Implementierung Ausnahmebehandlung hinzugefügt. Wird wie bei der synchronen Implementierung eine Ausnahme gefunden, wird der beendete Delegat mit der Ausnahme aufgerufen. Somit erfährt die Anwendung, dass ein Fehler aufgetreten ist, und die Ausnahme kann entsprechend behandelt werden. Zweitens wird die AsyncOperation-Klasse genutzt, die in Microsoft® .NET Framework 2.0 eingeführt wurde. Diese Klasse erzeugt unter Verwendung von AsyncOperationManager eine Instanz von AsyncOperation und erfasst den aktuellen SynchronizationContext, wodurch Vorgänge im Kontext dieses SynchronizationContext aufgerufen werden können. (Weitere Informationen finden Sie unter msdn.microsoft.com/msdnmag/issues/06/06/NETMatters/#qa7.)
public static void CopyStreamToStream(
    Stream source, Stream destination, 
    Action<Stream,Stream,Exception> completed) {
    byte[] buffer = new byte[0x1000];
    AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(null);

    Action<Exception> done = e => {
        if (completed != null) asyncOp.Post(delegate { 
            completed(source, destination, e); }, null);
    };

    AsyncCallback rc = null;
    rc = readResult => {
        try {
            int read = source.EndRead(readResult);
            if (read > 0) {
                destination.BeginWrite(buffer, 0, read, writeResult => {
                    try {
                        destination.EndWrite(writeResult);
                        source.BeginRead(
                            buffer, 0, buffer.Length, rc, null);
                    }
                    catch (Exception exc) { done(exc); }
                }, null);
            }
            else done(null);
        }
        catch (Exception exc) { done(exc); }
    };

    source.BeginRead(buffer, 0, buffer.Length, rc, null);
}

Wird CopyStreamToStream zum Beispiel vom Thread der grafischen Benutzeroberfläche einer Windows® Forms-Anwendung aufgerufen, wäre der aufgezeichnete SynchronizationContext tatsächlich ein WindowsFormsSynchronizationContext, und das Senden der daraus resultierenden AsyncOperation führt dazu, dass der Delegat auf dem Thread der grafischen Benutzeroberfläche ausgeführt wird. Dadurch kann CopyStreamToStream problemlos in jeder beliebigen Umgebung ausgeführt werden.
Zu Beginn der Antwort wurde eine asynchrone Implementierung sowohl in C# als auch in Visual Basic versprochen. Wie Sie gesehen haben, wurden keine Iteratoren in C# genutzt. Stattdessen wurden andere Features in C# verwendet, beispielsweise anonyme Methoden. Anonyme Methoden werden in Visual Basic allerdings noch nicht unterstützt. (Visual Basic 2008 unterstützt Lambda-Ausdrücke, die jedoch auf einzelne Ausdrücke begrenzt sind.) Die Ausführung derselben Aufgabe in Visual Basic ist sicher möglich, sie erfordert jedoch einen größeren Aufwand, wie in Abbildung 3 zu sehen ist. Der Großteil des zusätzlichen Codes erfordert eine manuelle Erstellung und Übergabe der Closures, die in C# frei verfügbar waren.
Public Shared Sub CopyStreamToStream( _
    ByVal source As Stream, ByVal destination As Stream, _
    ByVal completed As Action(Of Stream, Stream, Exception))
    
    Dim buffer(&H1000) As Byte
    Dim data = New CopyData With { _
        .Source = source, .Destination = destination, _
        .Buffer = buffer, .Completed = completed, _
        .AsyncOperation = AsyncOperationManager.CreateOperation(Nothing)}
    source.BeginRead(buffer, 0, buffer.Length, _
        AddressOf ReadCallbackSub, data)
End Sub

Private Shared Sub ReadCallbackSub(ByVal readResult As IAsyncResult)
    Dim copyData = CType(readResult.AsyncState, CopyData)
    Try
        Dim read = copyData.Source.EndRead(readResult)
        If read > 0 Then
            copyData.Destination.BeginWrite( _
                copyData.Buffer, 0, read, _
                AddressOf WriteCallbackSub, copyData)
        Else
            copyData.AsyncOperation.Post( _
                Function() Done(copyData, Nothing), Nothing)
        End If
    Catch ex As Exception
        copyData.AsyncOperation.Post( _
            Function() Done(copyData, ex), Nothing)
    End Try
End Sub

Private Shared Sub WriteCallbackSub(ByVal writeResult As IAsyncResult)
    Dim copyData = CType(writeResult.AsyncState, CopyData)
    Try
        copyData.Destination.EndWrite(writeResult)
        copyData.Source.BeginRead( _
            copyData.Buffer, 0, copyData.Buffer.Length, _
            AddressOf ReadCallbackSub, copyData)
    Catch ex As Exception
        copyData.AsyncOperation.Post( _
            Function() Done(copyData, ex), Nothing)
    End Try
End Sub

Private Shared Function Done( _
        ByVal copyData As CopyData, ByVal ex As Exception) As Boolean
    If copyData.Completed <> Nothing Then
        copyData.Completed(copyData.Source, copyData.Destination, ex)
    End If
    Return True
End Function

Private Class CopyData
    Public Source, Destination As Stream
    Public Buffer() As Byte
    Public AsyncOperation As AsyncOperation
    Public Completed As Action(Of Stream, Stream, Exception)
End Class

Unabhängig davon, ob die Implementierung in C# oder Visual Basic verwendet wird, liegt jetzt eine wahre asynchrone Implementierung vor, die alle Daten von einem Datenstrom in einen anderen kopiert. Der Vorteil dieser Vorgehensweise ist jedoch davon abhängig, wie die asynchronen BeginRead- und BeginWrite-Methoden von den Datenströmen selbst implementiert werden. Die Implementierung dieser Methoden auf der Streambasisklasse führt einfach Read und Write aus dem ThreadPool unter Verwendung eines asynchronen Delegataufrufs aus. Jeder Lese- und Schreibvorgang ist somit asynchron, wird aber in Bezug auf die Leistung wahrscheinlich insgesamt langsamer sein, als eine synchrone Ausführung der Lese- und Schreibvorgänge.
Abgeleitete Datenströme können ihre eigene Implementierung von BeginRead und BeginWrite bereitstellen. Dies liegt normalerweise daran, dass die Domäne, für die sie vorgesehen sind, über wahre asynchrone Lese-und Schreibzugriffsunterstützung verfügt. So bietet zum Beispiel die Implementierung von FileStream von BeginRead und BeginWrite Zugriff auf die asynchrone E/A-Unterstützung, die im Windows-Dateisystem verfügbar ist. Wenn Sie ein FileStream erstellen, indem Sie den useAsync-Konstruktorparameter als wahr übergeben, und wenn die verwendete Windows-Version asynchrones E/A unterstützt, sind die BeginRead- und BeginWrite-Methoden von FileStream wahrhaft asynchron. Während des Lesens von Daten vom oder Schreibens auf den Datenträger werden keine Threads in Ihrer Anwendung blockiert oder für diese Aktionen verwendet. Nur wenn gelesene Daten zur Verfügung stehen oder ein Schreibvorgang beendet wurde, werden die relevanten Rückrufe ausgegeben und ein Thread aus dem ThreadPool verwendet. Dadurch können skalierbare Systeme geschrieben werden, die die Ressourcenverwendung minimieren.
Wie Sie sehen, kann das Schreiben dieser Art von Code von Hand leistungsstarke Anwendungen schaffen, doch ist es mühsam und fehleranfällig. In Zukunft können solche Konstrukte hoffentlich leichter in den Hauptsprachen wie C# und Visual Basic erstellt werden.
In F# wurden mit asynchronen Workflows in dieser Hinsicht bereits große Fortschritte erzielt (weitere Informationen finden Sie unter blogs.msdn.com/dsyme/archive/2007/10/11/introducing-f-asynchronous-workflows.aspx). Außerdem gibt es im Internet zahlreiche Bibliotheken, die C#-Iteratoren nutzen und das Schreiben asynchronen Codes erleichtern. Ein Beispiel dafür ist Concurrency und Coordination Runtime (CCR). Weitere Informationen hierzu finden Sie unter msdn.microsoft.com/msdnmag/issues/06/09/ConcurrentAffairs.
Wie Sie in Ihrer Frage erwähnten, hat Jeffrey Richter eine generischere AsyncEnumerator-Implementierung erstellt, die in seiner Kolumne unter msdn.microsoft.com/msdnmag/issues/07/11/ConcurrentAffairs ansatzweise erläutert wird.

Senden Sie Fragen und Kommentare für Stephen Toub in englischer Sprache an netqa@microsoft.com.


Stephen Toub ist leitender Programmmanager im Parallel Computing Platform-Team von Microsoft. Er schreibt außerdem redaktionelle Beiträge für das MSDN Magazin.

Page view tracker