使用 C++ 进行 Windows 开发

Windows 线程池和工作

Kenny Kerr

Kenny Kerr
并发对于不同的人有不同的含义。有些人会想到代理和消息 — 协作但异步的状态机。另一些人会想到任务,它们通常以并发执行的函数或表达式的形式存在。还有些人会想到数据并行性,即由数据结构支持的并行。您甚至可以把它们看作具有互补性或重叠性的技术。无论您如何看待并发性问题,当代任何一种并发性方式的核心都是某种形式的线程池。

线程的创建相当昂贵。线程过多会带来调度开销,进而影响缓存局部性和整体性能。在大多数设计合理的系统中,并发性单元的寿命相对较短。理想情况是有一种能够按需创建线程的简单方法,可随着工作的增加重复使用线程,并以某种智能化方式避免创建过多线程,从而高效利用有限的计算能力。幸运的是,这种美好的想法现在的确存在,它不在某个第三方库中,而就在 Windows API 的内部。Windows 线程池 API 不仅满足这些要求,而且还能够无缝集成 Windows API 的许多核心构建基块。它在很大程度上降低了可伸缩和高响应性应用程序编写的复杂性。如果您是一名从事 Windows 开发时间很久的开发人员,您肯定对 Windows 可伸缩性的基础即 I/O 完成端口非常熟悉。对 I/O 完成端口在 Windows 线程池中处于核心地位这一事实,我们应该感到欣慰。

请记住,线程池不应只被看作一种避免使用全部 CreateThread 参数来调用 CreateThread 以及使用所得句柄来调用 CloseHandle 的一种手段。这种看法确实比较方便,但也可能让人产生误解。大多数开发人员对于 Windows 实现的优先级驱动型提前调度模型抱有厚望。优先级相同的线程通常共享处理器时间。当一个线程的量程(线程开始运行所需的时间)结束时,Windows 会决定另一个相同优先级的线程是否准备好执行。当然,许多因素都会影响线程调度。但是,如果两个线程创建的时间大致相同、优先级相同、都用来执行计算密集型操作,则这两个线程开始执行的时间相互应该只差几个量程。

而线程池则不然。线程池(及任何基于 I/O 完成端口的调度抽象)依赖于工作排队模型。线程池不仅能够保证内核的充分利用,还能防止过分调度。如果在单核计算机上大致同时提交两个工作单元,则只有第一个被调度,第二个只有在第一个完成或阻塞时才会开始。由于工作中断少而执行效率更高,此模型可以获得最佳吞吐量,但也意味着没有延迟保证。

线程池 API 被设计为一组协同对象,其中有些对象表示工作单位、计时器、异步 I/O 等等。甚至有些对象表示高难度概念,如取消和清理。幸运的是,该 API 没有强迫开发人员处理所有这些对象,而是更像自助餐,使用多少全由您自己决定。当然,这种自由也会带来 API 使用效率不高或使用方式不正确的风险。正因为如此,我会在接下来的几个月里在本专栏中对此进行论述。当您逐渐掌握 API 各个部分扮演的不同角色后,您将发现自己要编写的代码变得更加简单而不是更加复杂。

在这第一个专栏内,我准备介绍如何开始向线程池提交工作。函数作为工作对象向线程池公开。工作对象由一个函数指针和一个被称为上下文的 void 指针组成,线程池每次在执行时将 void 指针送入函数。工作对象可多次提交执行,但如果不创建新的工作对象,则不能更改函数和上下文。

CreateThreadpoolWork 函数负责创建工作对象。如果该函数成功,会返回一个不透明指针来表示工作对象。如果失败,会返回一个 null 指针值,并通过 GetLastError 函数提供更多信息。给定工作对象后,CloseThreadpoolWork 函数通知线程池该对象可释放。此函数不返回值,并且出于效率原因假定工作对象有效。幸运的是,我在上个月专栏中介绍的 unique_handle 类模板可以对此进行处理。下面是 unique_handle 可以使用的一个特征类以及为方便起见使用的一个 typedef:

struct work_traits
{
  static PTP_WORK invalid() throw()
  {
    return nullptr;
  }

  static void close(PTP_WORK value) throw()
  {
    CloseThreadpoolWork(value);
  }
};

typedef unique_handle<PTP_WORK, work_traits> work;

现在我可以创建一个工作对象,并让编译器处理其生存期,无论该对象驻留在堆栈上还是容器内。 当然,在我这样做之前,我需要一个函数供它调用,此过程称为回调。 回调的声明如下:

void CALLBACK hard_work(PTP_CALLBACK_INSTANCE, void * context, PTP_WORK);

CALLBACK 宏可确保函数按 Windows API 对回调的要求并根据不同的目标平台实现相应的调用约定。 使用工作 typedef 可以非常简单地为该回调创建工作对象,并继续使用我在上月专栏中重点介绍的模式,如下所示:

void * context = ...
work w(CreateThreadpoolWork(hard_work, context, nullptr));
check_bool(w);

此时,我手中只有一个表示某个待执行工作的对象,但是并不涉及线程池本身,因为工作回调尚未提交执行。 SubmitThreadpoolWork 函数负责将工作回调提交到线程池。 为了允许多个回调并行运行,该函数可以用同一个工作对象多次调用。 该函数如下所示:

SubmitThreadpoolWork(w.get());

