.Net 相关问题

轮询访问 ThreadPool

Stephen Toub

 

问:我目前正在使用 Microsoft .NET Framework ThreadPool,在使用过程中我遇到了一种情况,不知道该如何解决。我需要处理一个较大批次的已在队列中的工作项,在第一个批次开始处理后第二个批次(规模稍小)到达。最初,较大批次中的一些工作将被分配给 ThreadPool 中的所有工作线程。但是,当第二批到达后,我想平均分配,使每一批都得到同样的服务,而不是先到的批次得到所有线程。

 

当其中一批完成时,我希望无论哪一个仍需处理的批次,都能够获得所有工作线程的关注。我该如何在 ThreadPool 顶层对此类批处理功能进行分层呢?

答:在以前的专栏中,我曾介绍过如何在现有 .NET ThreadPool 的顶层对各种类型的功能进行分层。在 2004 年 10 月刊的《MSDN 杂志》中,我介绍了如何使 ThreadPool 支持等待排队的工作项(请参见“ThreadPoolWait 和 HandleLeakTracker”)。在 2004 年 11 月刊中,我介绍了如何添加对工作项优先级的支持(请参见“ThreadPoolPriority 和 MethodImplAttribute”)。在 2006 年 3 月刊中,我介绍了如何添加取消支持(请参见“可以取消的线程池”)。将来,我可以回顾 2009 年 1 月这一期,并提到我曾介绍过如何在 ThreadPool 中添加轮询调度支持。

您需要解决的问题首先是要了解 ThreadPool 分派工作的原理。在内部,它维护一个已被加入其队列的工作项队列。当池中的某个线程可以执行工作时,它会返回到此工作队列并提取下一个项目。此处理顺序并未记录在案,因此是非常不可靠的(因为它可能而且很可能会在未来版本中有所变更)。

现在它可以通过一种非常简单的方式来实现:先进先出 (FIFO) 队列。因此,要排队的第一个工作将成为被线程所选中的第一个工作。在您的批处理情况中,这意味着第一批中的所有工作在队列中都将排在第二批所有工作的前面。因此,第一批的所有工作都将在第二批的所有工作之前进行分派。对于某些情况这是合理且最佳的方法。而对于您的情况,还需要进行更多的控制。

从 ThreadPool 获得这种控制的最简单方法之一是将您自己的委托替换为用户真正需要执行的委托。例如,假设您想捕获已排队工作中抛出的所有未处理异常并针对每个异常引发一个事件。要执行此操作,您可以编写如图 1 所示的代码。然后,使用 ExceptionThreadPool.QueueUserWorkItem 而不是使用 ThreadPool.QueueUserWorkItem。此工作仍将由 ThreadPool 来执行,但池中的线程实际执行的是您排队的委托,而非用户提供的委托。接下来在调用您的委托时将会调用用户提供的委托,而且会捕获所有异常并引发目标事件。

图 1 填充 ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

请注意,此方法虽然功能很强大,但却需要付出一定的代价:需要分配额外的委托、需要调用额外的委托等等。成本是否过高只能根据您和您所处的环境来确定,但与从头编写自己的线程池相比,这种分层通常更具成本效益。

当然,这是一个非常简单的示例,但您也可以用来处理很复杂的工作。我之前引用的优先级池示例将用户提供的委托存储在它自己的数据结构中。然后它将返回的替代委托在池中排队并在这些数据结构中搜索要执行的正确委托,而且更倾向于首先执行优先级较高的委托。您可以使用类似的方法来解决您的批处理困境。

您可以花些时间想象一下,如果不是只有单个队列,而是每个批次都有一个队列的情形。各个批次会将工作置入其自身的相关队列中。然后,您可以使用池中的线程在所有队列之间进行轮询。排列到实际 ThreadPool 队列中的委托将返回到您的数据结构中,它会从下一个要检查的队列开始查找工作。如果发现,则从该队列开始执行。否则,它将继续查找下一个队列。

通过这种方式,您可以在各个批次之间提供公平的调度。如果只有一批,则线程将始终从该队列提取工作。如果有多批,则线程将访问每个队列,并给予大致相同的待遇。图 2 为您提供了此解决方案的概观。

netmatters,fig02.gif

图 2 RoundRobinThreadPool 方法

为使工作进行下去,您首先需要一个数据结构来存储用户提供的委托。我所采用的表现形式如图 3 所示。此数据结构包含三个属性。前两个属性您应该很熟悉;它们是用户为 QueueUserWorkItem 提供的状态,所以您理所当然需要对其进行缓存。不过,第三个属性可能会有些陌生。在 .NET 中与每个执行线程都相关联的是 System.Threading.ExecutionContext,它代表一些信息,例如当前用户、与逻辑执行线程相关联的任何状态、代码访问安全信息等。这种跨异步执行点的上下文流动非常重要。

图 3 捕获工作项

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

例如,如果您要模拟特定的 Windows 身份标识并调用 ThreadPool.QueueUserWorkItem,则您排队的工作应使用相同的 Windows 身份标识来执行。否则将会出现潜在的安全漏洞。从 .NET Framework 2.0 开始,此上下文会默认自动跨代码中的所有异步点流动:ThreadPool.QueueUserWorkItem、新建线程、异步委托调用等。

但是对于此处讨论的实现,您应遵守一组不同的规则。通过从委托的执行顺序更改其排队顺序,此 ExecutionContext 流与用户提供的工作项之间不再存在直接的对应关系。因此,您的实现需要正确缓存 ExecutionContext 与用户提供的委托,然后使用捕获的上下文来运行该委托。

