Asynchronous Methods

This chapter is excerpted from C# 3.0 in a Nutshell, Third Edition: A Desktop Quick Reference by Joseph Albahari, Ben Albahari, published by O'Reilly Media

In Chapter 19, Threading, we saw how a thread provides a parallel execution path. We took for granted that whenever you needed to run something in parallel, you could assign a new or pooled thread to the job. Although this usually holds true, there are exceptions. Suppose you were writing a TCP sockets or web server application that needed to process 1,000 concurrent requests. If you dedicated a thread to each incoming request, you would consume about a gigabyte of memory purely on thread overhead (each thread typically reserves around 1 MB for its stack).

Asynchronous methods address this problem through a pattern by which many concurrent activities are handled by a few pooled threads. This makes it possible to write applications that are both highly concurrent and highly thread-efficient.

To avoid getting lost, you'll need to be familiar with threading (Chapter 19, Threading) and streams (Chapter 13, Streams and I/O).

Why Asynchronous Methods Exist

The problem just described might be insoluble if every thread needed to be busy all of the time. But this is not the case: fetching a web page, for instance, might take up to several seconds from start to end (because of a potentially slow connection) and yet consume only a fraction of a millisecond of CPU time in total. Processing an HTTP request is not computationally intensive.

This means that a thread dedicated to processing a single web request might spend 99 percent of its time blocked, which represents a huge potential economy. The asynchronous method pattern exploits just this potential, allowing a handful of fully utilized threads to take on thousands of concurrent jobs.

Tip

If you don't need high concurrency, avoid asynchronous methods; they will unnecessarily complicate your program. Further, they are not guaranteed to execute in parallel with the caller. If you need parallel execution, consider using asynchronous delegates or BackgroundWorker, or simply starting a new thread.

An asynchronous method aims never to block any thread, instead using a pattern of returning with a callback. Blocking means entering a WaitSleepJoin state (or causing another thread to do the same), "wasting" a precious thread resource. To achieve this, an asynchronous method must abstain from calling any blocking method.

A method that takes a while to execute because it performs computationally intensive work does not violate the pattern. The purpose of asynchronous methods isn't to provide a convenient mechanism for executing a method in parallel with the caller; it's to optimize thread resources. Here's the golden rule that asynchronous methods follow:

Make good use of the CPU, or exit with a callback!

This means an asynchronous method such as BeginRead may not return immediately to the caller. It can make the caller wait as long as it likes, provided it is making good use of the processor or another constrained resource. It can even finish the entire task synchronously, provided it never blocks and never causes another thread to do the same.

Tip

There is an exception to the nonblocking rule. It's generally okay to block while calling a database server if other threads are competing for the same server. This is because in a highly concurrent system, the database must be designed such that the majority of queries execute extremely quickly. If you end up with thousands of concurrent queries, it means that requests are hitting the database server faster than it can process them. The thread pool is then the least of your worries!

The primary use for asynchronous methods is in dealing with potentially slow network connections.

Asynchronous Method Signatures

Asynchronous methods, by convention, all start with "Begin", have a pairing method starting with "End", and have signatures like those of asynchronous delegates:

IAsyncResult BeginXXX (in/ref-args, AsyncCallback callback, object state);
return-type EndXXX (out/ref-args, IAsyncResult asyncResult);

Here's an example from NetworkStream:

public IAsyncResult BeginRead (byte[] buffer, int offset, int size,
                               AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);

The Begin method returns an IAsyncResult object:

public interface IAsyncResult
{
  object AsyncState { get; }            // "state" object passed to Begin.
  WaitHandle AsyncWaitHandle { get; }   // Allows caller to wait it out.
  bool CompletedSynchronously { get; }  // Did it complete on BeginX?
  bool IsCompleted { get; }             // Has it completed yet?
}

This same IAsyncResult object is passed to the completion callback. Here's its delegate:

public delegate void AsyncCallback (IAsyncResult ar);

As with asynchronous delegates, the EndXXX method allows the return value to be retrieved, as well as any out/ref arguments. This is also where exceptions are rethrown.

Tip

If you fail to call the EndXXX method, exceptions won't get rethrown, meaning silent failure.

To avoid blocking, you will nearly always call the EndXXX method from inside the callback method. Callbacks always run on pooled threads.
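
The calling convention described above can be sketched over a loopback connection. This is a minimal illustration rather than code from the chapter's download: the class name BeginEndDemo, the method ReadOnce, and the 16-byte buffer are assumptions made for the example. It calls BeginRead on a NetworkStream, calls EndRead from inside the callback, and captures any exception so that it is not lost on the pooled thread.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Hypothetical demo class; not part of the .NET Framework or the book's samples.
public static class BeginEndDemo
{
  // Sends three bytes over a loopback connection and returns the number
  // of bytes that EndRead reported for the first asynchronous read.
  public static int ReadOnce()
  {
    TcpListener listener = new TcpListener (IPAddress.Loopback, 0);  // port 0 = any free port
    listener.Start();
    int port = ((IPEndPoint) listener.LocalEndpoint).Port;

    using (TcpClient sender = new TcpClient())
    {
      sender.Connect (IPAddress.Loopback, port);
      using (TcpClient receiver = listener.AcceptTcpClient())
      {
        listener.Stop();
        NetworkStream input = receiver.GetStream();
        byte[] buffer = new byte [16];
        int bytesRead = -1;
        Exception error = null;
        ManualResetEvent done = new ManualResetEvent (false);

        // Begin the read; the callback runs once data has arrived.
        input.BeginRead (buffer, 0, buffer.Length, delegate (IAsyncResult r)
        {
          try { bytesRead = input.EndRead (r); }    // EndRead rethrows any I/O error
          catch (Exception ex) { error = ex; }
          finally { done.Set(); }
        }, null);

        sender.GetStream().Write (new byte[] { 1, 2, 3 }, 0, 3);

        done.WaitOne();    // For the demo only; a server would not wait here.
        if (error != null) throw error;
        return bytesRead;
      }
    }
  }
}
```

BeginEndDemo.ReadOnce() returns however many of the three bytes arrived in the first read. The final WaitOne exists only so that the demonstration can hand back a value; a real server would continue its work from inside the callback instead.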

Asynchronous Methods Versus Asynchronous Delegates

Asynchronous method signatures look exactly like those of asynchronous delegates. Their behavior, however, is very different:

Asynchronous methods                                   | Asynchronous delegates
Rarely or never block any thread.                      | May block for any length of time.
Begin method may not return immediately to the caller. | BeginInvoke returns immediately to the caller.
An agreed protocol with no C# language support.        | Built-in compiler support.

The purpose of asynchronous methods is to allow many tasks to run on few threads; the purpose of asynchronous delegates is to execute a task in parallel with the caller.

You can use an asynchronous delegate to call an asynchronous method, so that execution is guaranteed to return immediately to the caller, while still following the nonblocking asynchronous method model. If you use an asynchronous delegate to call a blocking method, however, you're back to square one: the server will either suffer limited concurrency or need thousands of threads to do its job.

Using Asynchronous Methods

Let's write a simple TCP sockets server that behaves as follows:

  1. It waits for a client request.

  2. It reads a 5,000-byte fixed-length message.

  3. It reverses the bytes in the message, and then returns them to the client.

Let's first write this using a standard multithreaded blocking pattern. Here is the code, exception handling aside:

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

public class Server
{
  public void Serve (IPAddress address, int port)
 {
    ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 19
    TcpListener listener = new TcpListener (address, port);
    listener.Start(  );
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient(  );
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream(  ))
    {
      byte[] data = new byte [5000];

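      int bytesRead = 0; int chunkSize = 1;
      // Read may return fewer bytes than requested, so loop until the
      // buffer is full or the client closes the connection (chunkSize == 0).
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead += chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS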

      Array.Reverse (data);
      n.Write (data, 0, data.Length);                      // BLOCKS
    }
  }
}

Tip

You can download a Visual Studio project containing all the code in this chapter, along with a client test harness, at http://www.albahari.com/nutshell/async.zip.

Our use of the thread pool prevents an arbitrarily large number of threads from being created (possibly taking down the server) and eliminates the time wasted in creating a new thread per request. Our program is simple and fast, but it is limited to 50 concurrent requests.

To scale to 1,000 concurrent requests without increasing the thread count, we must employ the asynchronous method pattern. This means avoiding the blocking I/O methods altogether and instead calling their asynchronous counterparts. Here's how to do it:

class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start(  );
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient(  );
      ThreadPool.QueueUserWorkItem (ReverseEcho, c);
    }
  }

  void ReverseEcho (object client)
  {
    new ReverseEcho(  ).Begin ((TcpClient)client);
  }
}

class ReverseEcho
{
  volatile TcpClient     _client;
  volatile NetworkStream _stream;
  byte[]                 _data = new byte [5000];
  volatile int           _bytesRead = 0;

  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream(  );
      Read(  );
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void Read(  )            // Read in a nonblocking fashion.
  {
    _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                       ReadCallback, null);
  }

  void ReadCallback (IAsyncResult r)
  {
    try
    {
      int chunkSize = _stream.EndRead (r);
      _bytesRead += chunkSize;
      if (chunkSize > 0 && _bytesRead < _data.Length)
      {
        Read(  );       // More data to read!
        return;
      }
      Array.Reverse (_data);
      _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); return; }  // Already cleaned up.
    Cleanup(  );
  }

  void ProcessException (Exception ex)
  {
    Cleanup(  );
    Console.WriteLine ("Error: " + ex.Message);
  }

  void Cleanup(  )
  {
    if (_stream != null) _stream.Close(  );
    if (_client != null) _client.Close(  );
  }
}

On a 3 GHz processor, this program handles 1,000 concurrent requests on fewer than 10 pooled threads (and in less than one second).

Each client request is processed without calling any blocking methods. The one exception is in the Serve method: listener.AcceptTcpClient blocks while there are no pending client requests. One blocked thread doesn't hurt performance and is also unavoidable (if we instead called BeginAcceptTcpClient the while loop would spin, rapidly enqueuing millions of tasks).

The ReverseEcho class encapsulates a request's state for its lifetime. We can no longer use local variables for this job, because the execution stack disappears each time we exit (after each call to an asynchronous method). This also complicates cleanup and means that a simple using statement is no longer suitable for closing our TcpClient and stream.

Another complicating factor is that we can't use types such as BinaryReader and BinaryWriter, because they don't offer asynchronous versions of their methods. The asynchronous pattern often forces you to work at a lower level than you might otherwise.
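
To exercise either server above, a client needs to send exactly 5,000 bytes and then read the 5,000-byte reply. The following is a minimal sketch of such a client, not the test harness from the chapter's download: the class name ReverseEchoClient, the method Exchange, and the constant MessageSize are hypothetical. It uses ordinary blocking calls, which is reasonable for a test client that handles one exchange at a time.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

// Hypothetical test client; the names here are illustrative only.
public static class ReverseEchoClient
{
  public const int MessageSize = 5000;   // Matches the server's fixed message length.

  // Sends one fixed-length message and returns the server's reply.
  public static byte[] Exchange (IPAddress address, int port, byte[] message)
  {
    if (message.Length != MessageSize)
      throw new ArgumentException ("Message must be exactly 5,000 bytes.");

    using (TcpClient client = new TcpClient())
    {
      client.Connect (address, port);
      NetworkStream n = client.GetStream();
      n.Write (message, 0, message.Length);

      // Read may return fewer bytes than requested, so loop until the
      // whole reply has arrived.
      byte[] reply = new byte [MessageSize];
      int bytesRead = 0;
      while (bytesRead < reply.Length)
      {
        int chunkSize = n.Read (reply, bytesRead, reply.Length - bytesRead);
        if (chunkSize == 0)
          throw new EndOfStreamException ("Server closed the connection early.");
        bytesRead += chunkSize;
      }
      return reply;
    }
  }
}
```

Against a server that follows the three steps listed earlier, the reply should be the original message with its bytes in reverse order. To approximate the 1,000-request load discussed above, you could call Exchange from many threads at once.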

Writing Asynchronous Methods

Returning to our previous example, suppose that the 5,000-byte exchange was just a small part of a more sophisticated communication protocol. It would be nice to turn what we've already written into a method like this:

public byte[] ReverseEcho (TcpClient client);

The problem, of course, is that this method signature is synchronous; we need to offer an asynchronous version (in other words, BeginReverseEcho). Further, if an exception is encountered, it's no good writing it to the Console; we need to throw it back to the consumer at some point. So, to usefully partake in the pattern, we must also offer EndReverseEcho and write a class that implements IAsyncResult.

Our ReverseEcho class is an excellent candidate for IAsyncResult, since it already encapsulates the operation's state. All we really need to add is some plumbing code to rethrow any exception upon calling EndReverseEcho, and a wait handle to signal at completion.

Tip

There's no significantly shorter way to achieve this, other than cutting out the whitespace. Asynchronous methods are hard work!

Here's a real-world example, complete with exception handling and thread safety:

// This sample can be downloaded at http://www.albahari.com/nutshell/

public class MessagingServices
{
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    var re = new ReverseEcho(  );
    re.Begin (client, callback, userState);
    return re;
  }

  public static byte[] EndReverseEcho (IAsyncResult r)
  {
    return ((ReverseEcho)r).End(  );
  }
}

class ReverseEcho : IAsyncResult
{
  volatile TcpClient     _client;
  volatile NetworkStream _stream;
  volatile object        _userState;
  volatile AsyncCallback _callback;
  ManualResetEvent       _waitHandle = new ManualResetEvent (false);
  volatile int           _bytesRead = 0;
  byte[]                 _data = new byte [5000];
  volatile Exception     _exception;

  internal ReverseEcho(  ) { }

  // IAsyncResult members:

  public object AsyncState           { get { return _userState;  } }
  public WaitHandle AsyncWaitHandle  { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false;       } }
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0, false); }
  }

  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _callback = callback;
    _userState = state;
    try
    {
      _stream = _client.GetStream(  );
      Read(  );
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  internal byte[] End(  )     // Wait for completion + rethrow any error.
  {
    AsyncWaitHandle.WaitOne(  );
    AsyncWaitHandle.Close(  );
    if (_exception != null) throw _exception;
    return _data;
  }

  void Read(  )   // This is always called from an exception-handled method
  {
    _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                       ReadCallback, null);
  }

  void ReadCallback (IAsyncResult r)
  {
    try
    {
      int chunkSize = _stream.EndRead (r);
      _bytesRead += chunkSize;
      if (chunkSize > 0 && _bytesRead < _data.Length)
      {
        Read(  );       // More data to read!
        return;
      }
      Array.Reverse (_data);
      _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); return; }
    Cleanup(  );
  }

  void ProcessException (Exception ex)
  {
    _exception = ex;   // This exception will get rethrown when
    Cleanup();         // the consumer calls the End(  ) method.
  }

  void Cleanup(  )
  {
    try
    {
      if (_stream != null) _stream.Close(  );
    }
    catch (Exception ex)
    {
      if (_exception == null) _exception = ex;   // Keep any earlier error.
    }
    // Signal that we're done and fire the callback.
    _waitHandle.Set(  );
    if (_callback != null) _callback (this);
  }
}

In Cleanup, we closed _stream but not _client, because the caller may want to continue using _client after performing the reverse echo.

Warning

When writing asynchronous methods, you must meticulously catch all exceptions, saving the exception object so that it can be rethrown when the consumer calls the EndXXX method.
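
The exception-saving plumbing is the same for every asynchronous method you write, so it can be factored into a small reusable class. The following is one possible sketch, not something the .NET Framework provides: the name SimpleAsyncResult and its Complete, Fail, and End members are hypothetical. An operation calls Complete or Fail exactly once when it finishes, and the corresponding EndXXX method simply calls End.

```csharp
using System;
using System.Threading;

// Hypothetical helper: an IAsyncResult that stores either a result or an
// exception, and rethrows the exception when the consumer calls End.
public class SimpleAsyncResult<T> : IAsyncResult
{
  readonly AsyncCallback _callback;
  readonly object _state;
  readonly ManualResetEvent _waitHandle = new ManualResetEvent (false);
  T _result;
  volatile Exception _exception;

  public SimpleAsyncResult (AsyncCallback callback, object state)
  {
    _callback = callback;
    _state = state;
  }

  // IAsyncResult members:
  public object AsyncState           { get { return _state;      } }
  public WaitHandle AsyncWaitHandle  { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false;       } }
  public bool IsCompleted            { get { return _waitHandle.WaitOne (0); } }

  // The operation calls exactly one of these when it finishes.
  public void Complete (T result)    { _result = result; Finish(); }
  public void Fail (Exception ex)    { _exception = ex;  Finish(); }

  void Finish()
  {
    _waitHandle.Set();                          // Signal completion first,
    if (_callback != null) _callback (this);    // then fire the callback.
  }

  // An EndXXX method delegates here: wait, then rethrow or return.
  public T End()
  {
    _waitHandle.WaitOne();
    if (_exception != null) throw _exception;
    return _result;
  }
}
```

With a helper along these lines, every catch block in an asynchronous method reduces to a call to Fail, and the consumer sees the saved exception when it calls the End method. Unlike the ReverseEcho class above, this sketch leaves the wait handle open after End, so End can safely be called more than once.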

Fake Asynchronous Methods

In general, any Framework method that starts with "Begin" and that returns an IAsyncResult follows the asynchronous pattern. There are, however, some exceptions, based on the Stream class:

BufferedStream
CryptoStream
DeflateStream
MemoryStream

These types rely on fallback asynchronous implementations in the base Stream class, which offer no nonblocking guarantees. Instead, they use an asynchronous delegate to call a blocking method such as Read or Write. Although this approach is perfectly valid in the case of MemoryStream (it never blocks in the first place, so it is excused), it creates a problem with BufferedStream and CryptoStream if they wrap anything other than a MemoryStream. In other words, if you call BeginRead or BeginWrite on a CryptoStream that wraps a NetworkStream, some thread is going to block at some point, violating the scalability of the asynchronous method pattern. This is a shame, because the CryptoStream's decorator architecture is otherwise efficient.

A workaround with CryptoStream is to first read the underlying stream asynchronously into a MemoryStream, and then have the CryptoStream wrap the MemoryStream. This means reading the whole stream into memory, though, which on a highly concurrent server is not great for scalability, either. If you really need asynchronous encryption, a solution is to work at a lower level than CryptoStream, namely with ICryptoTransform. You can see exactly how CryptoStream uses ICryptoTransform to do its work with a disassembly tool such as Lutz Roeder's Reflector.
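
The MemoryStream workaround might look something like the following sketch. The class AsyncBufferer, its Start and Decrypt methods, and the 4,096-byte chunk size are assumptions made for illustration. Start drains the source with BeginRead and EndRead, and Decrypt then wraps the in-memory copy in a CryptoStream, so the blocking Read calls touch only memory.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;

// Hypothetical helper: copies a source stream into memory using the
// asynchronous method pattern, then lets a CryptoStream wrap the copy.
public class AsyncBufferer
{
  readonly Stream _source;
  readonly MemoryStream _buffered = new MemoryStream();
  readonly byte[] _chunk = new byte [4096];
  readonly Action<MemoryStream, Exception> _done;

  AsyncBufferer (Stream source, Action<MemoryStream, Exception> done)
  {
    _source = source;
    _done = done;
  }

  // Calls done (buffer, null) on success or done (null, error) on failure.
  // The done delegate is assumed not to throw.
  public static void Start (Stream source, Action<MemoryStream, Exception> done)
  {
    new AsyncBufferer (source, done).Read();
  }

  void Read()
  {
    try { _source.BeginRead (_chunk, 0, _chunk.Length, ReadCallback, null); }
    catch (Exception ex) { _done (null, ex); }
  }

  void ReadCallback (IAsyncResult r)
  {
    int chunkSize;
    try { chunkSize = _source.EndRead (r); }
    catch (Exception ex) { _done (null, ex); return; }

    if (chunkSize > 0)
    {
      _buffered.Write (_chunk, 0, chunkSize);   // A memory write; never blocks.
      Read();                                   // More data may follow.
      return;
    }
    _buffered.Position = 0;    // Rewind so that a decorator reads from the start.
    _done (_buffered, null);
  }

  // Decrypts the buffered copy. The Read calls here are CPU-bound only.
  public static byte[] Decrypt (MemoryStream encrypted, ICryptoTransform decryptor)
  {
    using (CryptoStream crypto = new CryptoStream (encrypted, decryptor,
                                                   CryptoStreamMode.Read))
    using (MemoryStream plain = new MemoryStream())
    {
      byte[] temp = new byte [4096];
      int n;
      while ((n = crypto.Read (temp, 0, temp.Length)) > 0)
        plain.Write (temp, 0, n);
      return plain.ToArray();
    }
  }
}
```

If a source stream completes its reads synchronously and invokes the callback inline, the calls to Read nest one level per chunk, so a production version would need to guard against deep recursion on large inputs. The memory cost noted above applies as well: each in-flight request holds its entire payload.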

DeflateStream does actually follow the asynchronous pattern, or at least tries to. The problem is that it doesn't properly handle exceptions. If the underlying stream is corrupt, for instance, BeginRead throws an exception on a pooled thread rather than marshaling it back to EndRead. This is an uncatchable exception that takes down your whole application.

The FileStream class is another violator: it touts fake asynchronous methods (i.e., it relies on Stream's default implementation). However, it does make an attempt at true asynchronous behavior if constructed as follows:

Stream s = new FileStream ("large.bin", FileMode.Create, FileAccess.Write,
                           FileShare.None, 0x1000, true);

The boolean argument at the end instructs FileStream not to use asynchronous delegates and instead to attempt a nonblocking approach. The problem is that asynchronous file I/O requires operating system support, which may not be forthcoming. If the OS fails to oblige, BeginRead blocks the calling thread in a WaitSleepJoin state.
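
As a rough sketch of how such a stream is used, the following writes a buffer with BeginWrite and EndWrite and then reports the stream's IsAsync property, which indicates whether the FileStream was opened for asynchronous I/O. The class name AsyncFileDemo and the method WriteAsync are hypothetical, and the example makes no claim about whether the operating system actually performs the write without blocking.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical demo class; the names are illustrative only.
public static class AsyncFileDemo
{
  // Writes data to path using BeginWrite/EndWrite and returns the
  // stream's IsAsync property.
  public static bool WriteAsync (string path, byte[] data)
  {
    using (FileStream s = new FileStream (path, FileMode.Create, FileAccess.Write,
                                          FileShare.None, 0x1000, true))
    {
      Exception error = null;
      ManualResetEvent done = new ManualResetEvent (false);

      s.BeginWrite (data, 0, data.Length, delegate (IAsyncResult r)
      {
        try { s.EndWrite (r); }                 // Rethrows any I/O error.
        catch (Exception ex) { error = ex; }
        finally { done.Set(); }
      }, null);

      done.WaitOne();    // For the demo only; a server would not wait here.
      if (error != null) throw error;
      return s.IsAsync;
    }
  }
}
```

A call such as AsyncFileDemo.WriteAsync (Path.GetTempFileName(), new byte [10000]) leaves a 10,000-byte file behind once the stream is disposed.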

Lack of asynchronous file I/O is rarely a problem, though, assuming you're accessing a local filesystem (in fact, it can be a good idea not to use asynchronous file I/O at all). Small file requests are likely to be served from an operating system or hard drive cache and so be brief and CPU-bound; large file I/O requests are going to seriously limit concurrency if not broken up or throttled in some way, no matter what the threading model. A more insidious blocking issue arises if you're using a FileStream on a UNC network path: the solution is instead to use lower-level networking classes (such as those described) for communicating between computers.

Alternatives to Asynchronous Methods

Chapter 19, Threading, described three analogous techniques, all of which coalesce many tasks onto a few threads:

  • ThreadPool.RegisterWaitForSingleObject

  • The producer/consumer queue

  • The threading and system timers

ThreadPool.RegisterWaitForSingleObject can be helpful in implementing the asynchronous method pattern. A custom producer/consumer queue can provide a complete alternative (with your own pool of workers) but is of no help if you want to interoperate with the .NET Framework (e.g., to read from a NetworkStream). The threading and system timers are excellent if your work is executed in a periodic fashion, rather than in response to requests.
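
As a brief illustration of the first technique, the sketch below registers a callback that the thread pool runs when a wait handle is signaled, so no thread has to sit blocked on the handle in the meantime. The class name RegisteredWaitDemo and the handle names are assumptions made for the example.

```csharp
using System;
using System.Threading;

// Hypothetical demo class; the names are illustrative only.
public static class RegisteredWaitDemo
{
  // Returns true if the callback ran because the handle was signaled
  // rather than because the wait timed out.
  public static bool Run()
  {
    ManualResetEvent ready    = new ManualResetEvent (false);  // The handle being watched.
    ManualResetEvent finished = new ManualResetEvent (false);  // Lets the demo observe the callback.
    bool signaled = false;

    RegisteredWaitHandle registration = ThreadPool.RegisterWaitForSingleObject (
      ready,
      delegate (object state, bool timedOut)    // Runs on a pooled thread.
      {
        signaled = !timedOut;
        finished.Set();
      },
      null,     // State object passed to the callback.
      -1,       // No timeout.
      true);    // Execute only once.

    ready.Set();                       // Signaling the handle triggers the callback.
    finished.WaitOne();                // For the demo only.
    registration.Unregister (null);    // Release the registration.
    return signaled;
  }
}
```

In an asynchronous method, the callback would typically record the outcome and fire the consumer's AsyncCallback rather than set a second event.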

Asynchronous Events

The asynchronous event pattern is quite separate from both asynchronous methods and delegates and was introduced in .NET 2.0, possibly as an attempt to simplify, or "dumb down," asynchronous methods. Unfortunately, it suffers an unclear design purpose, vacillating on whether to follow the semantics of asynchronous methods (nonblocking) or asynchronous delegates (parallel execution).

Asynchronous events offer automatic context marshaling for Windows Forms and other context-dependent applications, and a protocol for cancellation and progress reporting. They are used in just a few places in the .NET Framework; one example is the WebClient façade class (look for the method names ending in "Async"). Calling DownloadFileAsync to download a small web page over a slow connection blocks the caller for around two seconds-most of the duration of the retrieval. Given that it doesn't take two seconds of CPU time to retrieve a small web page, this behavior is incompatible with both nonblocking and parallel execution semantics, making it useless in practice as well as theory!

An excellent replacement for simple parallel execution is the BackgroundWorker class, described in Chapter 19, Threading. This offers the same set of features as the asynchronous event model, but with a clear purpose (parallel execution). It can be instantiated directly (equivalent, perhaps, to consuming asynchronous events) or subclassed (equivalent to implementing the asynchronous event pattern).
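
A minimal sketch of instantiating BackgroundWorker directly follows. The class name BackgroundWorkerDemo and the summing workload are hypothetical stand-ins for real work, and the example assumes a console-style program with no synchronization context, so the completion event fires on a pooled thread.

```csharp
using System;
using System.ComponentModel;
using System.Threading;

// Hypothetical demo class; the workload is a stand-in for real work.
public static class BackgroundWorkerDemo
{
  // Sums the integers 1..count on a background thread and returns the total.
  public static int SumInBackground (int count)
  {
    int result = 0;
    Exception error = null;
    ManualResetEvent done = new ManualResetEvent (false);

    BackgroundWorker worker = new BackgroundWorker();

    worker.DoWork += delegate (object sender, DoWorkEventArgs e)
    {
      int sum = 0;
      for (int i = 1; i <= (int) e.Argument; i++) sum += i;
      e.Result = sum;    // Handed to RunWorkerCompleted.
    };

    worker.RunWorkerCompleted += delegate (object sender, RunWorkerCompletedEventArgs e)
    {
      // Check Error first: reading Result after a failure throws.
      if (e.Error != null) error = e.Error; else result = (int) e.Result;
      done.Set();
    };

    worker.RunWorkerAsync (count);    // Returns immediately to the caller.
    done.WaitOne();                   // For the demo only.
    if (error != null) throw error;
    return result;
  }
}
```

In a Windows Forms application you would not wait on an event as this demo does; RunWorkerCompleted is marshaled back to the UI thread, where it can update controls directly.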