本文章是由機器翻譯。

.NET 很重要

排序與 ThreadPool 執行

Stephen Toub

問:多我的系統元件需要執行工作以非同步方式,使得我認為 Microsoft.NET Framework 的 ThreadPool 是正確的方案。不過,我有我認為是唯一的需求: 每個元件必須確定其工作項目的順序來處理,並的結果沒有兩個工作項目執行在同一時間。確定,不過,與彼此同時執行多個元件的 ; 事實的必要。您有任何建議嗎?

A 這不是為唯一一個困境,您可能認為發生許多重要的情況包括根據傳遞的訊息的下 。請考慮取得平行處理原則的好處藉由現用的任何一次管線的多個階段的一個管線實作。

舉例來說,您可能有管線會讀取來自資料檔案的資料、 壓縮它,加密,並將它從寫入新的檔案。在同一時間後的輸入到另一個需要輸出,可以完成壓縮與的加密同時而非相同的資料。而,壓縮常式可以壓縮一些資料,並將關閉它傳送到加密常式進行處理的點壓縮常式可以處理資料的下一個部分。

因為許多壓縮與加密演算法會維護該狀態會影響如何未來的資料會壓縮,和加密,很重要的維護順序。(never mind 這個範例會處理檔案,它會是不錯如果您無法解密並解壓縮結果,以取得回原始資料的所有正確的順序)。

有幾個可能的解決方案。第一個解決方案是只指定給每個元件的執行緒。這個 DedicatedThread 必須要執行的工作項目 」 和 「 單一執行緒,服務該佇列的先進第一個出 (FIFO) 佇列。當元件執行的工作時,它傾印到的佇列的工作,並最後執行緒會取得解決挑選工作來執行它。因為只有一個執行緒,只有一個項目就會執行一次。並且使用 FIFO 佇列時,工作項目將會被處理它們所產生的順序。

我在提供的範例在 1 月 2008.NET 很重要的資料行我使用簡單的 WorkItem 類別來表示將工作執行,如 [圖 1 ] 所示。DedicatedThread 使用這個 WorkItem 型別的實作如**[圖 2** ] 所示。大量的實作是一個單純的 BlockingQueue <T> 實作 (在.NET Framework 4.0 包含一個 BlockingCollection <T> 型別,會在更適合像這樣的實作)。DedicatedThread 的建構函式會直接建立在 BlockingQueue <t> 執行個體然後 spins,執行緒不斷地等待另一個項目在佇列中抵達,然後執行它。

[圖 1) 擷取工作項目

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