现在,您已经有了一个工作项,让我们来看一看它将被放入的队列(如图 4 所示)。RoundRobinThreadPool.Queue 数据结构本身相当简单。在内部,它包含一个用来存储为其提供的所有工作项的 Queue<WorkItem>、一个对所关联的 RoundRobinThreadPool 实例的引用,以及一个表示 Dispose 方法是否在队列中被调用过的布尔值。然后,它会为 QueueUserWorkItem 方法提供与 ThreadPool 相同的签名。

图 4 轮询队列

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

调用 QueueUserWorkItem 时,用户提供的回调和状态(以及当前的 ExecutionContext)将被捕获到 WorkItem 中。此工作随后将被存储到泛型队列中,并且相关池会得到新工作已到达的通知。您务必要注意到这里使用了一个锁来保护工作项队列,因为 QueueUserWorkItem 可以同时从多个线程进行调用,而您需要确保维持不变性。

还要注意,被锁定的对象是一个来自池的全局队列列表。对于整个实现,我使用的是一个粗粒度锁。更有效的实现可能会使用更细粒度的锁定,例如针对每个队列使用单个锁,而不是整个 RoundRobinThreadPool 使用一个。为了便于实现和出于方便考虑,我选择使用单一锁。

当不再需要此队列时会使用 Dispose 方法。在典型的批处理方案中,将会创建一个队列、将工作排列到该队列中,然后释放队列。如果 Dispose 方法只是将此队列从池中删除,那么即使在其中仍有要处理的工作项,该队列仍可能会被删除。

这时 Dispose 会做两件事情。首先,它会检查是否存在任何剩余的工作项。如果不存在,队列会调用池以将自身删除。其次,它将自身标记为已释放。您稍后会看到当池遇到已释放但尚未删除的池时,它是如何处理这一情况的。

图 5 显示实现的其余部分,即 RoundRobinThreadPool 类本身。该池包含四个字段:

  • 池(同时还作为先前提及的锁)所维护的各个队列的列表。
  • 池的默认队列。
  • 代表要在其中搜索工作的下一个队列的整数。
  • 已实际排列到 ThreadPool 中的回调委托。

图 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

初始化 RoundRobinThreadPool 时,将会配置所有这些状态。特别是,会通过调用 CreateQueue 方法来初始化默认队列。此 CreateQueue 方法与公开的、允许开发人员向池中添加其他队列(例如,当需要自己独立队列的新工作批次到达时)的方法完全相同。它只实例化新 RoundRobinThreadPool.Queue 实例(图 3 中探讨的类型)、将其添加到队列列表中,然后再返回它。

为了方便使用,RoundRobinThreadPool 公开了其自身的 QueueUserWorkItem 方法;这些方法只针对在池实例化时创建的默认队列。

下面介绍 NotifyNewWorkItem 方法。您可能会记得,如果针对某个队列调用了 QueueUserWorkItem,则在存储工作项之后,该队列会在池中调用此 NotifyNewWorkItem 方法。此方法只是委托给实际的 ThreadPool,它会提交一个将会返回给 DequeueAndExecuteWorkItem 方法(稍后介绍)的委托,顾名思义,此方法将会取消排队并执行轮询池中的工作项。

请注意,NotifyNewWorkItem 调用的是 ThreadPool.UnsafeQueueUserWorkItem,而不是 ThreadPool.QueueUserWorkItem。"Unsafe" 前缀只是意味着它不是流动 ExecutionContext;不这样做可得到一些性能优势。由于此实现已开始对 ExecutionContext 流进行手动处理,因此 ThreadPool 也不需要尝试执行此操作。

DequeueAndExecuteWorkItem 是真正的奇妙所在。此方法首先生成队列搜索顺序。搜索顺序从要检查的下一个队列开始一直到列表结束,然后再循环回来,从列表的起始处到队列起始处。为了简化实现,故使用 LINQ Enumerable.Range 方法生成了两个列表,然后再使用 LINQ Enumerable.Concat 方法将其连接在一起。

确定了搜索顺序后,即可开始查找工作项。每个队列都会按指定的顺序进行检查,找到工作项后,即会将其删除并更新下一个指针。然后会使用图 3 中所示的 Execute 方法来调用此工作项。

此处有一行特别有趣的代码,它用来检查刚刚从中检索到某个项目的队列是否已释放且为空。如果池发现了此类队列,就会意识到不会再有任何项目添加到其中(因为它已被释放),因此不需要再保留这个池。此时,将会使用 RemoveQueueNeedsLock 把目标队列从队列列表中删除,并可能会更新下一个队列指针以防止其超出范围。

请注意,此方法没有在内部使用锁,而是访问共享状态;因此我用 "NeedsLock" 后缀命名此方法,以提醒我自己在持有锁时需要调用此方法。您会注意到,针对 RemoveQueueNeedLock 的两个调用站点(一个在队列的 Dispose 方法中,另一个在池的 DequeueAndExecuteWorkItem 方法中)在持有队列锁的时候都会调用此方法。

随着此实现的完成,您现在可以在代码中对此进行测试。在以下示例中,我创建了 RoundRobinThreadPool 的一个静态实例。如果有一批工作到达并需要进行处理,这时将会创建一个新的队列、将所有工作都排列到此队列中,然后释放队列:

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

即使会首先安排第一批到达的所有工作,但只要第二批到达,它也将会得到基本相同的处理资源。

可以在此实现中添加更多的点缀性功能,从性能角度来看,实现很可能会大为改观。但是,几乎不需要任何代码您即可创建一个抽象,从而利用 .NET ThreadPool 及其提供的所有能力和功能,而您仍可以获得您所寻求的基于批次的公平性。

请将您想向 Stephen 询问的问题和提出的意见发送至 netqa@microsoft.com

Stephen Toub 是 Microsoft 并行计算平台团队的一名高级项目经理。他还是**《MSDN 杂志》的特约编辑。