当然,即使提交工作也不能保证它被立即执行。 工作回调被排入队列。但是,为了提高效率,线程池可能限制并发级别,即限制工作线程的数量。 这无法预测,因此需要找出一种等待未完成的回调完成的办法,包括当前正在执行和等待执行的回调。 最好还能够取消那些没有得到执行机会的工作回调。 通常,任何形式的阻塞“等待”操作对并发性来说都是坏消息。但是,为了能够执行可预测的取消和关闭,这种操作还是有必要的。 由于这是下一期栏目的主题,在此不再多说。 但是,目前 WaitForThreadpoolWorkCallbacks 函数可以满足上述要求。 例如:

bool cancel = ...
WaitForThreadpoolWorkCallbacks(w.get(), cancel);

第二个参数的值决定着等待执行的回调是取消还是函数等待它们执行完毕,即使它们还未开始执行。 现在,我已经做足准备来创建一个基本功能池,即利用线程池 API 和 C++ 2011 的喷淋方法来创建大家更喜欢使用的东西。 此外,它还为使用我到目前为止介绍的所有函数提供了一个很好的例子。

简单的功能池应允许我提交可异步执行的函数。 我应该能够使用 lambda 表达式、命名函数或函数对象(如果需要)定义该函数。 一种方法是用一个并发收集来存储函数队列,并将该队列送入工作回调。 Visual C++ 2010 中的 concurrent_queue 类模板可以实现此目的。 我假定您正在使用由 Service Pack 1 提供的更新的系统实现,因为原来的版本中存在一个 Bug,该 Bug 会导致销毁非空队列时产生访问冲突。

我可以继续开始定义如下功能池类:

class functional_pool
{
  typedef concurrent_queue<function<void()>> queue;

  queue m_queue;
  work m_work;

  static void CALLBACK callback(PTP_CALLBACK_INSTANCE, void * context, PTP_WORK)
  {
    auto q = static_cast<queue *>(context);

    function<void()> function;
    q->try_pop(function);

    function();
  }

可以看出,functional_pool 类管理一个函数对象队列和一个工作对象。 回调将假定上下文是队列的指针,并假定队列中至少存在一个函数。 现在我可以为该回调创建工作对象,并相应地设置上下文,如下所示:

public:

  functional_pool() :
    m_work(CreateThreadpoolWork(callback, &m_queue, nullptr))
  {
    check_bool(m_work);
  }

需要针对提交的各种函数类型准备一个适用的函数模板。 它的职责只是将函数送入队列,并通过调用 SubmitThreadpoolWork 指示线程池提交要执行的工作回调,如下所示:

template <typename Function>
void submit(Function const & function)
{
  m_queue.push(function);
  SubmitThreadpoolWork(m_work.get());
}

最后,functional_pool 析构函数需确保在允许销毁队列之前不再有要执行的回调,否则后果将非常严重。 例如:

~functional_pool()
{
  WaitForThreadpoolWorkCallbacks(m_work.get(), true);
}

现在,我可以创建 functional_pool 对象,并使用 lambda 表达式非常轻松地提交工作:

functional_pool pool;

pool.submit([]
{
  // Do this asynchronously
});

显然,显式函数排队和隐式工作回调排队会造成一定的性能损失。 尽管在服务器应用程序中使用该方法时,并发性通常很有组织性,但这可能不是一个好注意。 如果您只有较少量的唯一性回调,却要处理大量的异步工作负载,那么使用函数指针会更好。 但是,该方法在客户端应用程序中可能非常有用。 如果为了提高响应能力,需要并发处理许多不同的短期操作,则使用 lambda 表达式往往更加方便。

毕竟本文主题不是 lambda 表达式,而是如何向线程池提交工作。 TrySubmitThreadpoolCallback 函数似乎为此提供了一种更为简单的方法,如下所示:

void * context = ...
check_bool(TrySubmitThreadpoolCallback(
  simple_work, context, nullptr));

似乎 CreateThreadpoolWork 和 SubmitThreadpoolWork 函数已经合为一个,基本情况就是这样。 TrySubmitThreadpoolCallback 函数使线程池在内部创建一个工作对象,并将其回调立即提交执行。 由于线程池负责管理该工作对象,因而您不必担心的它的释放问题。 事实上,由于 API 绝不会公开该工作对象,您完全没有必要为此担心。 回调的签名还提供进一步的证据,如下所示:

void CALLBACK simple_work(
  PTP_CALLBACK_INSTANCE, void * context);

除了缺少第三个参数以外,该回调与以前相比几乎没什么变化。 初看起来,这种情况似乎非常理想:API 更加简单,让人担心的事情更少。 但是,并没有一种明显的方法可等待回调完成,更不用说取消回调了。 在尝试编写 functional_pool 类的 TrySubmitThreadpoolCallback 方面内容时会出现问题,而且需要更多同步。 下一期专栏将介绍如何使用线程池 API 解决此问题。 即使能够解决这些问题,仍然存在一个不甚明显的问题,而其潜在破坏力实际更大。 每次调用 TrySubmitThreadpoolCallback 都涉及利用它的相关资源创建新的工作对象。 在工作负载非常大时,这会使线程池迅速消耗大量内存,进而导致更大性能损耗。

显式使用工作对象还有其他好处。 回调的最后一个保持原形的参数提供一个指针,该指针指向提交运行实例的同一工作对象。 您可以用它对同一回调的更多实例进行排队。 甚至可以用它来释放工作对象。 但是,这些技巧可能给您带来麻烦,因为要知道何时提交工作更安全以及何时释放应用程序资源更安全变得越来越困难。 在下个月的专栏中,在继续探讨 Windows 线程池 API 的同时,我将介绍线程池的环境。

Kenny Kerr 是一位热衷于本机 Windows 开发的软件专家。 您可以通过 kennykerr.ca 与他联系。

衷心感谢以下技术专家对本文的审阅: Hari PulapakaPedro Teixeira