本文章是由機器翻譯。

.Net 相關問題

輪詢訪問 ThreadPool

Stephen Toub

 

問:我目前正在使用 Microsoft .NET Framework ThreadPool,在使用過程中我遇到了一種情況,不知道該如何解決。我需要處理一個較大批次的已在佇列中的工作項,在第一個批次開始處理後第二個批次(規模稍小)到達。最初,較大批次中的一些工作將被分配給 ThreadPool 中的所有工作執行緒。但是,當第二批到達後,我想平均分配,使每一批都得到同樣的服務,而不是先到的批次得到所有線程。

 

當其中一批完成時,我希望無論哪一個仍需處理的批次,都能夠獲得所有工作執行緒的關注。我該如何在 ThreadPool 頂層對此類批次處理功能進行分層呢?

答:在以前的專欄中,我曾介紹過如何在現有 .NET ThreadPool 的頂層對各種類型的功能進行分層。在 2004 年 10 月刊的《MSDN 雜誌》中,我介紹了如何使 ThreadPool 支援等待排隊的工作項(請參見“ThreadPoolWait 和 HandleLeakTracker”)。在 2004 年 11 月刊中,我介紹了如何添加對工作項優先順序的支援(請參見“ThreadPoolPriority 和 MethodImplAttribute”)。在 2006 年 3 月刊中,我介紹了如何添加取消支援(請參見“可以取消的執行緒池”)。將來,我可以回顧 2009 年 1 月這一期,並提到我曾介紹過如何在 ThreadPool 中添加輪詢調度支援。

您需要解決的問題首先是要瞭解 ThreadPool 分派工作的原理。在內部,它維護一個已被加入其佇列的工作項佇列。當池中的某個執行緒可以執行工作時,它會返回到此工作佇列並提取下一個專案。此處理順序並未記錄在案,因此是非常不可靠的(因為它可能而且很可能會在未來版本中有所變更)。

現在它可以通過一種非常簡單的方式來實現:先進先出 (FIFO) 佇列。因此,要排隊的第一個工作將成為被執行緒所選中的第一個工作。在您的批次處理情況中,這意味著第一批中的所有工作在佇列中都將排在第二批所有工作的前面。因此,第一批的所有工作都將在第二批的所有工作之前進行分派。對於某些情況這是合理且最佳的方法。而對於您的情況,還需要進行更多的控制。

從 ThreadPool 獲得這種控制的最簡單方法之一是將您自己的委託替換為使用者真正需要執行的委託。例如,假設您想捕獲已排隊工作中拋出的所有未處理異常並針對每個異常引發一個事件。要執行此操作,您可以編寫如圖 1 所示的代碼。然後,使用 ExceptionThreadPool.QueueUserWorkItem 而不是使用 ThreadPool.QueueUserWorkItem。此工作仍將由 ThreadPool 來執行,但池中的執行緒實際執行的是您排隊的委託,而非使用者提供的委託。接下來在調用您的委託時將會調用使用者提供的委託,而且會捕獲所有異常並引發目標事件。

圖 1 填充 ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

請注意,此方法雖然功能很強大,但卻需要付出一定的代價:需要分配額外的委託、需要調用額外的委託等等。 成本是否過高只能根據您和您所處的環境來確定,但與從頭編寫自己的執行緒池相比,這種分層通常更具成本效益。

當然,這是一個非常簡單的示例,但您也可以用來處理很複雜的工作。 我之前引用的優先順序池示例將使用者提供的委託存儲在它自己的資料結構中。 然後它將返回的替代委託在池中排隊並在這些資料結構中搜索要執行的正確委託,而且更傾向于首先執行優先順序較高的委託。 您可以使用類似的方法來解決您的批次處理困境。

您可以花些時間想像一下,如果不是只有單個佇列,而是每個批次都有一個佇列的情形。 各個批次會將工作置入其自身的相關佇列中。 然後,您可以使用池中的執行緒在所有佇列之間進行輪詢。 排列到實際 ThreadPool 佇列中的委託將返回到您的資料結構中,它會從下一個要檢查的佇列開始查找工作。 如果發現,則從該佇列開始執行。 否則,它將繼續查找下一個佇列。

