.NET Matters

Asynchronous Stream Processing

Stephen Toub

Q I've read about some neat tricks you can play to ease the development of asynchronous implementations using C# iterators. (Jeffrey Richter was extolling the virtues of such techniques in a previous issue of MSDN® Magazine.) However, I was trying to figure out how to do similar things without such code transformations, and I was having a very hard time. For example, let's say I want to copy from one stream to another, using only the asynchronous methods on Stream. I understand how to issue the asynchronous requests once (to do one read and one write), but how can I loop such requests continually in order to process the whole stream? Is that even possible?

A Yes, it's possible. I'll show how, using your example, with both C# and Visual Basic® (noting that Visual Basic doesn't provide compiler support for iterators). To set the stage, Figure 1 shows a synchronous implementation of what we want to implement asynchronously, which could be called with code such as this:

Figure 1 Synchronous CopyStreamToStream

public static void CopyStreamToStream(
        Stream source, Stream destination,
        Action<Stream, Stream, Exception> completed) 
{
    byte[] buffer = new byte[0x1000];
    int read;
    try 
    {
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0) 
        {
            destination.Write(buffer, 0, read);
        }
        if (completed != null) completed(source, destination, null);
    }
    catch (Exception exc) 
    {
        if (completed != null) completed(source, destination, exc);
    }
}

FileStream input = ...;
FileStream output = ...;
CopyStreamToStream(input, output, (src,dst,exc) => {
    src.Close();
    dst.Close();
});

With the synchronous implementation, CopyStreamToStream will block until the stream has been copied and the completed callback has been executed. You can easily implement this as an asynchronous invocation in a few ways. One would be to explicitly queue to the ThreadPool a work item that calls this method:

ThreadPool.QueueUserWorkItem(delegate
{
    CopyStreamToStream(input, output, (src,dst,exc) =>
    {
        src.Close();
        dst.Close();
    });
});

Another would be to use asynchronous delegate invocation, which would have largely the same effect:

delegate void CopyStreamToStreamHandler(
    Stream source, Stream destination,
    Action<Stream, Stream, Exception> completed);
...
CopyStreamToStreamHandler handler = CopyStreamToStream;
handler.BeginInvoke(source, destination, (src,dst,exc) =>
{
    src.Close();
    dst.Close();
}, iar => handler.EndInvoke(iar), null);

But neither of those approaches is asynchronous in the way you're looking for. Yes, they're both asynchronous with respect to the thread starting their execution (meaning that the calling thread won't be blocked waiting for their completion); however, in both cases the CopyStreamToStream method is just being run on a thread from the pool, and all of the reads and writes it's issuing are synchronous with regard to the rest of the method call.

If I were to launch CopyStreamToStream asynchronously in this fashion 100 times, 100 threads (possibly reused) would be needed to execute those copies. Not only is that significant overhead, but given the number of threads available in the thread pool and the rate at which the thread count is increased, a significant number of the copies probably wouldn't begin execution until others had finished, increasing the overall time to completion.

What you're looking for is a true asynchronous implementation of CopyStreamToStream, where instead of issuing Read and Write calls on the Streams, you issue BeginRead and BeginWrite calls, with appropriately matched calls to EndRead and EndWrite.

As you can see in the synchronous implementation, you need to issue a read request against the source Stream, take the retrieved data and write it to the destination Stream, and then rinse and repeat. With the asynchronous implementation, rather than using a loop, as I did in the synchronous implementation, I'll use continuation-passing style to package everything that comes after the asynchronous request, and pass that as a closure to be executed when the asynchronous request completes. To begin, I issue an asynchronous read:

byte[] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length, readResult =>
{
    int read = source.EndRead(readResult);
    ... // process read results in buffer here
}, null);

The BeginRead method accepts the same three parameters that the Read method accepts: the target buffer in which to place the data, the index into the buffer specifying where the data will be placed, and the amount of data to be read. The BeginRead method also takes two additional parameters: the first represents the delegate to be called when the read request completes, and the second is any state that I want to be available in the callback. Within the callback, I issue an EndRead call against the source, providing it with the IAsyncResult from the BeginRead call and allowing me to retrieve the result of the read. (EndRead will also throw any exceptions that may have occurred as a result of the read.) After that, I want to write that data out to the destination Stream:

int read = source.EndRead(readResult);
if (read > 0)
{
    destination.BeginWrite(buffer, 0, read, writeResult =>
    {
        destination.EndWrite(writeResult);
        ...
    }, null);
}