[圖 2 DedicatedThread 實作

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

這提供適用於您案例的基本功能,和它可能符合需求,但有一些重要的 downsides。先,執行緒會被保留每個元件。與一或兩個的元件,可能無法發生問題。但元件許多的這可能導致一個嚴重的爆炸,執行緒的數目。可能會導致不正確的效能。

此特定實作不也非常強大。例如,要是您想要拆元件 — 您要如何知道執行緒停止封鎖?以及,如果擲回例外狀況從工作項目會發生什麼事?

為的另外,很有趣請注意本解決方案是類似於 Windows 所使用的資料的一般的訊息幫浦。訊息幫浦 (Message Pump) 會是等待訊息到達,迴圈,分派他們 (處理它們),然後回到並等候多個。在特定視窗訊息是單一執行緒處理。在 [圖 3 ,應該出現非常像 [圖 2 ] 中的程式碼的行為的程式碼,會示範,相似之處。新的執行緒是可以設定建立控制項,可確保已初始化其控制代碼,並使用 Application.Run 執行訊息迴圈。若要佇列工作項目至這個執行緒,您只會使用控制項的 begin­invoke 方法。請注意我不建議這的種方法,但而只是指標出的在高層次,是做為 DedicatedThread 方案已經顯示相同的基本概念。

[圖 3 的相似處與 UI 訊息迴圈

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

第二個解決方案會包含使用 ThreadPool 執行。而不是每服務私用佇列的元件的新的自訂執行緒的旋轉,我們會將元件,每一佇列,從相同佇列沒有兩個項目將過服務在同一時間。這是具有允許本身來控制需要多少執行緒,來處理其資料隱碼攻擊和退休,處理可靠性的問題,讓您的新的執行緒上不常是正確的事要旋轉的 ThreadPool 的優點。

本解決方案的實作如 [圖 4 ] 所示。fifo­execution 類別會維護只在兩個欄位: 要處理的工作項目的佇列 」 和 「 布林 (Boolean) 值,指出要求是否已經發給 ThreadPool,以處理工作項目。這兩個這些欄位都受到鎖定的工作項目清單上。實作其他部分是只要兩個方法。

[圖 4 實作 FifoExecution

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

第一個方法是 QueueUserWorkItem,具有簽名碼符合 ThreadPool (ThreadPool 也提供接受只是一個 WaitCallback,多載,您可以選擇新增一個方便多載) 所公開的。方法會先建立 WorkItem,要儲存,然後會鎖定。(沒有共用的狀態時建立,WorkItem 存取。因此,以保持鎖定的越小越好,擷取此項目的是之前取得鎖定)。 一旦在鎖,建立的工作項目就會是到工作項目佇列的佇列。

在方法再檢查是否已經被要求 ThreadPool 處理佇列的工作的項目,如果其中一個尚未被做會讓這類的要求 (和筆記它供日後使用)。ThreadPool 這個要求是只使用其中一個 ThreadPool 的執行緒執行 ProcessQueuedItems 方法。

當叫用由 ThreadPool 執行緒,ProcessQueuedItems 就會進入迴圈。在這個的迴圈花鎖定,並同時保留鎖定,它檢查是否有任何處理更多的工作項目]。如果沒有任何,它重設要求旗標 (例如,未來的佇列的項目將會處理從集區重新要求) 並結束。如果要處理的工作項目它會抓取下一個,釋放鎖定,執行的處理然後啟動所有超過一次,執行,直到佇列中有沒有更多的項目。

這是一個簡單但強大實作。元件可能會立即建立 FifoExecution 的執行個體並用它來排程的工作項目。每個體 FifoExecution 只有一個佇列的工作項目將可以在一個的階段執行,並佇列的工作項目會執行這些已排入佇列的順序。此外,請工作的執行個體能夠同時執行的不同 FifoExecution 的項目。而最棒的是您現在的執行緒管理的企業,離開所有硬碟 (但非常重要的) 工作執行緒管理的 ThreadPool 到。

在極端情況下,每個元件保留集區飽和與 ThreadPool 將可能斜道,一直到有一個執行緒,每元件的工作就像在原始 dedicated­thread 實作。但是,只會發生如果的認為適當的 ThreadPool。如果元件沒有保留集區飽和,必須需要較少的多執行緒。

有多的好處例如讓 ThreadPool 執行正確的動作方面與例外狀況。在 DedicatedThread 實作,如果怎樣的項目的處理程序擲回例外狀況?執行緒會來自向下,損毀,但根據應用程式的設定,處理程序可能不被破壞。在這種情況下工作項目將會啟動的 DedicatedThread 到佇列,但是沒有將曾經取得處理。與 fifo­execution,ThreadPool 只是最後會加入更多執行緒已離開的補償。

[圖 5 ] 顯示一個利用 FifoExecution 類別的簡單示範應用程式。在 [管線] 中,這個應用程式有三個階段。每個階段寫入目前的資料的識別碼它使用 (這是只在迴圈的反覆項目)。接著 (由以下一個 thread.SpinWait) 一些工作,並傳遞資料 (一次,只在迴圈反覆運算) 以及下一個階段。每個步驟輸出不同數目的索引標籤的資訊,這樣很容易看到分隔出的結果。您可以在 [圖 6 ] 所示的輸出中看到,每個階段 (欄) 讓工作的正確順序。

FifoExecution 的 [圖 5] 示範

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

fig06.gif

[圖 6 從示範應用程式的輸出

也是要注意的管線的階段之間的公平性缺乏有趣。您可以看到例如, [圖 6 ] 中的,stage1 是已經由反覆項目 21,stage2 仍然會傳回位於 13 和 stage3 位於 9 時。這是 ProcessQueuedItems 的主要是 ProcessQueuedItems 的因為我的實作。範例應用程式非常快速地發送 100 個工作項目至 stage1,和因此執行緒從集區服務 stage1 將可能是坐在 ProcessQueuedItems 迴圈中,並會不傳回之前還有沒有更多的 stage1 工作。這可讓它的不公平的偏差透過其他階段。如果您看到類似的問題在您的應用程式的問題,您可以增加藉由修改 ProcessQueuedItems 的實作更多像下列的其中一個階段之間的公平性:

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

現在,即使有更多要處理的項目,ProcessQueuedItems 不會迴圈但而會以遞迴方式佇列本身 ThreadPool,因此排列本身優先順序後其他階段的項目。這個修改的情況從應用程式 [圖 5 ] 中現在輸出看起來的顯示在 [圖 7 )。您可以在這個新的輸出,排程會確實將 stage2 和 stage3 以比前更多的公平性中看到 (之間的階段仍是有一些延遲,但是其實會預期這是管線的)。

fig07.gif

[圖 7 與 Fairer 排程新的輸出

不用說沒有可用來自這個增加的公平性。現在,每個工作項目會帶來的額外的存取,透過將加入一些成本的排程。您必須決定是否在取捨,您可以為您的應用程式 ; 例如,您在您的工作項目中進行的工作在大量時這個額外負荷應該微不足道和 unnoticeable。

這是一個更多的範例的方式很可能會建立 ThreadPool 的頂端加入功能,而不必自行建立自訂的執行緒集區的系統。如其他需的範例請參魷 \ cs6 \ f1 \ cf6 \ lang1024先前的版本,在.NET Matters 的資料行MSDN Magazine .

您問題或意見寄至netqa@Microsoft.com.

Stephen Toub 將是您,Microsoft Parallel Computing Platform 小組為資深專案經理。他也是主筆 MSDN Magazine 的。