Este artigo foi traduzido por máquina.

Assuntos .NET

Solicitada a execução com ThreadPool

Stephen Toub

P muitos componentes em meu sistema precisam executar o trabalho de forma assíncrona, que faz eu achar que que o Microsoft .NET Framework ThreadPool é a solução certa. No entanto, tem o que eu acredito é um requisito exclusivo: cada componente precisa garantir que seus itens de trabalho são processadas em ordem e que, como resultado, não dois dos seus itens de trabalho são executados ao mesmo tempo. É CERTO, no entanto, para vários componentes executar simultaneamente uns com os outros; na verdade, que é desejado. Você tem recomendações?

A este não é como exclusivo um predicament como você pode achar que, como ele ocorre em uma variedade de cenários importantes, inclusive aquelas baseia a transmissão de mensagens. Considere uma implementação de pipeline que obtém os benefícios de paralelismo por ter vários estágios do pipeline ativo ao mesmo tempo.

Por exemplo, você poderia ter um pipeline que lê dados de um arquivo, compacta, criptografa e grava-check-out um novo arquivo. A compactação pode ser feita simultaneamente com a criptografia, mas não nos mesmos dados ao mesmo tempo, desde a saída de um precisa ser a entrada para o outro. Em vez disso, a rotina de compactação pode compactar alguns dados e enviá-lo off para a rotina de criptografia para ser processada, no ponto que a rotina de compactação pode trabalhar no seguinte dado.

Como muitos algoritmos de compactação e criptografia manter um estado que afeta futuros como dados é compactado e criptografado, é importante que a ordem é mantida. (Never mind que este exemplo lida com arquivos, e ele pode ser interessante se você pode descriptografar e descompactar os resultados para obter o original com todos os dados na ordem correta.)

