如何:对制造者线程和使用者线程进行同步(C# 编程指南)

下面的示例演示使用 lock 关键字以及 AutoResetEventManualResetEvent 类对主线程和两个辅助线程进行线程同步。有关更多信息,请参见 lock 语句(C# 参考)

该示例创建两个辅助线程。一个线程生成元素并将它们存储在非线程安全的泛型队列中。有关更多信息,请参见 Queue。另一个线程使用此队列中的项。另外,主线程定期显示队列的内容,因此该队列被三个线程访问。lock 关键字用于同步对队列的访问,以确保队列的状态没有被破坏。

除了用 lock 关键字来阻止同时访问外,还用两个事件对象提供进一步的同步。一个事件对象用来通知辅助线程终止,另一个事件对象由制造者线程用来在有新项添加到队列中时通知使用者线程。这两个事件对象封装在一个名为 SyncEvents 的类中。这使事件可以轻松传递到表示制造者线程和使用者线程的对象。SyncEvents 类是按如下方式定义的:

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

AutoResetEvent 类用于“新项”事件,因为您希望每当使用者线程响应此事件后,此事件都能自动重置。或者,将 ManualResetEvent 类用于“退出”事件,因为您希望当此事件终止时有多个线程响应。如果您改为使用 AutoResetEvent,则仅在一个线程响应该事件以后,该事件就还原到非终止状态。另一个线程不会响应,因此在这种情况下,将无法终止。

SyncEvents 类创建两个事件,并将它们以两种不同的形式存储:一种是作为 EventWaitHandle(它是 AutoResetEventManualResetEvent 的基类),一种是作为基于 WaitHandle 的数组。如关于使用者线程的讨论中所述,此数组是必需的,因为它使使用者线程可以响应两个事件中的任何一个。

使用者线程和制造者线程分别由名为 ConsumerProducer 的类表示,这两个类都定义名为 ThreadRun 的方法。这些方法用作 Main 方法创建的辅助线程的入口点。

Producer 类定义的 ThreadRun 方法如下所示:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

此方法一直循环,直到“退出线程”事件变为终止状态。此事件的状态由 WaitOne 方法使用 SyncEvents 类定义的 ExitThreadEvent 属性测试。在这种情况下,检查该事件的状态不会阻止当前线程,因为 WaitOne 使用的第一个参数为零,这表示该方法应立即返回。如果 WaitOne 返回 true,则说明该事件当前处于终止状态。如果是这样,ThreadRun 方法将返回,其效果相当于终止执行此方法的辅助线程。

在“退出线程”事件终止前,Producer.ThreadStart 方法将尝试在队列中保持 20 个项。每个项是 0 到 100 之间的一个整数。在添加新项前,该集合必须处于锁定状态,以防止使用者线程和主线程同时访问该集合。这是使用 lock 关键字来实现的。传递给 lock 的参数是通过 ICollection 接口公开的 SyncRoot 字段。此字段专门为同步线程访问而提供。对集合的独占访问权限被授予 lock 后面的代码块中包含的所有指令。对于制造者添加到队列中的每个新项,都将调用“新项”事件的 Set 方法。这将通知使用者线程离开挂起状态并开始处理新项。

Consumer 对象还定义名为 ThreadRun 的方法。与制造者的 ThreadRun 类似,此方法由 Main 方法创建的辅助线程执行。然而,使用者的 ThreadStart 必须响应两个事件。Consumer.ThreadRun 方法如下所示:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

此方法使用 WaitAny 来阻止使用者线程,直到所提供的数组中的任意一个等待句柄变为终止状态。在这种情况下,数组中有两个句柄,一个用来终止辅助线程,另一个用来指示有新项添加到集合中。WaitAny 返回变为终止状态的事件的索引。“新项”事件是数组中的第一个事件,因此索引为零表示新项。在这种情况下,检查索引为 1 的项(它表示“退出线程”事件),以确定此方法是否继续使用项。如果“新项”事件处于终止状态,您将通过 lock 获得对集合的独占访问权限并使用新项。因为此示例生成并使用数千个项,所以不显示使用的每个项,而是使用 Main 定期显示队列中的内容,如下面所演示的那样。

Main 方法从创建队列(队列中的内容将被生成和使用)和 SyncEvents 的实例(已在前面演示)开始:

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

然后,Main 配置 ProducerConsumer 对象以供辅助线程使用。然而,此步骤并不创建或启动实际的辅助线程:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

请注意,队列和同步事件对象作为构造函数参数同时传递给 ConsumerProducer 线程。这为两个对象提供了它们执行各自任务所需的共享资源。然后创建两个新的 Thread 对象,并对每个对象使用 ThreadRun 方法作为参数。每个辅助线程在启动时都将此参数用作线程的入口点。

然后,Main 通过调用 Start 方法来启动两个辅助线程,如下所示:

producerThread.Start();
consumerThread.Start();

此时,创建了两个新的辅助线程,它们独立于当前正在执行 Main 方法的主线程开始异步执行过程。事实上,Main 接下来要做的事情是通过调用 Sleep 方法将主线程挂起。该方法将当前正在执行的线程挂起指定的时间(毫秒)。在此时间间隔过后,Main 将重新激活,这时它将显示队列的内容。Main 重复此过程四次,如下所示:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

最后,Main 通过调用“退出线程”事件的 Set 方法通知辅助线程终止,然后对每个辅助线程调用 Join 方法以阻止主线程,直到每个辅助线程都响应该事件并终止。

有一个线程同步的最终示例:ShowQueueContents 方法。与制造者线程和使用者线程类似,此方法使用 lock 获得对队列的独占访问权限。然而在这种情况下,独占访问尤其重要,因为 ShowQueueContents 对整个集合进行枚举。对集合进行枚举是一个很容易由于异步操作造成数据损坏的操作,因为它涉及遍历整个集合的内容。ShowQueueContents 方法如下所示:

syncEvents.ExitThreadEvent.Set();

producerThread.Join();
consumerThread.Join();

最后请注意,ShowQueueContents 是由主线程执行的,因为它被 Main 调用。这意味着当此方法获得对项队列的独占访问权限时,它实际上既阻止了制造者线程访问队列,也阻止了使用者线程访问队列。ShowQueueContents 锁定队列并枚举其内容:

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

下面是完整的示例。

示例

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}

示例输出

Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

请参见

任务

监视器同步技术示例
等待同步技术示例

参考

线程同步(C# 编程指南)
lock 语句(C# 参考)
AutoResetEvent
ManualResetEvent
Set
Join
WaitOne
WaitAll
Queue
ICollection
Start
Sleep
WaitHandle
EventWaitHandle

概念

C# 编程指南

其他资源

Thread 类