June 2016

Volume 31 Number 6

[Reactive Framework]

Scale Asynchronous Client-Server Links with Reactive

By Peter Vogel | June 2016

As asynchronous processing has become more common in application development, the Microsoft .NET Framework has acquired a wide variety of tools that support specific asynchronous design patterns. Often creating a well-designed asynchronous application comes down to recognizing the design pattern your application is implementing and then picking the right set of .NET components.

In some cases, the match requires integrating several .NET components. Stephen Cleary’s article, “Patterns for Asynchronous MVVM Applications: Commands” (bit.ly/233Kocr), shows how to fully support the Model-View-ViewModel (MVVM) pattern in an asynchronous fashion. In other cases support requires just one component from the .NET Framework. I’ve discussed implementing the provider/consumers pattern using the BlockingCollection in my VisualStudioMagazine.com Practical .NET columns, “Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6), and, “Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).

Another example is implementing the observer design pattern to monitor a long-running operation asynchronously. In this scenario, an asynchronous method that returns a single Task object doesn’t work because the client is frequently returning a stream of results. For these scenarios, you can leverage at least two tools from the .NET Framework: the ObservableCollection and Reactive Extensions (Rx). For simple solutions, the ObservableCollection (along with the async and await keywords) is all you need. However, for more “interesting” and, especially, event-driven problems, Rx provides you with better control over the process.

Defining the Pattern

While the observer pattern is frequently used in UI design patterns—including Model-View-Controller (MVC), Model-View-Presenter (MVP) and MVVM—UIs should be considered as just one scenario from a larger set of scenarios where the observer pattern applies. The definition of the observer pattern (quoting from Wikipedia) is: “An object, called the subject, [that] maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.”

Really, the observer pattern is about getting results from long-running processes to the client as soon as those results are available. Without some version of the observer pattern, clients must wait until the last result is available and then have all the results sent to them in a single lump. In an increasingly asynchronous world, you want the observers to process results in parallel with the client as the results become available. To emphasize that you’re talking about more than UIs when leveraging the observer pattern, I’ll use “client” and “server” instead of “observer” and “subject,” in the rest of this article.

Problems and Opportunities

There are at least three issues and two opportunities with the observer pattern. The first issue is the lapsed-listener problem: Many implementations of the observer pattern require the server to hold a reference to all of its clients. As a result, clients can be held in memory by the server until the server exits. This obviously isn’t an optimal solution for a long-running process in a dynamic system where clients connect and disconnect frequently.

The lapsed-listener problem, however, is just a symptom of the second, larger problem: Many implementations of the observer pattern require the server and the client to be tightly coupled, requiring both the server and the client to be present at all times. At the very least, the client should be able to determine if the server is present and choose not to attach; in addition, the server should be able to function even if there are no clients accepting results.

The third issue is performance-related: How long will it take for the server to notify all clients? Performance in the observer pattern is directly affected by the number of clients to be notified. Therefore, there is an opportunity to improve performance in the observer pattern by letting the client preemptively filter the results that come back from the server. This also addresses the scenarios where the server produces more results (or a wider variety of results) than the client is interested in: The client can indicate that it’s only to be notified in specific cases. The second performance opportunity exists around recognizing when the server has no results or has finished producing results. Clients can skip acquiring resources required to process server events until the client is guaranteed there is something to process and clients can release those resources as soon as they know they’ve processed the last result.

From Observer to Publish/Subscribe

Factoring in these considerations leads from simple implementations of the observer pattern to the related publish/subscribe model. Publish/subscribe implements the observer pattern in a loosely coupled way that lets servers and clients execute even if the other is currently unavailable. Publish/subscribe also typically implements client-side filtering by letting the client subscribe either to specific topics/channels (“Notify me about purchase orders”) or to attributes associated with different kinds of content (“Notify me about any urgent requests”).

One issue remains, however.  All implementations of the observer pattern tend to tightly couple clients and servers to a specific message format. Changing the format of a message in most publish/subscribe implementations can be difficult because all of the clients must be updated to use the new format.