Again, rather than calling Write on the destination Stream, I call BeginWrite, passing it the same parameters I would have passed to Write, along with a delegate to be executed when the write request completes (as well as a state argument, which I'm ignoring). So far, so good. I've asynchronously issued a read request, and the thread on which I issued the request will not block waiting for the request to complete. When it does complete, I asynchronously issue a write request and the callback from the read request doesn't block waiting for the write request to complete.

But now I'm in a bind. In the BeginWrite callback, do I code another call to BeginRead in order to read more from the source stream? If so, what do I pass as the callback for that read? I can't explicitly code every read/write pair; there could be an infinite number of them, and that would make for a very long code file.

Instead, a good approach is to house the call to EndRead in a method that can be called through an AsyncCallback delegate. That method will serve as the callback for BeginRead. Then, in the BeginWrite callback, after I end the previous write, I can invoke the method that issues the next BeginRead request. To start, I issue a single call to BeginRead, passing the callback delegate I just described. That gives me an implementation like the following:

AsyncCallback rc = readResult =>
{
    int read = source.EndRead(readResult);
    if (read > 0)
    {
        destination.BeginWrite(buffer, 0, read, writeResult =>
        {
            destination.EndWrite(writeResult);
            source.BeginRead(buffer, 0, buffer.Length, rc, null);
        }
    }
}

All would appear to be well, until you try to compile this, at which point the friendly C# compiler will tell you:

error CS0165: Use of unassigned local variable 'rc'

In other words, I'm trying to use the 'rc' variable within my callback, and I'm initializing the 'rc' variable to my callback, but so as far as the C# compiler is concerned, 'rc' is undefined at the time. (For more details, see Eric Lippert's blog post at go.microsoft.com/fwlink/?LinkId=109263). The simple workaround is to first declare the 'rc' variable and initialize it to null. Then as a separate statement, assign the callback to 'rc':

AsyncCallback rc = null;
rc = readResult =>
{
    ...
};

The full implementation is shown in Figure 2. You'll notice some additional functionality added to this, over and above the transformation from Read/Write to BeginRead/BeginWrite. First, I've added exception handling to the implementation. As with the synchronous implementation, when an exception is caught, the completed delegate is called with the exception to notify the application that something went wrong and to allow the exception to be dealt with appropriately. Second, we're taking advantage of the AsyncOperation class, which was introduced in the Microsoft® .NET Framework 2.0. This class creates an instance of AsyncOperation using the AsyncOperationManager and captures the current SynchronizationContext, allowing me to invoke operations in the context of that SynchronizationContext. (For more information, see msdn.microsoft.com/msdnmag/issues/06/06/NETMatters/#qa7.)

Figure 2 Async CopyStreamToStream in C#

public static void CopyStreamToStream(
    Stream source, Stream destination, 
    Action<Stream,Stream,Exception> completed) {
    byte[] buffer = new byte[0x1000];
    AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(null);

    Action<Exception> done = e => {
        if (completed != null) asyncOp.Post(delegate { 
            completed(source, destination, e); }, null);
    };

    AsyncCallback rc = null;
    rc = readResult => {
        try {
            int read = source.EndRead(readResult);
            if (read > 0) {
                destination.BeginWrite(buffer, 0, read, writeResult => {
                    try {
                        destination.EndWrite(writeResult);
                        source.BeginRead(
                            buffer, 0, buffer.Length, rc, null);
                    }
                    catch (Exception exc) { done(exc); }
                }, null);
            }
            else done(null);
        }
        catch (Exception exc) { done(exc); }
    };

    source.BeginRead(buffer, 0, buffer.Length, rc, null);
}

For example, if CopyStreamToStream were invoked from the GUI thread of a Windows® Forms application, the captured SynchronizationContext would actually be a WindowsFormsSynchronizationContext, and calling Post on the resulting AsyncOperation would cause the delegate provided to execute on the GUI thread. This allows CopyStreamToStream to play nicely within whatever environment it may be running.

Now, at the beginning of my response I promised an asynchronous implementation in both C# and Visual Basic. As you've seen, while I didn't take advantage of iterators in C#, I did take advantage of other C# features, such as anonymous methods. And anonymous methods aren't yet supported in Visual Basic. (Visual Basic 2008 supports lambda expressions, but those are limited to single expressions.) Doing the same core task in Visual Basic is certainly feasible, but it requires more work, as shown in Figure 3. Most of the extra code here is associated with manually creating and passing around the closures that I was getting for free when using C#.

Figure 3 Async CopyStreamToStream in Visual Basic

Public Shared Sub CopyStreamToStream( _
    ByVal source As Stream, ByVal destination As Stream, _
    ByVal completed As Action(Of Stream, Stream, Exception))
    
    Dim buffer(&H1000) As Byte
    Dim data = New CopyData With { _
        .Source = source, .Destination = destination, _
        .Buffer = buffer, .Completed = completed, _
        .AsyncOperation = AsyncOperationManager.CreateOperation(Nothing)}
    source.BeginRead(buffer, 0, buffer.Length, _
        AddressOf ReadCallbackSub, data)
End Sub

Private Shared Sub ReadCallbackSub(ByVal readResult As IAsyncResult)
    Dim copyData = CType(readResult.AsyncState, CopyData)
    Try
        Dim read = copyData.Source.EndRead(readResult)
        If read > 0 Then
            copyData.Destination.BeginWrite( _
                copyData.Buffer, 0, read, _
                AddressOf WriteCallbackSub, copyData)
        Else
            copyData.AsyncOperation.Post( _
                Function() Done(copyData, Nothing), Nothing)
        End If
    Catch ex As Exception
        copyData.AsyncOperation.Post( _
            Function() Done(copyData, ex), Nothing)
    End Try
End Sub

Private Shared Sub WriteCallbackSub(ByVal writeResult As IAsyncResult)
    Dim copyData = CType(writeResult.AsyncState, CopyData)
    Try
        copyData.Destination.EndWrite(writeResult)
        copyData.Source.BeginRead( _
            copyData.Buffer, 0, copyData.Buffer.Length, _
            AddressOf ReadCallbackSub, copyData)
    Catch ex As Exception
        copyData.AsyncOperation.Post( _
            Function() Done(copyData, ex), Nothing)
    End Try
End Sub

Private Shared Function Done( _
        ByVal copyData As CopyData, ByVal ex As Exception) As Boolean
    If copyData.Completed <> Nothing Then
        copyData.Completed(copyData.Source, copyData.Destination, ex)
    End If
    Return True
End Function

Private Class CopyData
    Public Source, Destination As Stream
    Public Buffer() As Byte
    Public AsyncOperation As AsyncOperation
    Public Completed As Action(Of Stream, Stream, Exception)
End Class

Regardless of whether I'm using the C# or Visual Basic implementation, I now have a truly asynchronous implementation that copies all of the data from one stream to another. The benefit of this, however, depends on how the streams themselves implement the asynchronous BeginRead and BeginWrite methods. The implementation of these methods on the base Stream class simply executes Read and Write from the ThreadPool using an asynchronous delegate invocation. So while each read and write will be asynchronous, from a performance perspective this will likely be slower overall than just doing the reads and writes synchronously.

Derived streams can choose to provide their own implementation of BeginRead and BeginWrite, and when they do, it's typically done because the domain for which they're intended has true asynchronous read and write support. For example, FileStream's implementation of BeginRead and BeginWrite provides access to the asynchronous I/O support available in the Windows file system. If you construct a FileStream by passing true for the useAsync constructor parameter, and if the version of Windows you're running supports asynchronous I/O, the FileStream's BeginRead and BeginWrite methods will be truly asynchronous. While data is being read from or written to the disk, no threads in your application will be blocked or utilized for these actions; only when read data is available or when a write has completed will the relevant callbacks be issued and a thread from the ThreadPool be used. This makes it possible to write scalable systems that minimize resource usage.

As you can see, while writing this kind of code by hand can create high-performance applications, doing so is also very tedious and error-prone. Hopefully in the future such constructs will be easier to create in mainstream languages like C# and Visual Basic.

F# has already made great progress down that road with its asynchronous workflows (see blogs.msdn.com/dsyme/archive/2007/10/11/introducing-f-asynchronous-workflows.aspx). There are also several libraries floating around the Web that take advantage of C# iterators to ease the pain of writing asynchronous code. An example of this is the Concurrency and Coordination Runtime (CCR), which you can read about: Concurrent Affairs.

And as you mentioned in your question, Jeffrey Richter has created a more generic AsyncEnumerator implementation, which he started discussing in his column at msdn.microsoft.com/msdnmag/issues/07/11/ConcurrentAffairs.

Send your questions and comments for Stephen to netqa@microsoft.com.

Stephen Toub is a Senior Program Manager Lead on the Parallel Computing Platform team at Microsoft. He is also a Contributing Editor for MSDN Magazine.