January 2009

Volume 24 Number 01

Net Matters - Round-Robin Access To The ThreadPool

By Stephen Toub | January 2009

Q I'm currently using the Microsoft .NET Framework ThreadPool, and I've run into a situation I'm not sure how to solve. I start off with a large batch of work items that get queued, and then a second (smaller) batch arrives after the first batch starts its processing. Initially, some of the work in the large batch will be dispatched to all of the worker threads in the ThreadPool. However, when the second batch arrives, I want the distribution to be fair, with each batch getting serviced equally, rather than the first batch getting full attention simply because it arrived first.

When one of the batches finishes, I'd like whichever one still needs processing to then get the attention of all the worker threads. Is there anything I can do to layer such batching functionality on top of the ThreadPool?

A In past columns, I've shown how to layer various types of functionality on top of the existing .NET ThreadPool. In the October 2004 issue of MSDN Magazine, I showed how to add support to the ThreadPool for waiting on queued work items (see "ThreadPoolWait and HandleLeakTracker"). In the November 2004 issue, I showed how to add support for work item priorities (see "ThreadPoolPriority, and MethodImplAttribute"). And in the March 2006 issue, I showed how to add cancellation support (see "Abortable Thread Pool"). In the future, I'll be able to look back at the January 2009 issue and say that there I showed how to add round-robin scheduling support on top of the ThreadPool.

The problem you're looking to solve first requires an understanding of how the ThreadPool dispatches work. Internally, it maintains a queue of the work items that have been queued to it. When a thread in the pool is available to execute work, it goes back to the work queue and pulls off the next item. The order in which this processing happens is undocumented and should most definitely not be relied upon (as it could and likely will change in future versions).

Today it's implemented in a very simple manner: a first-in-first-out (FIFO) queue. Thus, the first work to be queued will be the first work to be picked up by a thread. In your batching scenario, this means all of the work from the first batch will be in the queue in front of all of the work from the second batch. And thus all of the first batch's work will be dispatched before that of the second batch. For some scenarios, this is fine and optimal. For your scenario, you need more control.
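To see the effect concretely, here is a minimal sketch that queues two batches straight to the ThreadPool (ProcessItem is a hypothetical stand-in for whatever per-item work you do); with today's FIFO behavior, the second batch's items sit behind everything left over from the first:

// Minimal sketch: two batches queued directly to the ThreadPool.
// ProcessItem is a hypothetical stand-in for real per-item work.
for (int i = 0; i < 1000; i++) {
    int id = i; // copy for the closure
    ThreadPool.QueueUserWorkItem(_ => ProcessItem("batch1", id));
}
// ... some time later, a second, smaller batch arrives ...
for (int i = 0; i < 10; i++) {
    int id = i; // copy for the closure
    ThreadPool.QueueUserWorkItem(_ => ProcessItem("batch2", id));
}
// With the FIFO queue, batch2's items are dispatched only after all
// of the batch1 items still sitting ahead of them in the queue.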

One of the easiest ways to get that control from the ThreadPool is to substitute your own delegate for the one the user really wants executed. As an example, let's say you wanted to catch all unhandled exceptions thrown out of queued work and raise an event for each. To do that, you could write code such as that in Figure 1. Then, instead of using ThreadPool.QueueUserWorkItem, you'd use ExceptionThreadPool.QueueUserWorkItem. The work would still be executed by the ThreadPool, but the pool's threads would actually be executing the delegate you queued rather than the one the user provided. Invoking your delegate would then invoke the user-provided delegate, catching any exceptions and raising the target event.

Figure 1 Shimming the ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null)
          handler(null, new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event UnhandledExceptionEventHandler UnhandledException;
}

Note that this technique, while powerful, does come at a cost: an extra delegate needs to be allocated, an extra delegate needs to be invoked, and so forth. Whether that cost is prohibitive can be determined only by you and your scenarios, but this kind of layering is typically more cost-effective than writing your own thread pool from scratch.

This is, of course, a very simple example, but you can do more complicated things. The priority pool example to which I referred earlier stores the user-provided delegates in its own data structures. It then queues to the pool a substitute delegate that comes back and searches those data structures for the right delegate to execute, preferring to execute first those that have higher priorities. You can employ a similar technique to address your batching predicament.
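To give a rough idea of that substitute-delegate pattern (this is a simplified sketch, not the code from the earlier column, and it omits state handling for brevity), a priority shim might look something like this:

using System.Collections.Generic;
using System.Threading;

// Sketch of a priority shim: callbacks go into our own per-priority queues,
// and the delegate handed to the real ThreadPool comes back here to pick
// the highest-priority item available.
public static class PriorityThreadPool {
    private static readonly Queue<WaitCallback>[] _queues = {
        new Queue<WaitCallback>(), // index 0: high priority
        new Queue<WaitCallback>()  // index 1: low priority
    };
    private static readonly object _lock = new object();

    public static void QueueUserWorkItem(WaitCallback callback, bool highPriority) {
        lock (_lock) _queues[highPriority ? 0 : 1].Enqueue(callback);
        ThreadPool.QueueUserWorkItem(_ => {
            WaitCallback next = null;
            lock (_lock) {
                foreach (var q in _queues) {
                    if (q.Count > 0) { next = q.Dequeue(); break; }
                }
            }
            if (next != null) next(null);
        });
    }
}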

Take a moment to imagine that rather than having a single queue, you had one queue per batch. Each batch would put work into its own queue. You'd then use the pool's threads to round-robin between all of the queues. The delegate you queue to the real ThreadPool comes back into your data structures and, starting with the next queue to be examined, looks for work. If it finds some, it executes an item from that queue. If not, it moves on to the next queue.

In this manner, you are able to provide fair scheduling between the batches. If there is only one batch, the threads will always pull work from that one queue. If there are multiple batches, the threads will visit each of the queues, giving them approximately equal attention. Figure 2 provides an overview of what this solution will look like.

Figure 2 RoundRobinThreadPool Approach

To get things going, you first need a data structure to store the user-provided delegate. My representation is shown in Figure 3. This data structure contains three properties. The first two should look familiar; they are the callback and state the user provides to QueueUserWorkItem, so of course you need to cache those away. The third property might be unfamiliar, however. Associated with every thread of execution in .NET is a System.Threading.ExecutionContext that represents information such as the current user, any state associated with the logical thread of execution, code-access security information, and so forth. It's important that this context flow across asynchronous points of execution.

Figure 3 Capturing a Work Item

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null)
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

For example, if you're impersonating a specific Windows identity and you call ThreadPool.QueueUserWorkItem, the work you queued should execute under that same Windows identity. If it doesn't, that's a potential security hole. Starting with the .NET Framework 2.0, this context flows automatically by default across all async points in your code: ThreadPool.QueueUserWorkItem, creating a new Thread, asynchronous delegate invocation, and so on.

However, you're playing by a different set of rules with the implementation being discussed here. By changing the order in which delegates are queued from the order in which they're executed, there's no longer a direct correspondence between this ExecutionContext flow and the work items the user provided. As such, your implementation needs to correctly cache the ExecutionContext with the user-provided delegate and then use that captured context to run that delegate.
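For reference, the capture-and-run pattern that WorkItem relies on boils down to the following sketch (DoWork and someState are hypothetical stand-ins):

// On the thread that queues the work, capture the current context:
ExecutionContext context = ExecutionContext.Capture();

// Later, on the ThreadPool thread that executes the work, run the user's
// delegate under the captured context (or directly if none was captured):
if (context != null)
    ExecutionContext.Run(context, state => DoWork(state), someState);
else
    DoWork(someState);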

Now that you have a work item, let's take a look at the queue in which it will be held (shown in Figure 4). The RoundRobinThreadPool.Queue data structure itself is fairly simple. Internally, it contains a Queue<WorkItem> to store all of the work items provided to it, a reference to the RoundRobinThreadPool instance with which the queue is associated, and a Boolean value that denotes whether the Dispose method has been called on the queue. It then provides QueueUserWorkItem methods with the same signature as the ThreadPool's.

Figure 4 A Round-Robin Queue

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) {
      QueueUserWorkItem(callback, null);
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem {
        Context = ExecutionContext.Capture(),
        WaitCallback = callback,
        State = state
      };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed) {
        lock (_pool._queues) {
          if (_workItems.Count == 0) _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

When QueueUserWorkItem is called, the callback and state provided by the user (along with the current ExecutionContext) are captured into a WorkItem. This work is then stored into the generic queue, and the relevant pool is notified that new work has arrived. It is important for you to note that a lock is used to protect the work item queue, as QueueUserWorkItem may be called concurrently from multiple threads, and you need to ensure that invariants are held.

Note, too, that the object being locked is the global queues list coming from the pool. I'm using one fairly coarse-grained lock for the whole implementation. A more efficient implementation would likely use finer-grained locking, such as individual locks per queue rather than one for the entire RoundRobinThreadPool. For ease of implementation and simplicity, I've opted for the single lock.

The Dispose method is used when this queue is no longer needed. In a typical batching scenario, a queue will be created, work queued to it, and then the queue will be disposed. If the Dispose method were to simply remove this queue from the pool, it's likely that it would be removed while there were still work items in it to be processed.

As such, Dispose does two things. First, it checks whether there are any work items remaining; if there aren't, the queue calls into the pool to remove itself. Second, it marks itself as having been disposed. You will see in a moment how the pool handles the situation where it comes across a disposed queue that has not yet been removed.

Figure 5 shows the rest of the implementation, that of the RoundRobinThreadPool class itself. The pool contains four fields:

  • A list of the individual queues maintained by the pool (which also serves as the lock previously mentioned).
  • A default queue for the pool.
  • An integer representing the next queue to be searched for work.
  • The callback delegate that's actually queued to the ThreadPool.

Figure 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback) {
    QueueUserWorkItem(callback, null);
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() {
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null);
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;
    lock (_queues) {
      var searchOrder =
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i;
          if (_queues[i]._disposed && items.Count == 0)
            RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

When a RoundRobinThreadPool is initialized, all of this state is configured. In particular, the default queue is initialized by calling the CreateQueue method. This CreateQueue method is the same method that's exposed publicly to allow a developer to add another queue to the pool (such as when a new batch of work arrives that needs its own isolated queue). It simply instantiates a new RoundRobinThreadPool.Queue instance (the type explored in Figure 4), adds it to the list of queues, and returns it.

For ease of use, RoundRobinThreadPool exposes its own QueueUserWorkItem methods; these simply target the default queue that was created when the pool was instantiated.

Next to examine is the NotifyNewWorkItem method. You'll remember that when QueueUserWorkItem was called on a queue, after storing the work item, the queue called this NotifyNewWorkItem method on the pool. This method simply delegates to the real ThreadPool, submitting a delegate that points back to the DequeueAndExecuteWorkItem method (to be examined shortly), which will, as its name aptly implies, dequeue and execute a work item from the round-robin pool.

Notice that NotifyNewWorkItem is calling ThreadPool.UnsafeQueueUserWorkItem rather than ThreadPool.QueueUserWorkItem. The "Unsafe" prefix simply implies that it's not flowing ExecutionContext; not doing so has a performance benefit. And as this implementation is already handling ExecutionContext flow manually, there is no need for the ThreadPool to attempt to do so as well.
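In other words, the only difference at the call site is which entry point is used; a quick sketch (MyCallback and state are hypothetical):

// Captures and flows ExecutionContext to the callback:
ThreadPool.QueueUserWorkItem(MyCallback, state);

// Skips the ExecutionContext capture and flow; slightly cheaper, and fine here
// because RoundRobinThreadPool captures and flows the context itself:
ThreadPool.UnsafeQueueUserWorkItem(MyCallback, state);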

DequeueAndExecuteWorkItem is where the real magic happens. This method first generates an order in which to search the queues. The search order goes from the next queue to be examined up to the end of the list and then circles back around, starting at the beginning of the list and going up to the queue at which it started. To simplify the implementation, the LINQ Enumerable.Range method is used to generate the two ranges, which are then concatenated using the LINQ Enumerable.Concat method.
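If you want to see the rotation that produces, here is a small sketch evaluating the same Range/Concat expression on its own (it requires System.Linq); with five queues and a next-queue index of 2, it yields 2, 3, 4, 0, 1:

// Sketch: the rotated search order, computed as in Figure 5.
int nextQueue = 2, queueCount = 5;
var searchOrder =
    Enumerable.Range(nextQueue, queueCount - nextQueue)  // 2, 3, 4
    .Concat(Enumerable.Range(0, nextQueue));             // 0, 1
Console.WriteLine(string.Join(", ",
    searchOrder.Select(i => i.ToString()).ToArray()));   // prints "2, 3, 4, 0, 1"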

Once it has the search order, it goes hunting for work items. Each queue is examined in the specified order, and as soon as a work item is found, the work item is removed and the next pointer is updated. The work item is then invoked using the Execute method shown in Figure 3.

There's one particularly interesting line of code here, and that's the check to see whether the queue from which an item was just retrieved is both disposed and empty. If the pool finds such a queue, it knows that it won't ever have more items added to it (as it's been disposed) and, thus, doesn't need to keep it around anymore. At that point, RemoveQueueNeedsLock is used to remove the target queue from the queues list and potentially update the next queue pointer in case it's now out of range.
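To make that pointer adjustment concrete, here is a tiny worked example applying the same logic to a list of placeholder names (A through D stand in for queues):

// Worked example of RemoveQueueNeedsLock's index adjustment.
var queues = new List<string> { "A", "B", "C", "D" };
int nextQueue = 3;                   // currently refers to "D"

int index = queues.IndexOf("B");     // 1
if (nextQueue >= index) nextQueue--; // 3 becomes 2
queues.RemoveAt(index);

// queues is now ["A", "C", "D"] and nextQueue is 2,
// so it still refers to "D".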

Note that this method does not use a lock internally even though it accesses shared state; hence I've named the method with a "NeedsLock" suffix to remind myself that it must be called while the lock is held. You'll notice that both call sites of RemoveQueueNeedsLock (one in the queue's Dispose method and one in the pool's DequeueAndExecuteWorkItem method) invoke it while holding the queues lock.

With the implementation complete, you can now test this in your code. In the following example, I've created a single static instance of the RoundRobinThreadPool. When a batch of work arrives to be processed, a new queue is created, all of the work is queued to this queue, and the queue is disposed:

private static RoundRobinThreadPool _pool = new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using (var queue = _pool.CreateQueue()) {
    foreach (var item in b) {
      var currentItem = item; // copy so each work item captures its own value
      queue.QueueUserWorkItem(state => ProcessItem(currentItem));
    }
  }
}

Even though all of the work for the first batch to arrive will be scheduled first, as soon as the second batch arrives, it will start getting an approximately equal share of the processing resources.
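If you want to watch this happen, a hypothetical harness along the following lines (SubmitBatch is an invented helper; the _pool field is the same static instance shown above) submits a large batch, waits briefly, and then submits a small one; the console output from the two batches starts interleaving almost as soon as the second is queued:

// Hypothetical demo harness; SubmitBatch is an invented helper.
static void SubmitBatch(string name, int count) {
    using (var queue = _pool.CreateQueue()) {
        for (int i = 0; i < count; i++) {
            int id = i; // copy for the closure
            queue.QueueUserWorkItem(_ => Console.WriteLine(
                "{0} item {1} on thread {2}",
                name, id, Thread.CurrentThread.ManagedThreadId));
        }
    }
}

static void Main() {
    SubmitBatch("big", 1000);
    Thread.Sleep(50);         // let the big batch start being dispatched
    SubmitBatch("small", 10); // its items now interleave with the big batch's
    Console.ReadLine();       // keep the process alive while work completes
}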

There are more bells and whistles that could be added to this implementation, and the implementation could likely be improved from a performance perspective. However, with very little code, you've been able to create an abstraction that takes advantage of the .NET ThreadPool and all of the capabilities and functionality it provides, while still getting the batch-based fairness you were seeking.

Send your questions and comments for Stephen to netqa@microsoft.com.

Stephen Toub is a Senior Program Manager on the Parallel Computing Platform team at Microsoft. He is also a Contributing Editor for MSDN Magazine.