How to: Synchronize a Producer and a Consumer Thread (C# and Visual Basic)

The following example demonstrates thread synchronization between the primary thread and two worker threads by using the lock (C#) or SyncLock (Visual Basic) keyword, and the AutoResetEvent and ManualResetEvent classes. For more information, see lock Statement (C# Reference) or SyncLock Statement.

The example creates two auxiliary, or worker, threads. One thread produces elements and stores them in a generic queue that is not thread-safe. For more information, see Queue<T>. The other thread consumes items from this queue. In addition, the primary thread periodically displays the contents of the queue, so the queue is accessed by three threads. The lock or SyncLock keyword is used to synchronize access to the queue and ensure that its state is not corrupted.

In addition to preventing simultaneous access with the lock or SyncLock keyword, the example uses two event objects for further synchronization. One is used to signal the worker threads to terminate, and the other is used by the producer thread to signal to the consumer thread that a new item has been added to the queue. These two event objects are encapsulated in a class named SyncEvents, which makes it easy to pass the events to the objects that represent the consumer and producer threads. The SyncEvents class is defined as follows:


public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}


The AutoResetEvent class is used for the "new item" event because you want this event to reset automatically every time that the consumer thread responds to it. In contrast, the ManualResetEvent class is used for the "exit" event because you want multiple threads to respond when this event is signaled. If you used AutoResetEvent instead, the event would revert to a non-signaled state after just one thread responded to it. The other thread would not respond and, in this case, would fail to terminate.
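
The difference between the two reset modes can be observed on a single thread. The following is a minimal sketch, not part of the example: the class name ResetEventSketch and the method SetOnceThenPollTwice are hypothetical, and the code assumes that the WaitOne(Int32) overload is available. It signals each event once and then tests it twice with a zero time-out.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original example.
public static class ResetEventSketch
{
    // Signals the event once, then tests it twice without blocking.
    // Returns the results of the two tests.
    public static bool[] SetOnceThenPollTwice(EventWaitHandle handle)
    {
        handle.Set();
        bool first = handle.WaitOne(0);   // zero time-out: test the state and return
        bool second = handle.WaitOne(0);
        return new bool[] { first, second };
    }

    public static void Main()
    {
        using (AutoResetEvent auto = new AutoResetEvent(false))
        using (ManualResetEvent manual = new ManualResetEvent(false))
        {
            bool[] a = SetOnceThenPollTwice(auto);
            bool[] m = SetOnceThenPollTwice(manual);
            // The first successful wait resets the AutoResetEvent.
            Console.WriteLine("AutoResetEvent:   {0}, {1}", a[0], a[1]);   // True, False
            // The ManualResetEvent stays signaled until Reset is called.
            Console.WriteLine("ManualResetEvent: {0}, {1}", m[0], m[1]);   // True, True
        }
    }
}
```

Because an AutoResetEvent holds only a signaled or non-signaled state, several Set calls made before the next wait release only one wait. This is one plausible reason why the consumer in the example can finish having consumed fewer items than the producer produced.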

The SyncEvents class creates the two events, and stores them in two different forms: as EventWaitHandle, which is the base class for both AutoResetEvent and ManualResetEvent, and in an array based on WaitHandle. As you will see in the consumer thread discussion, this array is necessary so that the consumer thread can respond to either event.

The consumer and producer threads are represented by classes named Consumer and Producer. Both of these define a method called ThreadRun. These methods are used as the entry points for the worker threads that the Main method creates.

The ThreadRun method defined by the Producer class resembles this:


// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}


This method loops until the "exit thread" event becomes signaled. The state of this event is tested with the WaitOne method, by using the ExitThreadEvent property defined by the SyncEvents class. In this case, the state of the event is checked without blocking the current thread, because the first argument passed to WaitOne is zero, which indicates that the method should return immediately. If WaitOne returns true, the event is currently signaled. In that case, the ThreadRun method returns, which terminates the worker thread that is executing this method.
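
The polling pattern described above can be isolated in a few lines. This is a sketch under stated assumptions rather than code from the example: PollingSketch and RunUntilSignaled are hypothetical names, and the single-argument WaitOne(Int32) overload is used in place of WaitOne(0, false).

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original example.
public static class PollingSketch
{
    // Runs a loop on a worker thread until the exit event is signaled,
    // then returns how many iterations the worker completed.
    public static long RunUntilSignaled(int delayMilliseconds)
    {
        ManualResetEvent exit = new ManualResetEvent(false);
        long iterations = 0;

        Thread worker = new Thread(delegate()
        {
            // WaitOne(0) returns false immediately while the event is not signaled.
            while (!exit.WaitOne(0))
            {
                iterations++;
            }
        });

        worker.Start();
        Thread.Sleep(delayMilliseconds);  // let the worker spin for a while
        exit.Set();                       // the next WaitOne(0) returns true
        worker.Join();                    // wait for the loop to end
        return iterations;
    }

    public static void Main()
    {
        Console.WriteLine("Iterations before exit: {0}", RunUntilSignaled(100));
    }
}
```

The iteration count varies from run to run. A loop like this never blocks, so it occupies a processor for as long as it runs; the producer in the example behaves the same way once the queue holds 20 items.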

Until the "exit thread" event is signaled, the Producer.ThreadRun method tries to keep 20 items in the queue. An item is just a random integer from 0 through 99, because the upper bound passed to Random.Next is exclusive. The collection must be locked before you add new items, to prevent the consumer and primary threads from accessing the collection at the same time. This is done by using the lock (C#) or SyncLock (Visual Basic) keyword. The argument passed to lock or SyncLock is the SyncRoot property exposed through the ICollection interface. This property is provided specifically for synchronizing thread access. Exclusive access to the collection is granted for any instructions that are contained in the code block following lock or SyncLock. For each new item that the producer adds to the queue, it calls the Set method on the "new item" event. This signals the consumer thread to emerge from its suspended state to process the new item.
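
To show the effect of the lock in isolation, the following sketch lets two threads add items to the same Queue<int> while holding the lock on its SyncRoot. The names LockSketch and FillFromTwoThreads are hypothetical and do not appear in the example.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper for illustration; not part of the original example.
public static class LockSketch
{
    // Two threads each enqueue itemsPerThread items into one shared queue.
    // Every Enqueue call runs while the lock on SyncRoot is held.
    public static int FillFromTwoThreads(int itemsPerThread)
    {
        Queue<int> queue = new Queue<int>();
        object sync = ((ICollection)queue).SyncRoot;

        ThreadStart work = delegate()
        {
            for (int i = 0; i < itemsPerThread; i++)
            {
                lock (sync)
                {
                    queue.Enqueue(i);
                }
            }
        };

        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.Start();
        second.Start();
        first.Join();
        second.Join();
        return queue.Count;
    }

    public static void Main()
    {
        Console.WriteLine(FillFromTwoThreads(10000));  // 20000
    }
}
```

Without the lock, concurrent Enqueue calls on Queue<T> are not safe and the final count is not reliable. The lock protects the queue only if every thread that touches the queue locks on the same object.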

The Consumer object also defines a method called ThreadRun. Like the producer's version of ThreadRun, this method is executed by a worker thread created by the Main method. However, the consumer version of ThreadRun must respond to two events. The Consumer.ThreadRun method resembles this:


// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}


This method uses WaitAny to block the consumer thread until any of the wait handles in the provided array becomes signaled. In this case, there are two handles in the array: one that indicates that a new item has been added to the collection, and one that terminates the worker threads. WaitAny returns the index of the event that became signaled. The "new item" event is the first in the array, so an index of zero indicates a new item. The loop therefore checks for an index of 1, which indicates the "exit thread" event, to determine whether the method continues to consume items. If the "new item" event was signaled, you get exclusive access to the collection with lock or SyncLock and consume the new item. Because this example produces and consumes thousands of items, it does not display each item consumed. Instead, Main periodically displays the contents of the queue, as will be demonstrated.
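
The index returned by WaitAny can be checked on a single thread by using a zero time-out. The sketch below is illustrative only: WaitAnySketch and Probe are hypothetical names, and the WaitAny(WaitHandle[], Int32) overload is assumed to be available. The array has the same layout as EventArray, with the "new item" event at index 0 and the "exit thread" event at index 1.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original example.
public static class WaitAnySketch
{
    // Returns the WaitAny results for four different combinations of event states.
    public static int[] Probe()
    {
        AutoResetEvent newItem = new AutoResetEvent(false);
        ManualResetEvent exit = new ManualResetEvent(false);
        WaitHandle[] handles = new WaitHandle[] { newItem, exit };

        exit.Set();
        int onlyExit = WaitHandle.WaitAny(handles, 0);    // 1: only "exit" is signaled

        newItem.Set();
        int both = WaitHandle.WaitAny(handles, 0);        // 0: lowest signaled index wins

        // The previous call reset the auto-reset event, so only "exit" remains.
        int afterReset = WaitHandle.WaitAny(handles, 0);  // 1

        exit.Reset();
        int none = WaitHandle.WaitAny(handles, 0);        // WaitHandle.WaitTimeout

        return new int[] { onlyExit, both, afterReset, none };
    }

    public static void Main()
    {
        int[] r = Probe();
        Console.WriteLine("{0} {1} {2} {3}", r[0], r[1], r[2],
            r[3] == WaitHandle.WaitTimeout ? "timeout" : r[3].ToString());
        // prints: 1 0 1 timeout
    }
}
```

When both events are signaled, WaitAny reports the lower index. Applied to the consumer loop, this suggests that the consumer can still take one more item after the "exit thread" event is set, and leaves the loop on the first wait in which only that event is signaled.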

The Main method starts by creating the queue whose contents will be produced and consumed and an instance of SyncEvents, which you looked at earlier:


Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();


Next, Main configures the Producer and Consumer objects for use with worker threads. This step creates the Thread objects, but it does not start the worker threads:


Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);


Notice that the queue and the synchronization event object are passed to both the Consumer and Producer objects as constructor arguments. This provides both objects with the shared resources that they need to perform their respective tasks. Two new Thread objects are then created, by using the ThreadRun method of each object as an argument. Each worker thread, when it is started, uses this argument as its entry point.

Next, Main launches the two worker threads by calling the Start method on each one:


producerThread.Start();
consumerThread.Start();


At this point, the two new worker threads begin asynchronous execution, independent of the primary thread that is currently executing the Main method. In fact, the next thing Main does is suspend the primary thread with a call to the Sleep method. This method suspends the currently executing thread for a given number of milliseconds. Once this interval elapses, Main resumes and displays the contents of the queue. Main repeats this for four iterations, as follows:


for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}


Finally, Main signals the worker threads to terminate by invoking the Set method of the "exit thread" event, and then calls the Join method on each worker thread to block the primary thread until each worker thread responds to the event and terminates.
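
The shutdown sequence can be sketched separately from the queue logic. In the following illustration, ShutdownSketch and SignalAndJoin are hypothetical names; each worker simply blocks on a shared ManualResetEvent instead of producing or consuming items.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original example.
public static class ShutdownSketch
{
    // Starts workerCount threads that block on one ManualResetEvent,
    // signals the event once, and joins every thread.
    // Returns the number of workers that ran to completion.
    public static int SignalAndJoin(int workerCount)
    {
        ManualResetEvent exit = new ManualResetEvent(false);
        int finished = 0;
        Thread[] workers = new Thread[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(delegate()
            {
                exit.WaitOne();                       // block until the event is signaled
                Interlocked.Increment(ref finished);  // record that this worker got through
            });
            workers[i].Start();
        }

        exit.Set();                  // one Set call releases every waiting worker
        foreach (Thread t in workers)
        {
            t.Join();                // block until this worker has terminated
        }
        return finished;
    }

    public static void Main()
    {
        Console.WriteLine(SignalAndJoin(2));  // 2
    }
}
```

Because the event is a ManualResetEvent, it stays signaled after Set, so a worker that reaches WaitOne late still passes through. Join guarantees that the count is read only after all workers have finished.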

There is one final example of thread synchronization: the ShowQueueContents method. This method, like the consumer and producer threads, uses lock (C#) or SyncLock (Visual Basic) to gain exclusive access to the queue. In this case, exclusive access is especially important, because ShowQueueContents enumerates the whole collection. Enumerating a collection is especially vulnerable to concurrent modification, because it involves traversing the contents of the entire collection.
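
The sensitivity of enumeration to modification can be demonstrated even on a single thread, because the Queue<T> enumerator is invalidated when the queue changes. The sketch below uses the hypothetical names EnumerationSketch and ModificationIsDetected; it is not part of the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the original example.
public static class EnumerationSketch
{
    // Modifies a queue while enumerating it and reports whether
    // the enumerator rejected the change.
    public static bool ModificationIsDetected()
    {
        Queue<int> queue = new Queue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);

        try
        {
            foreach (int item in queue)
            {
                queue.Enqueue(item);  // changes the queue in the middle of the enumeration
            }
        }
        catch (InvalidOperationException)
        {
            return true;              // thrown when the enumerator advances after the change
        }
        return false;
    }

    public static void Main()
    {
        Console.WriteLine(ModificationIsDetected());  // True
    }
}
```

This check is reliable only when the change happens on the enumerating thread. When another thread modifies the queue without a lock, the behavior is undefined, which is why ShowQueueContents holds the lock for the whole enumeration.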

Notice that ShowQueueContents, because it is called by Main, is executed by the primary thread. This means that this method, when it achieves exclusive access to the item queue, blocks both the producer and consumer threads from access to the queue. ShowQueueContents locks the queue and enumerates the contents:


private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}
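
ShowQueueContents writes to the console while it holds the lock, so the producer and consumer are blocked for the duration of the output. A possible variation, shown here as a sketch and not as the approach that the example takes, copies the queue under the lock and prints the copy afterward. The names SnapshotSketch, Snapshot, and ShowSnapshot are hypothetical.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the original example.
public static class SnapshotSketch
{
    // Copies the queue contents while holding the lock, in queue order.
    public static int[] Snapshot(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            return q.ToArray();
        }
    }

    // Prints a snapshot; the lock is already released while this runs.
    public static void ShowSnapshot(Queue<int> q)
    {
        foreach (int item in Snapshot(q))
        {
            Console.Write("{0} ", item);
        }
        Console.WriteLine();
    }

    public static void Main()
    {
        Queue<int> queue = new Queue<int>();
        queue.Enqueue(3);
        queue.Enqueue(1);
        queue.Enqueue(2);
        ShowSnapshot(queue);  // prints: 3 1 2
    }
}
```

The trade-off is an extra array allocation for each display in exchange for a shorter time spent holding the lock. The snapshot leaves the queue itself unchanged.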


The complete example follows.


using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}


Sample output (the queue contents and item counts vary from run to run):

Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signaling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items