Há várias soluções possíveis. A primeira solução é simplesmente dedicar um segmento para cada componente. Este DedicatedThread teria uma fila de FIFO (first-in-first-out de itens de trabalho a ser executada e um único segmento que atende a essa fila. Quando o componente tem trabalho a ser executado, ele Despeja que funcionam na fila, e eventualmente o thread obterá ao redor para separar o trabalho e executá-lo. Como existe apenas um thread, somente um item será executado uma vez. E como uma fila de FILA está sendo usada, os itens de trabalho serão ser processados na ordem em que foram gerados.

Com o exemplo fornecidos na a coluna janeiro de 2008 .NET Matters, Usarei uma classe de WorkItem simples para representar o trabalho a ser executada, mostrado na Figura 1 . Uma implementação de DedicatedThread que usa esse tipo de WorkItem é mostrada na Figura 2 . A maior parte da implementação está em um BlockingQueue naive <T> implementação (o .NET Framework 4.0 inclui um BlockingCollection <T> tipo que poderia ser uma melhor ajustar para obter uma implementação assim). O construtor de DedicatedThread simplesmente cria um BlockingQueue <t> instância, em seguida, transfere um segmento que continuamente aguarda a outro item chegar na fila e, em seguida, executa-lo.

Figura 1 captura um item de trabalho

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

A Figura 2 DedicatedThread implementação

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

Isso fornece a funcionalidade básica para seu cenário e ele pode atender suas necessidades, mas existem algumas desvantagens importantes.Em primeiro lugar, um thread está sendo reservado para cada componente.Com um ou dois componentes, que pode não ser um problema.Mas para um lote de componentes, isso pode resultar em um detalhamento grave no número de segmentos.Que pode levar a um desempenho ruim.

Essa implementação específica não também extremamente eficiente.Por exemplo, o que acontece se você deseja subdividir um componente — como saber o thread para interromper o bloqueio?E o que acontece se uma exceção é gerada de um item de trabalho?

Como um aparte, é interessante observar que essa solução é semelhante ao Windows usa em uma bomba de mensagem típica.A transferência de mensagem é um loop aguardando as mensagens chegam, distribuindo-los (processá-los), em seguida, voltar e aguardando para obter mais informações.As mensagens de uma janela específica são processadas por um único segmento.As semelhanças são demonstradas pelo código na Figura 3 , que deve apresentar um comportamento muito semelhante ao código na Figura 2 .Um novo segmento é spun até que cria um controle, garante que a alça foi inicializada e usa Application.Run para executar um loop de mensagem.Para a fila um item de trabalho para esse segmento, você pode simplesmente usar Begin­Invoke método o controle.Observe que eu não estou recomendar essa abordagem, mas em vez disso, apenas apontador que, em um alto nível, ele é o mesmo conceito básico como a solução de DedicatedThread já mostrada.

Figura 3 semelhanças com um loop de mensagem da interface do usuário

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

Uma segunda solução envolve o uso ThreadPool para execução.Em vez de girando um segmento de novo e personalizado por um componente que atende a uma fila particular, irá manter apenas a fila por componente, que não dois elementos da mesma fila nunca serão atendidos ao mesmo tempo.Isso apresenta os benefícios de permitir que o próprio ThreadPool controlar quantos segmentos são necessários, para lidar com sua inclusão e aposentadoria, para lidar com problemas de confiabilidade e para obter você fora da empresa de girando os novos segmentos, que é raramente a coisa certa para fazer.

Uma implementação dessa solução é mostrada na Figura 4 .A classe fifo­Execution mantém apenas dois campos: uma fila de itens de trabalho para serem processados e um valor booleano que indica se uma solicitação foi emitida para o ThreadPool para processar itens de trabalho.Ambos esses campos são protegidos por um bloqueio na lista de itens de trabalho.O resto da implementação é simplesmente dois métodos.

A Figura 4 Implementando FifoExecution

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

O primeiro método é QueueUserWorkItem, com uma assinatura que corresponde ao que expostos pelo ThreadPool (ThreadPool também oferece uma sobrecarga de conveniência que aceita somente um WaitCallback, uma sobrecarga que você pode optar por adicionar).O método primeiro cria um WorkItem sejam armazenados e, em seguida, usa o bloqueio.(Nenhum estado compartilhado é acessado ao criar o WorkItem.Portanto, para manter o bloqueio o menor possível, esta captura do item é feita antes de executar o bloqueio.) Depois que o bloqueio for mantido, o item de trabalho criado é enfileirados em fila do item de trabalho.

O método, em seguida, verifica se uma solicitação foi feita para o ThreadPool para processar itens de trabalho na fila e, se um não tenha sido feito, faz uma solicitação (e anotações-lo para o futuro).Essa solicitação para o ThreadPool é simplesmente usar um dos segmentos do ThreadPool para executar o método ProcessQueuedItems.

Quando chamado por um thread ThreadPool, ProcessQueuedItems insere um loop.Esse loop, leva o bloqueio e, mantendo o bloqueio, ele verifica se há quaisquer itens de trabalho mais para serem processados.Se não existem qualquer, ela redefine o sinalizador de solicitação (de modo que futuras itens enfileirados irão solicitar processamento do pool novamente) e sai.Se não houver itens de trabalho para ser processado, ele captura que próxima, libera o bloqueio, executa o processamento e começa tudo novamente, a ser executado até que não mais itens na fila.

Isso é uma implementação simples mas eficiente.Um componente agora pode criar uma instância de FifoExecution e usá-lo para programar itens de trabalho.Por instância do FifoExecution, somente um item de trabalho na fila será capaz de executar uma vez, e itens de trabalho na fila executará na ordem em que foram enfileiradas.Além disso, funcionar itens diferente FifoExecution instâncias será capaz de executar simultaneamente.E a melhor parte é que você está agora fora da empresa do gerenciamento de segmento, deixando todo o trabalho difícil (mas muito importante) de gerenciamento de segmento para o ThreadPool.

No caso extremo, onde cada componente é manter o pool saturado com o trabalho, o ThreadPool será provavelmente rampa até ter um thread por componente, exatamente como na implementação Dedicated­Thread original.Mas o que acontecerá somente se ele é considerado apropriado, o ThreadPool.Se os componentes não manter o pool saturado, muitos segmentos de menos será necessários.

Existem benefícios adicionais, como permitir que o ThreadPool farão a coisa certa em relação às exceções.Na implementação DedicatedThread, o que acontece se o processamento de um item lança uma exceção de ferramentas de laboratório?O thread virão de travamento para baixo, mas dependendo da configuração do aplicativo, o processo pode não ser subdividido.Nesse caso, os itens de trabalho serão iniciado enfileiramento até o DedicatedThread, mas nenhum nunca irá obter processadas.Com fifo­Execution, o ThreadPool apenas terminarão adicionando mais segmentos para compensar para aqueles que passaram fora.

a Figura 5 mostra um aplicativo simples de demonstração que utiliza a classe FifoExecution.Este aplicativo tem três estágios de um pipeline.Cada estágio gravará a identificação do dado atual que ele trabalha com (que é apenas a iteração do loop).Ele faz algum trabalho (representado aqui por um Thread.SpinWait) e transmite dados (novamente, apenas a iteração de loop) junto para o próximo estágio.Cada etapa produz suas informações com um número diferente de guias para que seja fácil ver os resultados separados check-out.Como você pode observar na saída mostrada na Figura 6 , cada estágio (uma coluna) é manter o trabalho ordenado corretamente.

A Figura 5 demonstração de FifoExecution

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

fig06.gif

A Figura 6 saída de aplicativos de demonstração

Também é interessante observar que há uma falta de igualdade entre os estágios de pipeline.Você pode ver, por exemplo, que stage1 na Figura 6 é já até a iteração 21, enquanto stage2 está ainda novamente em 13 e stage3 está no 9.Isso ocorre basicamente devido a minha implementação do ProcessQueuedItems.O aplicativo de exemplo é muito rapidamente empurrar trabalho 100 itens para stage1 e, portanto, o thread do pool que atende a stage1 serão provavelmente sentar no loop ProcessQueuedItems e não retornará até que há não mais trabalho stage1.Isso confere a ela uma inclinação desleal sobre os estágios.Se você vir com comportamento similar em seu aplicativo, e é um problema, você pode aumentar igualdade entre os estágios, modificando a implementação de ProcessQueuedItems para uma mais semelhante a seguir:

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

Agora, mesmo que haja mais itens a serem processadas, ProcessQueuedItems não loop, mas em vez disso será recursivamente fila próprio ThreadPool, priorizando propriamente dito, portanto, por trás de itens de outras fases.Com essa modificação, a saída do aplicativo na Figura 5 agora parece como mostrado na Figura 7 .Você pode ver nessa nova saída agendamento está realmente tratando stage2 e stage3 com mais igualdade que antes (ainda há alguns latência entre os estágios, mas isso é esperado considerando que este é um pipeline).

fig07.gif

A Figura 7 nova saída com agendamento Fairer

Claro, essa maior igualdade não vir gratuitamente.Cada item de trabalho agora gera uma viagem extra por meio do Agendador, que adiciona alguns custos.Você precisará decidir se esta é uma compensação que você pode fazer para seu aplicativo; por exemplo, se o trabalho que você está fazendo na seus itens de trabalho em todos os substancial, essa sobrecarga de deve ser irrisório e unnoticeable.

Isso é apenas um exemplo de mais de como é possível criar sistemas de ThreadPool que adiciona funcionalidade sem ter que criar pools de threads personalizado você mesmo.Para obter outros exemplos, consulteedições anteriores do .NET MatterscolunaMSDN Magazine .

Envie suas dúvidas e comentários paranetqa@microsoft.com.

Stephen Toub é gerente sênior de programas na equipe de plataforma de computação paralela da Microsoft.Ele também é editor colaborador para MSDN Magazine.