2015 年 10 月

第 30 卷,第 10 期

使用 C++ 进行 Windows 开发 - Visual C++ 2015 中的协同例程

作者 Kenny Kerr | 2015 年 10 月

我于 2012 年初次了解 C++ 中的协同例程,并在 MSDN 杂志这里写下了一系列文章来表达观点。我研究了一种轻量形式的协作多任务,借助 switch 语句通过巧妙的方式模拟协同例程。然后,我探讨了通过对 Promises 和 Futures 的建议扩展来提高异步系统的效率以及可组合性的一些措施。最后,我总结了一些即使在未来的 Futures 愿景中也依然存在的挑战,以及对所谓可恢复函数的建议。对于 C++ 中优雅简洁的并发,如果你对一些相关的挑战和历史感兴趣,那么我建议你阅读以下文章:

文章大部分都属于理论上的内容,因为我当时没有编译器来实现其中任何一个想法,而不得不以各种方式模拟它们。而今年初推出了 Visual Studio 2015。此版本的 Visual C++ 包含了称为 /await 的实验编译器选项,用于解锁由编译器直接支持的协同例程的实现。不再需要 Hack、宏或其他巧妙的技术。尽管是用于实验,且尚未经过 C++ 委员会批准,但它实实在在可用。并且它不仅仅是编译器前端的语法糖,例如你所了解的 C# yield 关键字和异步方法。C++ 实现在编译器后端包含了深度工程投资,可提供难以置信地可扩展实现。事实上,当编译器前端仅提供更方便的语法用于处理 Promises 和 Futures 或者甚至并发运行时任务类时,该实现所执行的操作远比你所了解的还要多。那么让我们重新访问这一主题,然后看看今天它已发展到什么地步。由于自 2012 年以来发生了很多变化,因此我先进行一个简单的回顾,说明我们当时所面临的情况以及现在的情形,然后再查看一些更具体的示例和实际应用。

由于借助了可恢复函数的显著示例总结出上述一系列内容,因此我从这里开始说起。想象一对要从某个文件读取然后写入网络连接的资源:

struct File
{
  unsigned Read(void * buffer, unsigned size);
};
struct Network
{
  void Write(void const * buffer, unsigned size);
};

你可以随意想象其他示例,但对于传统的同步 I/O,该示例颇具代表性。文件的 Read 方法会尝试将数据从当前文件位置读取到缓冲区,直到缓冲区的最大大小,然后将返回实际复制的字节数。如果返回值小于请求大小,这通常意味着已经到达文件末端。Network 类模型是典型的面向连接的协议,例如 TCP 或 Windows 命名管道。Write 方法将特定字节数复制到网络堆栈。典型的同步复制操作很容易想象,但我会借助图 1 帮助你形成一个参考框架。

图 1 同步复制操作

File file = // Open file
Network network = // Open connection
uint8_t buffer[4096];
while (unsigned const actual = file.Read(buffer, sizeof(buffer)))
{
  network.Write(buffer, actual);
}

只要 Read 方法返回一些大于零的值,便会使用 Write 方法将结果字节从中间缓冲区复制到网络。任何一个合理的程序员(无论其背景如何)都可以理解这种类型的代码。当然,Windows 提供的服务可以将此类型的操作完全卸载到内核以避免所有转换,但这些服务仅限于特定方案,而这是应用通常与其捆绑的阻止操作类型的代表。

C++ 标准库提供的 Futures 和 Promises 尝试支持异步操作,但由于其不成熟的设计而备受诟病。我在 2012 年探讨过这些问题。即使忽视这些问题,重写图 1 中的文件到网络复制示例也非常不容易。同步的最直接(也是最简单的)转换在循环时需要经手动谨慎编写的可以遍历 Futures 链的迭代算法。

template <typename F>
future<void> do_while(F body)
{
  shared_ptr<promise<void>> done = make_shared<promise<void>>();
  iteration(body, done);
  return done->get_future();
}

该算法在迭代函数中才能起实际作用:

template <typename F>
void iteration(F body, shared_ptr<promise<void>> const & done)
{
  body().then([=](future<bool> const & previous)
  {
    if (previous.get()) { iteration(body, done); }
    else { done->set_value(); }
  });
}

