Windows и C++

Передача работы пулу потоков в Windows

Кенни Керр

Kenny KerrРазные люди вкладывают в параллельную обработку разный смысл. Некоторые мыслят в терминах агентов и сообщений (совместно работающих, но асинхронных конечных автоматов), другие — в терминах задач, обычно в форме функций или выражений, которые можно выполнять параллельно, а третьи — в терминах параллелизма данных, где структура данных позволяет выполнять параллельную обработку. Это можно было бы даже считать дополняющими или перекрывающимися методиками. Независимо от того, как вы смотрите на мир параллельной обработки, в основе любого современного подхода к такой обработке лежит пул потоков того или иного вида.

Создание потоков обходится относительно дорого. Избыточное количество потоков вызывает издержки планирования, влияющие на локальность кеша и общую производительность. В большинстве тщательно спроектированных систем единица параллельной обработки является сравнительно короткоживущей. В идеале, требуется простой способ создания потоков по мере необходимости, их повторного использования для дополнительной работы и 'интеллектуального' ухода от порождения слишком большого числа потоков, чтобы эффективно использовать доступные ресурсы. К счастью, такой идеал сегодня есть — и не в каких-то сторонних библиотеках, а прямо в сердцевине Windows API и называется API пула потоков Windows. Он не только отвечает этим требованиям, но и бесшовно интегрируется со многими базовыми строительными блоками Windows API. Он также берет на себя большую часть сложности написания масштабируемых и быстро отвечающих приложений. Если вы давний разработчик для Windows, то, несомненно, знакомы с таким краеугольным камнем масштабируемости в Windows, как портом завершения ввода-вывода (I/O completion port). Так вот, этот порт является основой пула потоков Windows.

Учитывайте, что пул потоков нельзя рассматривать просто как способ обойтись без вызова CreateThread со всеми ее параметрами и последующего обязательного вызова CloseHandle. Конечно, это может быть удобно, но может и вводить в заблуждение. Большинство разработчиков ожидает управляемую приоритетами модель планирования с вытеснением, реализуемую Windows. Потоки с одинаковыми приоритетом обычно делят процессорное время. Когда выделенный потоку квант (доля процессорного времени для выполнения потока) заканчивается, Windows определяет, готов ли к выполнению другой поток с таким же приоритетом. Естественно, на планирование потоков влияют многие факторы, но при наличии двух потоков, созданных почти в одно и то же время с одинаковым приоритетом и выполняющих некую операцию, интенсивно использующую процессорное время, можно ожидать, что оба начнут выполняться с разницей в несколько квантов.

В пуле потоков дело обстоит иначе. Пул потоков — и любая абстракция планирования на основе порта завершения ввода-вывода — опирается на модель очередей работы. Пул потоков гарантирует полное использование ядер процессора, в то же время предотвращая планирование чрезмерного количества потоков. Если две единицы работы одновременно передаются пулу на машине с одноядерным процессором, то выполняться будет лишь первая из них. Вторая будет обрабатываться, только если выполнение первой завершается или блокируется. Такая модель оптимальна для общей пропускной способности, поскольку работа будет выполняться эффективнее с меньшим количеством прерываний, но это же означает, что вам не дают никаких гарантий по задержкам.

API пула потоков спроектирован как набор кооперативно работающих объектов. В нем есть объекты, представляющие единицы работы, таймеры, асинхронный ввод-вывод и т. д. Есть даже объекты, представляющие такие сложные в реализации концепции, как отмена и очистка. К счастью, API не заставляет разработчиков иметь дело со всеми этими объектами, и вы можете использовать его как на высоком уровне, так и на более низком. Естественно, эта свобода создает риск неэффективного или неподходящего применения API. Вот почему я посвящу этому API следующие несколько статей. Когда вы начнете понимать разные роли, исполняемые разными частями этого API, вы обнаружите, что код, который вам нужно писать, становится не сложнее, а проще.

В данной статье я покажу, как приступить к передаче работы в пул потоков. Функции предоставляются пулу потоков как объекты работы (work objects). Такой объект состоит из указателя на функцию, а также void-указателя, называемого контекстом и передаваемого функции каждый раз, когда она выполняется пулом потоков. Объект работы можно передавать для выполнения несколько раз, но функцию и контекст нельзя изменять, не создав новый объект работы.