通過這種方式,您可以在各個批次之間提供公平的調度。 如果只有一批,則執行緒將始終從該佇列提取工作。 如果有多批,則執行緒將訪問每個佇列,並給予大致相同的待遇。 圖 2為您提供了此解決方案的概觀。

netmatters,fig02.gif

圖 2 RoundRobinThreadPool 方法

為使工作進行下去,您首先需要一個資料結構來存儲使用者提供的委託。 我所採用的表現形式如圖 3 所示。 此資料結構包含三個屬性。 前兩個屬性您應該很熟悉;它們是使用者為 QueueUserWorkItem 提供的狀態,所以您理所當然需要對其進行緩存。 不過,第三個屬性可能會有些陌生。 在 .NET 中與每個執行執行緒都相關聯的是 System.Threading.ExecutionContext,它代表一些資訊,例如當前使用者、與邏輯執行執行緒相關聯的任何狀態、代碼訪問安全資訊等。 這種跨非同步執行點的上下文流動非常重要。

圖 3 捕獲工作項

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

例如,如果您要類比特定的 Windows 身份標識並調用 ThreadPool.QueueUserWorkItem,則您排隊的工作應使用相同的 Windows 身份標識來執行。 否則將會出現潛在的安全性漏洞。 從 .NET Framework 2.0 開始,此上下文會預設自動跨代碼中的所有非同步點流動:ThreadPool.QueueUserWorkItem、新建執行緒、非同步委託調用等。

但是對於此處討論的實現,您應遵守一組不同的規則。 通過從委託的執行順序更改其排隊順序,此 ExecutionContext 流與使用者提供的工作項之間不再存在直接的對應關係。 因此,您的實現需要正確緩存 ExecutionContext 與使用者提供的委託,然後使用捕獲的上下文來運行該委託。

現在,您已經有了一個工作項,讓我們來看一看它將被放入的佇列(如圖 4 所示)。 RoundRobinThreadPool.Queue 資料結構本身相當簡單。 在內部,它包含一個用來存儲為其提供的所有工作項的 Queue<workitem>、一個對所關聯的 RoundRobinThreadPool 實例的引用,以及一個表示 Dispose 方法是否在佇列中被調用過的布林值。 然後,它會為 QueueUserWorkItem 方法提供與 ThreadPool 相同的簽名。

圖 4 輪詢佇列

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

調用 QueueUserWorkItem 時,使用者提供的回檔和狀態(以及當前的 ExecutionContext)將被捕獲到 WorkItem 中。 此工作隨後將被存儲到泛型佇列中,並且相關池會得到新工作已到達的通知。 您務必要注意到這裡使用了一個鎖來保護工作項佇列,因為 QueueUserWorkItem 可以同時從多個執行緒進行調用,而您需要確保維持不變性。

還要注意,被鎖定的物件是一個來自池的全域佇列清單。 對於整個實現,我使用的是一個粗細微性鎖。 更有效的實現可能會使用更細細微性的鎖定,例如針對每個佇列使用單個鎖,而不是整個 RoundRobinThreadPool 使用一個。 為了便於實現和出於方便考慮,我選擇使用單一鎖。

當不再需要此佇列時會使用 Dispose 方法。 在典型的批次處理方案中,將會創建一個佇列、將工作排列到該佇列中,然後釋放佇列。 如果 Dispose 方法只是將此佇列從池中刪除,那麼即使在其中仍有要處理的工作項,該佇列仍可能會被刪除。

這時 Dispose 會做兩件事情。 首先,它會檢查是否存在任何剩餘的工作項。 如果不存在,佇列會調用池以將自身刪除。 其次,它將自身標記為已釋放。 您稍後會看到當池遇到已釋放但尚未刪除的池時,它是如何處理這一情況的。

圖 5 顯示實現的其餘部分,即 RoundRobinThreadPool 類本身。 該池包含四個欄位:

  • 池(同時還作為先前提及的鎖)所維護的各個佇列的清單。
  • 池的預設佇列。
  • 代表要在其中搜索工作的下一個佇列的整數。
  • 已實際排列到 ThreadPool 中的回檔委託。