lambda 必须按值捕获共享 Promise,因此这实际是迭代而不是递归。但这样会出现问题,因为这对于每个迭代而言意味着一对互锁操作。此外,Futures 尚未具有链接延续的“then”方法,尽管你现在可以使用并发运行时任务类对其进行模拟。仍然假定这种未来算法和延续存在,我可以采用异步方式重写图 1 中的同步复制操作。我首先需要将异步重载添加到 File 和 Network 类。可能类似以下内容:

struct File
{
  unsigned Read(void * buffer, unsigned const size);
  future<unsigned> ReadAsync(void * buffer, unsigned const size);
};
struct Network
{
  void Write(void const * buffer, unsigned const size);
  future<unsigned> WriteAsync(void const * buffer, unsigned const size)
};

WriteAsync 方法的 Future 必须回响复制的字节数,因为这是任何延续可能所具有的全部数量以用于确定是否终止迭代。另一个选项可用于 File 类来提供 EndOfFile 方法。在任何考虑到这些新基元的情况下,如果你很清醒,你肯定可以采用可理解的方式来表达此复制操作。图 2 说明了此方法。

图 2 使用 Futures 的复制操作

File file = // Open file
Network network = // Open connection
uint8_t buffer[4096];
future<void> operation = do_while([&]
{
  return file.ReadAsync(buffer, sizeof(buffer))
    .then([&](task<unsigned> const & read)
    {
      return network.WriteAsync(buffer, read.get());
    })
    .then([&](task<unsigned> const & write)
    {
      return write.get() == sizeof(buffer);
    });
});
operation.get();

只要循环“主体”返回 True,do_while 算法就有助于进行延续链接。因此调用了 ReadAsync,其结果由 WriteAsync 使用,后者的结果作为循环条件进行检验。这不是复杂的事情,但我并不想编写这样的代码。它经过人为编写,很快就会变得非常复杂而难以进行推论。输入可恢复函数。

添加 /await 编译器选项使编译器能够支持可恢复函数,一种 C++ 协同例程的实现。它们称为可恢复函数而不是简单的协同例程,因为这意味着它们要尽可能表现地像传统 C++ 函数一样。实际上,与我在 2012 年所探讨的不同,某些函数的使用者根本不应该需要知道函数是否作为协同例程实现。

从本文撰写开始,/await 编译器选项还需要 /Zi 选项而非默认 /ZI 选项,以便禁用调试器的编辑和继续功能。你还必须使用 /sdl- 选项禁用 SDL 检查并避免使用 /RTC 选项,因为编译器的运行时检查与协同例程不兼容。所有这些限制都是暂时的并且由于实现的实验本质,我希望在未来的编译器更新中将其解除。但这些都是值得的,正如图 3 所示。与使用 Futures 实现的复制操作的所需内容相比,毫无疑问,这样编写明显更为简单且易于理解。实际上,这与图 1 中的原始同步示例非常相似。在本例中,WriteAsync Future 还无需返回特定值。

图 3 可恢复函数内的复制操作

future<void> Copy()
{
  File file = // Open file
  Network network = // Open connection
  uint8_t buffer[4096];
  while (unsigned copied = await file.ReadAsync(buffer, sizeof(buffer)))
  {
    await network.WriteAsync(buffer, copied);
  }
}

图 3 中使用的 await 关键字以及 /await 编译器选项提供的其他新关键字只能出现在可恢复函数内,因此便出现在返回 Future 的 Copy 函数内。我要使用上个 Futures 示例中相同的 ReadAsync 和 WriteAsync 方法,但重要的是要意识到编译器完全不了解 Futures。实际上,它们根本不需要是 Futures。那么,具体的工作方式如何呢? 是这样,只有编写了为编译器提供必要绑定的某些适配器函数后它才可正常工作。这类似于编译器通过查找合适的 Begin 和 End 函数找出如何连接到基于范围的 for 语句的方式。在 await 表达式的情况下,与查找 Begin 和 End 不同,编译器要查找的合适函数称为 await_ready、await_suspend 和 await_resume。与 Begin 和 End 相同,这些新函数可以是成员函数也可以是自由函数。编写非成员函数的能力非常有用,因为你可以随后为提供必要语义的现有类型编写适配器,像我到目前为止已探索的未来 Futures 一样。图 4 提供的一组适配器可以满足编译器对图 3 中可恢复函数的解释。

