.NET : вопросы и ответы.

Циклический доступ к ThreadPool.

Стивен Тауб (Stephen Toub)

 

В. В настоящий момент я использую Microsoft .NET Framework ThreadPool и столкнулся с ситуацией, насчет способа решения которой не уверен. Я начинаю с большого пакета рабочих элементов, который устанавливается в очередь и после начала его обработки прибывает второй (меньший) пакет. Первоначально, часть работы в большом пакете передается всем рабочим процессам в ThreadPool. Однако, когда прибывает второй пакет, я хочу, чтобы распределение было равномерным, с одинаковым обслуживанием обеих пакетов, вместо уделения всего внимания первому потому, что он прибыл первым.

 

Когда один из пакетов завершает работу, я хочу, чтобы тот, который еще обрабатывается, получил бы внимание всех рабочих потоков. Можно ли что-то сделать, чтобы создать уровень подобных функций пакетирования поверх ThreadPool?

О. В прошлых статьях, я показал, как создавать уровни различных типов функций поверх существующего .NET ThreadPool. В выпуске журнала MSDN Magazine за октябрь 2004 года, я показал, как добавлять в ThreadPool поддержку ожидания установленных в очередь рабочих элементов (см. "ThreadPoolWait и HandleLeakTracker"). В выпуске за ноябрь 2004 года я показал, как добавить поддержку расстановки рабочих элементов по приоритетности " (см. ThreadPoolPriority и MethodImplAttribute"). И в выпуске за март 2006 года я показал, как добавить поддержку отмены (см. "Пул потоков с поддержкой прерывания"). В будущем, я смогу взглянуть на выпуск за январь 2009 года и сказать, что в нем я показал, как добавить поверх ThreadPool поддержку циклического доступа.