In many ways, this is similar to the description of a server-side cursor in a database. To minimize transmission costs, the database server doesn’t return results as each row is retrieved. However, for large row sets, the database also doesn’t return all the rows in a single batch at the end. Instead, the database server typically returns subsets from a cursor held on the server often as those subsets become available. With a database, the client and the server don’t have to be simultaneously present: The database server can run when there are no clients present; a client can check to see if the server is accessible and, if not, decide what (if anything) else it can do. The filtering process (SQL) is also very flexible. However, if the database engine changes the format it uses to return rows, then all clients must, at the very least, be recompiled.

Processing a Cache of Objects

As my case study for looking at a simple observer pattern implementation, I’m using as my server a class that searches an in-memory cache of invoices. That server could, at the end of its processing, return a collection of all of the invoices. However, I’d prefer to have the client process the invoices individually and in parallel to the server’s search process. That means I prefer a version of the process, which returns each invoice as it’s found and lets the client process each invoice in parallel with the search for the next invoice.

A simple implementation of the server might look like this:

private List<Invoice> foundInvoices = new List<Invoice>();
public List<Invoice> FindInvoices(decimal Amount)
{
  foundInvoices.Clear();
  Invoice inv;
    // ...search logic to add invoices to the collection
     foundInvoices.Add(inv);
    // ...repeat until all invoices found
    return foundInvoices;
}

More sophisticated solutions might use yield return to return each invoice as it’s found rather than assembling the list. Regardless, a client that calls the FindInvoices method will want to perform some critical activities before and after processing. For example, once the first item is found, the client might want to enable a MatchingInvoices list to hold the invoices at the client or acquire/initialize any resources required to process an invoice. As additional invoices are added, the client would need to process each invoice and, when the server signals that the final invoice is retrieved, release any resources that are no longer required because there are “no more” invoices to process.

During a database retrieval, for example, a read will block until the first row is returned. Once the first row is returned, the client initializes whatever resources are needed to process a row. The read also returns false when the final row is retrieved, letting the client release those resources because there are no more rows to process.

Creating Simple Solutions with ObservableCollection

The most obvious choice for implementing the observer pattern in the .NET Framework is the ObservableCollection. The ObservableCollection will notify the client (through an event) whenever it’s changed.

Rewriting my sample server to use the ObservableCollection class requires only two changes. First, the collection holding the results needs to be defined as an ObservableCollection and made public. Second, it’s no longer necessary for the method to return a result: The server only needs to add invoices to the collection.

The new implementation of the server might look like this:

public List<Invoice> FindInvoices(decimal Amount)
{
  public ObservableCollection<Invoice> foundInvoices =
    new ObservableCollection<Invoice>();
  public void FindInvoices(decimal Amount)
  {
    foundInvoices.Clear();
    Invoice inv;
    // ...search logic to set inv
     foundInvoices.Add(inv);
    // ...repeat until all invoices are added to the collection   
  }

A client that uses this version of the server only needs to wire up an event handler to the CollectionChanged event of the InvoiceManagement’s foundInvoices collection. In the following code I’ve had the class implement the IDisposable interface to support disconnecting from the event:

public class SearchInvoices: IDisposable
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public void SearchInvoices()
  {
    invMgmt.foundInvoices.CollectionChanged += InvoicesFound;
  }
  public void Dispose()
  {
    invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged;
  }

In the client, the CollectionChanged event is passed a NotifyCollectionChangedEventArgs object as its second parameter. That object’s Action property specifies both what change was performed on the collection (the actions are: the collection was cleared, new items were added to the collection, existing items were moved/replaced/removed) and information about the changed items (a collection of any added items, a collection of items present in the collection prior to the new items being added, the position of the item that was moved/removed/replaced).

Simple code in the client that would asynchronously process each invoice as it’s added to the collection in the server would look like the code in Figure 1.

Figure 1 Asynchronously Processing Invoices Using ObservableCollection

private async void InvoicesFound(object sender,
  NotifyCollectionChangedEventArgs e)
{
  switch (e.Action)
  {
    case NotifyCollectionChangedAction.Reset:
      {
        // ...initial item processing
        return;
      }
    case NotifyCollectionChangedAction.Add:
      {
        foreach (Invoice inv in e.NewItems)
        {
          await HandleInvoiceAsync(inv);
        }
        return;
      }
  }
}

While simple, this code might be inadequate for your needs, especially if you’re handling a long-running process or working in a dynamic environment. From an asynchronous design point of view, for example, the code could capture the Task object returned by the Handle­InvoiceAsync so that the client could manage the asynchronous tasks. You’ll also want to make sure that the CollectionChanged event is raised on the UI thread even if FindInvoices runs on a background thread.

Because of where the Clear method is called in the server class (just before searching for the first Invoice) the Action property’s Reset value can be used as a signal that the first item is about to be retrieved. However, of course, no invoices may be found in the search, so using the Reset Action might result in the client allocating resources that are never actually used. To actually handle “first item” processing, you’d need to add a flag to the Add Action processing to execute only when the first item was found.

In addition, the server has a limited number of options for indicating that the last Invoice is found so that the client can stop waiting for “the next one.” The server could, presumably, clear the collection after finding the last item, but that would just force more complex processing into the Reset Action processing (have I been processing Invoices? If yes, then I’ve processed the last Invoice; if no, then I’m about to process the first Invoice).

While, for simple problems, Observable­Collection will be fine, any reasonably sophisticated implementation based on ObservableCollection (and any application that values efficiency) is going to require some complicated code, especially in the client.

The Rx Solutions

If you want asynchronous processing then Rx (available through NuGet) can provide a better solution for implementing the observer pattern by borrowing from the publish/subscribe model. This solution also provides a LINQ-based filtering model, better signaling for first/last item conditions and better error handling.

Rx can also handle more interesting observer implementations than are possible with an ObservableCollection. In my case study, after returning the initial list of invoices, my server might continue to check for new invoices that are added to the cache after the original search completes (and that match the search criteria, of course). When an invoice meeting the criteria does appear, the client will want to be notified about the event so that the new invoice can be added to the list. Rx supports these kinds of event-based extensions to the observer pattern better than ObservableCollection.

There are two key interfaces in Rx for supporting the observer pattern. The first is IObservable<T>, implemented by the server and specifying a single method: Subscribe. The server implementing the Subscribe method will be passed a reference to an object from a client. To handle the lapsed listener problem, the Subscribe method returns a reference to the client for an object that implements the IDisposable interface. The client can use that object to disconnect from the server. When the client does disconnect, the server is expected to remove the client from any of its internal lists.

The second is the IObserver<T> interface, which must be implemented by the client. That interface requires the client to implement and expose three methods to the server: OnNext, OnCompleted and OnError. The critical method here is OnNext, which is used by the server to pass a message to the client (in my case study that message would be new Invoice objects that will be returned as each one appears). The server can use the client’s OnCompleted method to signal that there’s no more data. The third method, OnError, provides a way for the server to signal to the client that an exception has occurred.

You’re welcome to implement the IObserver interface yourself, of course (it’s part of the .NET Framework). Along with the ObservableCollection, that may be all you need if you’re creating a synchronous solution (I’ve written a column about that, too, “Writing Cleaner Code with Reactive Extensions” [bit.ly/10nfQtm]).

However, the Rx includes several packages that provide asynchronous implementations of these interfaces, including implementations for JavaScript and RESTful services. The Rx Subject class provides an implementation of IObservable that simplifies implementing an asynchronous publish/subscribe version of the observer pattern.

Creating an Asynchronous Solution

Creating a server to work with a Subject object requires very few changes to the original synchronous server-side code. I replace the old ObservableCollection with a Subject object that will pass each Invoice as it appears to any listening clients. I declare the Subject object as public so that clients can access it:

public class InvoiceManagement
{
  public IObservable<Invoice> foundInvoice =
    new Subject<Invoice>();

In the body of the method, instead of adding an invoice to a collection, I use the Subject’s OnNext method to pass each invoice to the client as it’s found:

public void FindInvoices(decimal Amount)
{
  inv = GetInvoicesForAmount(Amount) // Poll for invoices
  foundInvoice.OnNext(inv);
  // ...repeat...
}

In my client, I first declare an instance of the server class. Then, in a method marked as async, I call the Subject’s Subscribe method to indicate that I want to start retrieving messages:

public class InvoiceManagementTests
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public async void ProcessInvoices()
  {
    invMgmt.foundInvoice.Subscribe<Invoice>();

To filter the results to just the invoices I want, I can apply a LINQ statement to the Subject object. This example filters the invoices to the ones that are back ordered (to use Rx LINQ extensions you’ll need to add a using statement for the System.Reactive.Linq namespace):

invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();

Once I’ve started listening to the subject, I can specify what processing I want to do when I receive an invoice. I can, for example, use FirstAsync to process just the first invoice returned by the service. In this example, I use the await statement with the call to FirstAsync so that I can return control to the main body of my application while processing the invoice. This code waits to retrieve that first invoice, then moves on to whatever code I use to initialize the invoice processing process and, finally, processes the invoice:

Invoice inv;
inv = await invMgmt.foundInvoice.FirstAsync();
// ...setup code invoices...
HandleInvoiceAsync(inv);

One caveat: FirstAsync will block if the server hasn’t yet produced any results. If you want to avoid blocking, you can use FirstOrDefault­Async, which will return null if the server hasn’t produced any results. If there are no results, the client can decide what, if anything, to do.

The more typical case is that the client wants to process all the invoices returned (after filtering) and to do so asynchronously. In that case, rather than use a combination of Subscribe and OnNext, you can just use the ForEachAsync method. You can pass a method or a lambda expression that processes incoming results. If you pass a method (which can’t be asynchronous), as I do here, that method will be passed the invoice that triggered ForEachAsync:

invMgmt.foundInvoice.ForEachAsync(HandleInvoice);

The ForEachAsync method can also be passed a cancellation token to let the client signal that it’s disconnecting. A good practice would be to pass the token when calling any of Rx *Async methods to support letting the client terminate processing without having to wait for all objects to be processed.

The ForEachAsync won’t process any result already processed by a First (or FirstOrDefaultAsync) method so you can use FirstOr­DefaultAsync with ForEachAsync to check to see if the server has anything to process before processing subsequent objects. However, the Subject’s IsEmpty method will perform the same check more simply. If the client has to allocate any resources required for processing results, IsEmpty allows the client to check to see if there’s anything to do before allocating those resources (an alternative would be to allocate those resources on the first item processed in the loop). Using IsEmpty with a client that checks to see if there are any results before allocating resources (and starting processing) while also supporting cancellation would give code that looks something like Figure 2.

Figure 2 Code to Support Cancellation and Defer Processing Until Results are Ready

CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken cancel;
cancel = cancelSource.Token;
if (!await invMgmt.foundInvoice.IsEmpty())
{
  // ...setup code for processing invoices...
  try
  {
    invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel);
  }
  catch (Exception ex)
  {
    if (ex.GetType() != typeof(CancellationToken))
    {
      // ...report message
    }
   }
   // ...clean up code when all invoices are processed or client disconnects
}

Wrapping Up

If all you need is a simple implementation of the observer pattern, then ObservableCollection might do all you need to process a stream of results. For better control and for an event-based application, the Subject class and the extensions that come with Rx will let your application work in an asynchronous mode by supporting a powerful implementation of the publish/subscribe model (and I haven’t looked at the rich library of operators that come with Rx). If you’re working with Rx, it’s worthwhile to download the Rx Design Guide (bit.ly/1VOPxGS), which discusses the best practices in consuming and producing observable streams.

Rx also provides some support for converting the message type passed between the client and the server by using the ISubject<TSource, TResult> interface. The ISubject<TSource, TResult> interface specifies two datatypes: an “in” datatype and an “out” datatype. Within the Subject class that implements this interface you can perform any operations necessary to convert the result returned from the server (the “in” datatype) into the result required by the client (the “out” datatype). Furthermore, the in parameter is covariant (it will accept the specified datatype or anything the datatype inherits from) and the out parameter is contravariant (it will accept the specified datatype or anything that derives from it), giving you additional flexibility.

We live in an increasingly asynchronous world and, in that world, the observer pattern is going to become more important—it’s a useful tool for any interface between processes where the server process returns more than a single result. Fortunately, you have several options for implementing the observer pattern in the .NET Framework, including the ObservableCollection and Rx.


Peter Vogel is a systems architect and principal in PH&V Information Services. PH&V provides full-stack consulting from UX design through object modeling and database design.

Thanks to the following Microsoft technical experts for reviewing this article: Stephen Cleary, James McCaffrey and Dave Sexton
Stephen Cleary has worked with multithreading and asynchronous programming for 16 years and has used async support in the Microsoft .NET Framework since the first community technology preview. He is the author of “Concurrency in C# Cookbook” (O’Reilly Media, 2014).  His homepage, including his blog, is at stephencleary.com.


Discuss this article in the MSDN Magazine forum