图 4 假设 Future 的 Await 适配器

namespace std
{
  template <typename T>
  bool await_ready(future<T> const & t)
  {
    return t.is_done();
  }
  template <typename T, typename F>
  void await_suspend(future<T> const & t, F resume)
  {
    t.then([=](future<T> const &)
    {
      resume();
    });
  }
  template <typename T>
  T await_resume(future<T> const & t)
  {
    return t.get();
  }
}

此外,请记住 C++ 标准库的 Future 类模板尚未提供“then”方法来添加延续,但此模板足以使本示例适用于如今的编译器。可恢复函数内的 await 关键字有效地设置了潜在的挂起点,当操作尚未完成时,执行可能在此处退出函数。如果 await_ready 返回 True,则不会挂起执行,并且立即调用 await_resume 来获取结果。反之,如果 await_ready 返回 False,则调用 await_suspend,以便允许操作注册由编译器提供的要在最终完成时调用的恢复函数。调用恢复函数时,协同例程会立即在上一个挂起点恢复,并且执行继续进行到下一个 await 表达式或函数的终止处。

请记住,恢复将在任何调用编译器恢复函数的线程上进行。这意味着可恢复函数完全有可能在某一个线程上开始运行,然后在另一个线程上恢复并继续执行。实际上从性能角度来看,这是一种理想状况,因为替代意味着将恢复分派到其他线程,而这样做通常开销较大且并不必要。另一方面,如果存在任何后续代码都具有线程关联,这将是非常理想甚至是必需的情况,正如大多数图形代码的情况。遗憾的是,await 关键字尚未提供一种方式,可以让 await 表达式的作者为编译器提供这样的提示。这并非没有先例。并发运行时确实具有这样的选项,但有趣的是,C++ 语音本身提供了一种你可能会遵循的模式:

int * p = new int(1);
// Or
int * p = new (nothrow) int(1);

同样地,await 表达式也需要一种机制才能向 await_suspend 函数提供一个提示来影响发生恢复的线程上下文:

await network.WriteAsync(buffer, copied);
// Or
await (same_thread) network.WriteAsync(buffer, copied);

默认情况下,恢复通过尽可能最高效的操作方式进行。某些假设 std::same_thread_t 类型的 same_thread 常量可能会在 await_suspend 函数的重载之间产生歧义。图 3 中的 await_suspend 将是最高效的默认选项,因为它很有可能在工作线程上恢复并在无需进一步切换上下文的情况下完成。当使用者需要线程关联时,会请求图 5 中所示的 same_thread 重载。

图 5 假设 await_suspend 重载

template <typename T, typename F>
void await_suspend(future<T> const & t, F resume, same_thread_t const &)
{
  ComPtr<IContextCallback> context;
  check(CoGetObjectContext(__uuidof(context),
    reinterpret_cast<void **>(set(context))));
  t.then([=](future<T> const &)
  {
    ComCallData data = {};
    data.pUserDefined = resume.to_address();
    check(context->ContextCallback([](ComCallData * data)
    {
      F::from_address(data->pUserDefined)();
      return S_OK;
    },
    &data,
    IID_ICallbackWithNoReentrancyToApplicationSTA,
    5,
    nullptr));
  });
}

此重载将检索 IContextCallback 接口以查找调用线程(或单元)。然后,延续最终会从这一相同的上下文中调用编译器的恢复函数。如果这恰好是应用的 STA,该应用便可以轻松地通过线程关联继续与其他服务交互。ComPtr 类模板和检查帮助程序函数属于 Modern 库,你可以从 github.com/kennykerr/modern 下载它,但你也可以使用可供你支配的任何资源。

我介绍了大量内容,虽然其中的某些部分依然较为理论化,但 Visual C++ 编译器已经提供所有繁重的工作而使之成为可能。这对于对并发感兴趣的 C++ 开发者来说是一个令人兴奋的时刻,我希望你们在下个月继续同我一起研究,我将深入探讨 Visual C++ 的可恢复函数。


Kenny Kerr是加拿大的计算机程序员,是 Pluralsight 的作者,也是一名 Microsoft MVP。他的博客网址是 kennykerr.ca,你可以通过 Twitter @kennykerr 关注他。

衷心感谢以下 Microsoft 技术专家对本文的审阅: Gor Nishanov