圖 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

初始化 RoundRobinThreadPool 時,將會配置所有這些狀態。特別是,會通過調用 CreateQueue 方法來初始化預設佇列。此 CreateQueue 方法與公開的、允許開發人員向池中添加其他佇列(例如,當需要自己獨立佇列的新工作批次到達時)的方法完全相同。它只產生實體新 RoundRobinThreadPool.Queue 實例(圖 3 中探討的類型)、將其添加到佇列清單中,然後再返回它。

為了方便使用,RoundRobinThreadPool 公開了其自身的 QueueUserWorkItem 方法;這些方法只針對在池產生實體時創建的預設佇列。

下麵介紹 NotifyNewWorkItem 方法。您可能會記得,如果針對某個佇列調用了 QueueUserWorkItem,則在存儲工作項之後,該佇列會在池中調用此 NotifyNewWorkItem 方法。此方法只是委託給實際的 ThreadPool,它會提交一個將會返回給 DequeueAndExecuteWorkItem 方法(稍後介紹)的委託,顧名思義,此方法將會取消排隊並執行輪詢池中的工作項。

請注意,NotifyNewWorkItem 調用的是 ThreadPool.UnsafeQueueUserWorkItem,而不是 ThreadPool.QueueUserWorkItem。"Unsafe" 首碼只是意味著它不是流動 ExecutionContext;不這樣做可得到一些性能優勢。由於此實現已開始對 ExecutionContext 流進行手動處理,因此 ThreadPool 也不需要嘗試執行此操作。

DequeueAndExecuteWorkItem 是真正的奇妙所在。此方法首先生成佇列搜索順序。搜索順序從要檢查的下一個佇列開始一直到清單結束,然後再迴圈回來,從清單的起始處到佇列起始處。為了簡化實現,故使用 LINQ Enumerable.Range 方法生成了兩個清單,然後再使用 LINQ Enumerable.Concat 方法將其連接在一起。

確定了搜索順序後,即可開始查找工作項。每個佇列都會按指定的順序進行檢查,找到工作項後,即會將其刪除並更新下一個指標。然後會使用圖 3 中所示的 Execute 方法來調用此工作項。

此處有一行特別有趣的代碼,它用來檢查剛剛從中檢索到某個專案的佇列是否已釋放且為空。如果池發現了此類佇列,就會意識到不會再有任何專案添加到其中(因為它已被釋放),因此不需要再保留這個池。此時,將會使用 RemoveQueueNeedsLock 把目標佇列從佇列清單中刪除,並可能會更新下一個佇列指標以防止其超出範圍。

請注意,此方法沒有在內部使用鎖,而是訪問共用狀態;因此我用 "NeedsLock" 尾碼命名此方法,以提醒我自己在持有鎖時需要調用此方法。您會注意到,針對 RemoveQueueNeedLock 的兩個調用網站(一個在佇列的 Dispose 方法中,另一個在池的 DequeueAndExecuteWorkItem 方法中)在持有佇列鎖的時候都會調用此方法。

隨著此實現的完成,您現在可以在代碼中對此進行測試。在以下示例中,我創建了 RoundRobinThreadPool 的一個靜態實例。如果有一批工作到達並需要進行處理,這時將會創建一個新的佇列、將所有工作都排列到此佇列中,然後釋放佇列:

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

即使會首先安排第一批到達的所有工作,但只要第二批到達,它也將會得到基本相同的處理資源。

可以在此實現中添加更多的點綴性功能,從性能角度來看,實現很可能會大為改觀。 但是,幾乎不需要任何代碼您即可創建一個抽象,從而利用 .NET ThreadPool 及其提供的所有能力和功能,而您仍可以獲得您所尋求的基於批次的公平性。

請將您想向 Stephen 詢問的問題和提出的意見發送至 netqa@microsoft.com

Stephen Toub 是 Microsoft 平行計算平臺團隊的一名高級專案經理。 他還是**《MSDN 雜誌》的特約編輯。