Проблема, которую требуется решить, в первую очередь требует понимания того, как работает подготовка к отправке в ThreadPool. Внутренне, он поддерживает очередь переданных ему рабочих элементов. Когда поток в пуле доступен для исполнения работы, он возвращается к этой очереди и вытягивает из нее следующий элемент. Порядок, в котором происходит эта обработка, не задокументирован и на него совершенно точно не следует полагаться (поскольку он может измениться и, скорее всего, изменится в будущих версиях.

Сейчас он реализован очень просто: очередь «первым вошел – первым вышел» (first-in-first-out – FIFO). Таким образом, первая работа, поставленная в очередь, будет первой работой, которую подберет поток. Для приведенного выше сценария пакетов, это значит, что вся работа из первого пакета будет в очереди впереди всей работы из второго пакета. Таким образом, вся работа из первого пакета будет подготовлена к отправке до второго пакета. В некоторых случаях это нормально и оптимально. В данном случае, необходим больший контроль над происходящим.

Одним из простейших способов получения этого контроля над ThreadPool является замена того делегата, исполнения которого желает пользователь, на собственный. В качестве примера, нам, скажем, нужно улавливать все необработанные исключения, выброшенные из запрошенной работы и создавать события для каждого. Чтобы сделать это, можно написать код, подобный показанному на рис. 1. Затем, вместо использования ThreadPool.QueueUserWorkItem, следует использовать ExceptionThreadPool.QueueUserWorkItem. Работа все так же будет исполнена ThreadPool, но потоки пула будут исполнять результат, установленный в очередь автором кода, а не тот, что предоставил пользователь. Вызов делегата затем вызовет представленный пользователем делегат, улавливая все исключения и создавая целевое событие.

Рис. 1. Добавление оболочки к ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

Отметьте, что хотя эта методика и эффективна у нее есть своя цена: дополнительный делегат нужно разместить, дополнительный делегат нужно вызвать и так далее. Являются ли издержки на это чрезмерными может определить только сам разработчик и его сценарии, но создание уровней такого рода обычно экономнее написания собственного пула потоков с нуля.

Это, само собой, очень простой пример, но можно делать и более сложные вещи. Пример пула расстановки приоритетов, на который я сослался выше, сохраняет предоставленные пользователем делегаты в собственных структурах данных. Затем он помещает в очередь пула подставной делегат, который возвращается и обыскивает эти структуры данных на предмет нужного делегата для исполнения, предпочитая сперва исполнять те, у которых выше приоритетность. Подобную методику можно применить в описанной ситуации с пакетами.

Представьте себе, на секунду, что вместо одной очереди имеется одна очередь на пакет. Каждый пакет будет помещать работу в относящуюся у нему очередь. После этого потоки пула используются для произведения циклического доступа ко всем очередям. Делегат, устанавливаемый в очередь к реальному ThreadPool возвращается в структуры данных и ищет работу, начиная со следующей очереди, которую предстоит изучить. Если таковая обнаруживается, он исполняется из этой очереди. Если нет, он ищет следующую очередь.

Таким образом можно предоставить равномерное исполнение пакетов. Если пакет только один, потоки всегда будут извлекать работу из этой одной очереди. Если пакетов несколько, потоки будут посещать каждую из очередей, уделяя им примерно равное внимание. Рис. 2 предоставляет обзор того, как будет выглядеть это решение.

netmatters,fig02.gif

Рис. 2. Подход RoundRobinThreadPool (циклического доступа к пулу потоков)

Чтобы запустить все, сперва необходима структура данных для хранения предоставляемого пользователем делегата. Мое представление показано на Рисунке 3. Эта структура данных содержит три свойства. Два первых должны выглядеть знакомо; это состояние предоставляемое пользователем QueueUserWorkItem, так что естественно его нужно отправить в кэш. Третье свойство, однако, может быть незнакомо. С каждым потоком исполнения в .NET связан System.Threading.ExecutionContext, представляющий такую информацию, как текущего пользователя, состояние, связанное с логическим потоком исполнения, информацией безопасности доступа кода и так далее. Важно, что этот контекст проходит через асинхронные точки исполнения.

Рис. 3. Запись рабочего элемента

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

Например, в случае олицетворения конкретного удостоверения Windows и вызова ThreadPool.QueueUserWorkItem, поставленная в очередь работа должна выполниться от имени того же удостоверения Windows. Если она не выполняется, то это потенциальная брешь в безопасности. Начиная с .NET Framework 2.0, этот контекст, по умолчанию, автоматически проходит через все асинхронные точки в коде: ThreadPool.QueueUserWorkItem, создание нового потока, асинхронный вызов делегата и так далее.

Однако, в случае приведенной здесь реализации игра идет по другому набору правил. Изменение порядка, в котором делегаты устанавливаются в очередь, по сравнению с порядком их исполнения, устраняет прямое соответствие между этим процессом ExecutionContext и рабочими элементами, предоставляемыми пользователями. В таковом качестве, реализации необходимо верно кэшировать ExecutionContext с помощью предоставленного пользователем делегата и затем использовать записанный контекст для выполнения этого делегата.

Теперь, когда получен рабочий элемент, давайте взглянем на очередь в которой он будет содержаться (показана на рис. 4). Сама структура данных RoundRobinThreadPool.Queue довольно проста. Внутри она содержит Queue<WorkItem> для хранения всех предоставленных ей рабочих элементов, ссылку на экземпляр RoundRobinThreadPool с которым связана очередь и логическое значение, отмечающее, был ли метод Dispose вызван на очереди. Затем она предоставляет методы QueueUserWorkItem, с той же подписью, что и у ThreadPool.

Рис. 4. Циклическая очередь

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

Когда вызывается QueueUserWorkItem, обратный вызов и состояние, предоставленное пользователем (вместе с текущим ExecutionContext) записываются в WorkItem. Работа затем сохраняется в универсальной очереди и нужны пул уведомляется о прибытии новой работы. Важно отметить, что для защиты очереди рабочих элементов используется блокировка, поскольку QueueUserWorkItem может быть вызван параллельно с нескольких потоков и необходимо гарантировать удержание инвариантов.

Отметьте, также, что блокируемым объектом является список глобальных очередей, исходящий из пула. Я использую одну довольно крупномасштабную блокировку для всей реализации. Более эффективная реализация скорее всего будет использовать более детальную блокировку, такую, как использование отдельных блокировок для очередей, вместо одной для всего RoundRobinThreadPool в целом. Для простоты и легкости в реализации, я решил использовать единственную блокировку.

Метод Dispose применяется, когда эта очередь больше не нужна. В типичном сценарии работы с пакетами, будет создана очередь, в нее размещена работа и затем очередь будет освобождена. Если бы метод Dispose просто удалял очередь из пула, это, вероятно, произошло бы, когда в ней еще оставались бы предназначенные для обработки элементы.

В таковом качестве, Dispose делает две вещи. Во-первых, он проверяет, остались ли в очереди какие-нибудь рабочие элементы. Если нет, очередь производит вызов в пул, чтобы удалить себя. Во-вторых, он помечает сам себя как освобожденный. Скоро можно будет увидеть, как пул ведет себя при столкновении с освобожденным пулом, который не был удален.

На рис. 5 показана оставшаяся часть реализации – самого класса RoundRobinThreadPool. Пул содержит четыре поля:

  • Список отдельных очередей, поддерживаемых пулом (который также служит в качестве ранее упомянутой блокировки).
  • Очередь по умолчанию для пула.
  • Целое число, представляющее следующую очередь, которая будет обыскана на предмет работы.
  • Делегат обратного вызова, реально устанавливаемый в очередь к ThreadPool.

Рис. 5. RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

Когда RoundRobinThreadPool инициализируется, все это состояние настраивается. В частности, очередь по умолчанию настраивается путем вызова метода CreateQueue. Этот метод CreateQueue – тот же самый метод, который был предоставлен публично, чтобы позволить разработчику добавить еще одну очередь к пулу (например, когда прибывает пакет работы, требующий собственной изолированной очереди). Он просто создает новый экземпляр RoundRobinThreadPool.Queue (типа, рассмотренного на рис. 3), добавляет его в свой список очередей и возвращает его.

Для простоты использования, RoundRobinThreadPool предоставляет собственные методы QueueUserWorkItem; они просто нацеливаются на очередь по умолчанию где был создан экземпляр пула.

Далее рассмотрим метод NotifyNewWorkItem. Вспомним, что когда QueueUserWorkItem вызывался на очереди после сохранения рабочего элемента, очередь отправляла вызов этому методу NotifyNewWorkItem на пуле. Этот метод просто производит делегирование настоящему ThreadPool, отправляя делегата, который возвратится к методу DequeueAndExecuteWorkItem (который скорое будет рассмотрен чуть ниже), который, как верно предполагает его имя, выведет из очереди и исполнит элемент работы из циклического пула.

Отметьте, что NotifyNewWorkItem вызывает ThreadPool.UnsafeQueueUserWorkItem, а не ThreadPool.QueueUserWorkItem. Приставка "Unsafe" просто указывает, что это не передающееся ExecutionContext («Содержимое исполнения»); избежание этого улучшает производительность. А поскольку эта реализация уже исполняет ExecutionContext, для ThreadPool нет нужды делать то же самое.

Реальное волшебство происходит в DequeueAndExecuteWorkItem. Этот метод сперва создает порядок, в котором следует производить поиск по запросам. Порядок поиска идет от следующей очереди, которую следует рассмотреть к концу списка и затем возвращается обратно, стартуя в начале списка и идя до очереди, на которой он стартовал. Чтобы упростить реализацию, метод LINQ Enumerable.Range используются для создания двух списков, которые затем сцепляются вместе с помощью метода LINQ Enumerable.Concat.

После получения порядка поиска, он начинает отлавливать рабочие элементы. Каждая очередь рассматривается в указанном порядке, и как только рабочий элемент найден, он удаляется и следующий указатель обновляется. Рабочий элемент затем вызывается с помощью метода Execute, показанного на рис. 3.

Здесь имеется одна особенно интересная строка кода и это проверка на предмет того, является ли очередь из которой был извлечен элемент как освобожденной, так и пустой. Если пул находит такую очередь, он знает, что к ней больше не будет добавляться новых элементов (поскольку она была освобождена), так что ее больше не нужно сохранять. На этом этапе, RemoveQueueNeedsLock используется для удаления целевой очереди из списка очередей и потенциального обновления следующего указателя очереди, на случай, если он теперь вне допустимого диапазона.

Отметьте, что этот метод не использует блокировки внутренне, но оценивает общее состояние; поэтому я добавил в имя метода суффикс "NeedsLock", чтобы напомнить себе, что он должен быть вызван когда блокировка удерживается. Можно заметить, что обе точки вызова RemoveQueueNeedLock – одна в методе Dispose очереди и одна в методе DequeueAndExecuteWorkItem – вызывают его удерживая блокировку очередей.

Теперь, когда реализация завершена это можно протестировать в своем коде. В следующем примере я создал единственный статический экземпляр RoundRobinThreadPool. Когда прибывает пакет для обработки, создается новая очередь, вся работа устанавливается в нее и очередь освобождается:

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

Хотя вся работа для первого прибывшего пакета запланирована к исполнению первой, как только второй пакет прибудет, он начнет получать примерно равную долю вычислительных ресурсов.

К этой реализации можно добавить еще примочек и, с точки зрения производительности, ее вероятно можно улучшить. Однако, уже очень малого объема кода было достаточно для создания абстракции, пользующейся .NET ThreadPool и всеми возможностями и функциями предоставляемыми им, но в то же время получающей поддержку искомого равномерного распределения работы между пакетами.

Вопросы и замечания направляйте Стивену по адресу netqa@microsoft.com.

Стивен Тауб (Stephen Toub) — старший руководитель программы в рабочей группе платформы параллельных вычислений Майкрософт. Он также является пишущим редактором журнала MSDN Magazine.