Объект работы создается функцией CreateThreadpoolWork. Если ее вызов заканчивается успешно, она возвращает непрозрачный указатель, представляющий объект работы. При неудачном вызове возвращается нулевой указатель, а дополнительную информацию можно получить через функцию GetLastError. При наличии объекта работы функция CloseThreadpoolWork информирует пул потоков о том, что данный объект может быть освобожден. Эта функция ничего не возвращает и для большей эффективности предполагает, что объект работы существует и допустим. К счастью, в прошлый раз я познакомил вас с шаблоном класса unique_handle, который как раз и берет на себя обработку таких ситуаций. Вот класс traits, который можно использовать вместе с unique_handle, а также как typedef для удобства:

struct work_traits
{
  static PTP_WORK invalid() throw()
  {
    return nullptr;
  }

  static void close(PTP_WORK value) throw()
  {
    CloseThreadpoolWork(value);
  }
};
typedef unique_handle<PTP_WORK, work_traits> work;

Теперь я могу создать объект работы и передать заботы о его сроке существования компилятору независимо от того, где находится этот объект — в стеке или контейнере. Конечно, до этого мне потребуется функция обратного вызова. Она объявляется таким образом:

void CALLBACK hard_work(PTP_CALLBACK_INSTANCE, void * context, PTP_WORK);

Макрос CALLBACK гарантирует, что функция реализует соответствующее соглашение о вызовах, ожидаемое Windows API для функций обратного вызова (и зависимое от целевой платформы). Создание объекта работы для этого обратного вызова с использованием typedef достаточно прямолинейно и продолжает шаблон, освещенный мной в прошлый раз:

void * context = ...
work w(CreateThreadpoolWork(hard_work, context, nullptr));
check_bool(w);

К этому моменту все, что у меня есть, — объект, представляющий некую работу, которую надо выполнить, но сам пул потоков пока не задействован, поскольку обратный вызов еще не передан для выполнения. Обратный вызов передается пулу потоков функцией SubmitThreadpoolWork. Ее можно вызывать неоднократно с тем же объектом работы, чтобы несколько обратных вызовов выполнялись параллельно. Эта функция используется так:

SubmitThreadpoolWork(w.get());

Конечно, даже передача работы не гарантирует ее немедленное выполнение. Обратный вызов ставится в очередь, но пул может ограничивать уровень параллелизма (количество рабочих потоков) для повышения эффективности. Так как все это весьма непредсказуемо, нужен некий способ ожидания незаконченных обратных вызовов — как выполняемых в данный момент, так и находящихся в очереди. В идеале, также должна быть возможность отмены тех обратных вызовов, которые еще не выполняются. Обычно любая разновидность блокирующей операции ожидания — плохая новость для параллельной обработки, но она все равно необходима для предсказуемой отмены и корректного завершения. Это тема для следующей статьи, поэтому сейчас я не стану больше тратить на нее время. Однако на данный момент замечу, что этим требованиям удовлетворяет функция WaitForThreadpoolWorkCallbacks. Вот пример:

bool cancel = ...
WaitForThreadpoolWorkCallbacks(w.get(), cancel);

Значение второго параметра определяет, будут ли отменяться незавершенные обратные вызовы или функция будет ждать их завершения даже в том случае, если их выполнение еще не начиналось. Теперь у меня достаточно информации для формирования базового функционального пула, использования API пула потоков и вкрапления C++ 2011 для создания более удобной в использовании оболочки. Более того, это хороший пример того, как применяются все функции, о которых было упомянуто к этому моменту.

Простой функциональный пул должен давать возможность передавать функцию для выполнения асинхронно. У меня должна быть возможность определять эту функцию, используя лямбда-выражение, именованную функцию или объект-функцию в зависимости от потребностей. Один из подходов — применение параллельного набора для хранения очереди функций с передачей этой очереди обратному вызову. Visual C++ 2010 включает шаблон класса concurrent_queue, который подходит для таких целей. Я исхожу из того, что вы используете обновленную реализацию из Service Pack 1, так как в исходной реализации есть ошибка, которая приводит к ошибке доступа к памяти (access violation), если очередь не пуста на момент деструкции.

Я могу начать определение класса функционального потока следующим образом:

class functional_pool
{
  typedef concurrent_queue<function<void()>> queue;

  queue m_queue;
  work m_work;

  static void CALLBACK callback(PTP_CALLBACK_INSTANCE,
    void * context, PTP_WORK)
  {
    auto q = static_cast<queue *>(context);

    function<void()> function;
    q->try_pop(function);

    function();
  }

Как видите, класс functional_pool управляет очередью объектов-функций, а также единственным объектом работы. Обратный вызов предполагает, что контекстом является указатель на очередь и что в очереди присутствует минимум одна функция. Теперь можно создать объект работы для этого обратного вызова и соответствующим образом задать контекст:

public:

  functional_pool() :
    m_work(CreateThreadpoolWork(callback, &m_queue, nullptr))
  {
    check_bool(m_work);
  }

Шаблон функции необходим для адаптации к различным типам функций, которые могут быть переданы. Его работа заключается в том, чтобы поставить функцию в очередь и вызвать SubmitThreadpoolWork, которая сообщает пулу потоков передать обратный вызов для выполнения:

template <typename Function>
void submit(Function const & function)
{
  m_queue.push(function);
  SubmitThreadpoolWork(m_work.get());
}

Наконец, деструктор functional_pool перед уничтожением очереди должен гарантировать, что никакие последующие обратные вызовы не будут выполняться, а иначе могут произойти самый ужасные вещи. Вот пример:

~functional_pool()
{
  WaitForThreadpoolWorkCallbacks(m_work.get(), true);
}

Теперь можно создать объект functional_pool и довольно легко передать работу, используя лямбда-выражение:

functional_pool pool;

pool.submit([]
{
  // Делаем это асинхронно
});

Очевидно, что при явной постановке функций в очередь и неявной постановке обратных вызовов в очередь будет некоторое падение производительности. Применение этого подхода в серверных приложениях, где параллельная обработка, как правило, весьма структурирована, было бы не слишком хорошей идеей. Если у вас лишь небольшое количество уникальных обратных вызовов, обрабатывающих большие объемы асинхронных рабочих нагрузок, то, вероятно, лучше использовать просто указатели на функции. Однако этот подход может оказаться полезным в клиентских приложениях. При наличии множества разнообразных кратковременных операций, которые вы хотели бы обрабатывать параллельно для повышения 'отзывчивости', удобство использования лямбда-выражений, как правило, заметно увеличивается.

Так или иначе, но эта статья не о лямбда-выражениях, а о передаче работы пулу потоков. Кажущийся более простым подход для достижения того же результата реализуется функцией TrySubmitThreadpoolCallback, как показано ниже:

void * context = ...
check_bool(TrySubmitThreadpoolCallback(
  simple_work, context, nullptr));

Это почти то же самое, что и объединение функций CreateThreadpoolWork и SubmitThreadpoolWork в одно целое, — в основном именно это и происходит. Функция TrySubmitThreadpoolCallback заставляет пул создать объект работы на внутреннем уровне, обратный вызов которой немедленно передается для выполнения. Так как объект работы принадлежит пулу потоков, вам не нужно заботиться о его освобождении. И не только не нужно, но и невозможно, потому что такой объект работы никогда не предоставляется через API. Это подтверждает и сигнатура обратного вызова:

void CALLBACK simple_work(
  PTP_CALLBACK_INSTANCE, void * context);

Обратный вызов выглядит во многом так же, как и раньше, но третьего параметра нет. ПОначалу это кажется идеальным: более простой API и меньше забот. Однако в этом случае нет очевидного способа ожидания завершения обратного вызова, не говоря уже о его отмене. Попытка написать класс functional_pool в виде TrySubmitThreadpoolCallback была бы проблематичной и потребовала бы дополнительной синхронизации. В следующей статье я расскажу, как сделать это, используя API пула потоков. Даже если бы вы смогли решить все эти проблемы, все равно осталась бы менее очевидная проблема, которая потенциально может досаждать на практике куда больше. Каждый вызов TrySubmitThreadpoolCallback включает создание нового объекта работы с выделением соответствующих ресурсов. При интенсивных рабочих нагрузках это может быстро привести к тому, что пул потоков станет использовать огромные объемы памяти, а производительность упадет еще больше.

Явное использование объекта работы дает и другие преимущества. Последний параметр в исходной форме обратного вызова предоставляет указатель на тот же объект работы, который передал выполняемый экземпляр. Вы можете использовать его для добавления в очередь дополнительных экземпляров того же обратного вызова. С его помощью можно даже освобождать объект работы. Однако такие трюки могут вызвать проблемы, так как становится заметно сложнее понять, когда безопасна передача работы и освобождение ресурсов приложения. В следующей статье мы изучим среду пула потоков, продолжив исследование API пула потоков в Windows.


Кенни Керр — высококвалифицированный специалист в области разработки ПО для Windows. С ним можно связаться через kennykerr.ca..

Выражаю благодарность за рецензирование статьи экспертам Хари Пулапака and и Педро Тейксейра