Este artigo foi traduzido por máquina.

Interessantes de mesmo nível

Um processamento de trabalho Peer-to-Peer aplicativo com o WCF

Matt Neely

Download do código disponível na Galeria de código do MSDN
Procure o código on-line

Este artigo discute:
  • Comunicação peer-to-peer básica
  • Criar o ponto de trabalho
  • Balanceamento de carga e compartilhamento de trabalho
  • Itens de trabalho de backup e restauração
Este artigo usa as seguintes tecnologias:
O WCF

Conteúdo

Comunicação de mesmo nível básico
O ponto de trabalho
Fazer o trabalho de trabalho
Ponto do usuário
Spectators
Compartilhamento de trabalho e de balanceamento de carga
Backup de WorkItem
Interessantes de mesmo nível

o grupo tenho uma parte em Microsoft permite tirar uma semana de nossas tarefas regulares para pesquisar um tópico. Recentemente eu decidiu gastar o meu semana pesquisando a rede de (P2P) ponto-a-ponto. Essa pesquisa inflado em um interesse permanente nesse novo paradigma de rede.

Um dos motivos ponto a ponto piqued que meu juros foi devido para um sistema de compilação documentação interna minha equipe é responsável. Temos autores escrever milhares de documentos de ajuda que precisam ser analisada, transformados, patch e compilado. Os documentos podem ser compilados em vários formatos de saída e em vários idiomas todas as noites. Para fazer essa feat, temos um server farm executando nosso software de compilação.

O mecanismo que usamos atualmente é sua arquitetura típica cliente-servidor. Trabalhos de criação são enfileirados em um servidor central e doled check-out conforme cria servidores tornam-se livre para processá-los. Ocasionalmente, encontrar um registro posterior de trabalhos no servidor central ou um servidor de compilação com comportamento inadequado. Muitos membros de equipe têm um interesse especial no status do sistema de compilação, mas o sistema é opaco e difícil de monitorar.

Este artigo é o resultado da concepção uma versão de ponto a ponto desse sistema de compilação. Discutirei o design emergente de um aplicativo de ponto habilitado baseia o PeerChannel do WCF (Windows Communication Foundation) e processamento de trabalho genérico. O vai orientarão a construção de sistema ao abordar alguns das decisões de projeto que devem ser feitas no processo.

Comunicação de mesmo nível básico

Para este projeto, começarei definindo meus colegas de processamento de trabalho principais. Irá dá-los o nome genérico de trabalhos. O plano estará com um trabalho por máquina física (embora não haja nenhum limitações técnicas exigir isso).

Cada trabalho precisará se comunicar e coordenar com outros usuários para realizar o trabalho. Para fazer isso, ele precisa ter algum conhecimento sobre outros trabalhos. Portanto, como o meu trabalho obterá essas informações?

Uma abordagem é com cada trabalho pesquisar cada trabalho para seu estado. Esse mecanismo é complexo, pois ele requer duas fases: descoberta de trabalho e consulta de trabalho. Além disso, se cada trabalho ping de cada outro operador, você teria muitas mensagens desnecessárias viajando sua rede. Junto com o fato de que o PeerChannel do WCF é inerentemente multicast, um trabalho único de ping será realmente enviar uma mensagem para cada trabalho. Para n colegas de trabalho, teria n3 mensagens em vôo ao redor. Eu recusado essa abordagem para um modelo de difusão.

Em um modelo de difusão, cada trabalho periodicamente enviará seu status. Eu chamo isso uma mensagem de pulsação. Outros trabalhos podem decidir o que fazer com essas informações, mesmo se eles apenas ignorá-la. Eu prefiro essa abordagem porque como a topologia de malha trabalho alterações, você pode inferir nós vindo online e offline pela presença ou ausência de mensagens de pulsação. Descoberta de mesmo nível de trabalho é passiva em vez de ativo. Esse modelo também minimiza o tráfego de rede necessário apenas para upkeep do sistema. Com n colegas de trabalho teria n2 mensagens nesse modelo, que é um aprimoramento a primeira abordagem.

Para criar um serviço no WCF, você geralmente comece com definindo seu contrato de serviço. Essa é a interface que define a funcionalidade de que um serviço expõe aos clientes. Em um cenário ponto a ponto, cada ponto age como servidor e cliente, e o contrato de serviço criado aqui definirá como pontos se comuniquem entre si. Dada a decisão para transmitir o status do trabalho, pode definir o contrato de serviço:

[ServiceContract]
public interface IWorkerPeer {
  [OperationContract(IsOneWay=true)]
  void Heartbeat(WorkerStatus status);
}

Meu contrato de serviço IWorkerPeer possui uma operação única e unidirecional chamada pulsação. Quando um trabalho chama esse método, ele enviará fora a instância do objeto WorkerStatus preenchida para cada outro ponto de trabalho participar meu malha.

Aqui é a definição simples da classe WorkerStatus:

[DataContract]
public class WorkerStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public WorkerState State;
}

Ele conterá um identificador exclusivo para o trabalho e um valor de enumeração de estado simples. Em um sistema mais funcional, isso pode ser um conjunto elaborado de dados incorporar descritores extras, contadores de desempenho, estatísticas de trabalho (como porcentagem de itens de trabalho com falha) e mais. Conforme definido, porém, isso minimamente permitirá cada trabalho saber o status de cada outro trabalho na malha.

Os estados que defini são desconhecido, se, ocupado e off-line. Um estado desconhecido significa que eu não tenho certeza de que do estado do trabalho (talvez devido à ausência de uma pulsação desse trabalho em um período especificado de tempo). Um estado pronto significa que o trabalho está funcionando corretamente e está pronto para executar no algum trabalho. Um ocupado estado significa que o trabalho está funcionando corretamente, mas ocupada fazendo o trabalho. Um estado off-line significa que se o trabalho não está disponível para funcionam. Isso pode ser potencialmente o estado enviado check-out final pulsação de um trabalho ou talvez um trabalho é colocado off-line por um administrador para manutenção ou depurar.

O ponto de trabalho

Para tornar um aplicativo de mesmo nível funcional do trabalho, preciso expor e consumir meu serviço. Portanto, CRIEI um aplicativo de console simples e deu a ele uma classe denominada trabalho conforme mostrado na Figura 1 . Em minha classe de trabalho, adicionei métodos de início e parada para gerenciar a instalação e a desmontagem do serviço exposto (que recebe mensagens) bem como o canal do cliente (para o envio de mensagens). Eu também usar uma instância de System.Threading.Timer para periodicamente chamar o método SendHeartbeat. SendHeartbeat instancia um objeto WorkerStatus, preenche com os dados adequados e, em seguida, envia para todos os pontos. Como um serviço, quando o método IWorker.Heartbeat é chamado, ele simplesmente resultará o conteúdo da mensagem para a saída padrão.

Figura 1 A classe de trabalho

namespace WorkerPeerApplication {
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

  public class Worker : IWorker {
    private ServiceHost _host;
    private ChannelFactory<IWorker> _factory;
    private IWorker _workerClientChannel;

    private Guid _id;
    private volatile WorkerState _state;

    private Timer _heartbeat;

    public Worker() {
      _id = Guid.NewGuid();
      _state = WorkerState.Offline;
    }

    public void Start()  {
      _host = new ServiceHost(this);
      _host.Open();

      _factory = new ChannelFactory<IWorker>("IWorkerClient");
      _workerClientChannel = _factory.CreateChannel();

      _state = WorkerState.Ready;
      _heartbeat = new Timer(
        this.SendHeartbeat,
        null,
        new TimeSpan(0),
        //Time should really be read in from config.
        new TimeSpan(0, 0, 1)); 
    }

    public void Stop() {
      _state = WorkerState.Offline;
      _heartbeat.Dispose();
      this.SendHeartbeat(null);
      _factory.Close();
      _host.Close();
    }

    private void SendHeartbeat(object context) {
      WorkerStatus ws = new WorkerStatus();
      ws.WorkerId = _id;
      ws.State = _state;

      _workerClientChannel.Heartbeat(ws);
    }

    void IWorker.Heartbeat(WorkerStatus status) {
      Console.WriteLine(
        "{0}: {1}", 
        status.WorkerId, 
        status.State);
    }
  }
}

Para a conclusão, eu preciso de um método Main e a configuração WCF apropriada, o que coloco em arquivo de configuração do aplicativo. O método Main pode ser visto no download de código, e a configuração de que uso é mostrada na Figura 2 .

Configuração do trabalho a Figura 2

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="WorkerPeerApplication.Worker">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint           
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

    <client>
      <endpoint
        name="IWorkerClient"
        address="net.p2p://MyPeerApplication"
        binding="netPeerTcpBinding"
        bindingConfiguration="unsecureBinding"
        contract="PeerWcfComm.IWorker" />
    </client>

  </system.serviceModel>
</configuration>

Você pode ver que eu defino tanto o serviço que expõe o elemento de mesmo nível e configuração do elemento de mesmo nível lado do cliente. A configuração do serviço Especifica o tipo do serviço bem como seu endereço base e o ponto de extremidade. ESTOU usando uma ligação de não-segura e o mesmo nível nomes resolução protocolo PNRP () para resolução de nomes de mesmo nível. A configuração do cliente é quase um espelho do serviço. A diferença notável somente é que eu deu o ponto de extremidade do cliente um nome para que o construtor de ChannelFactory possa acessar facilmente as configurações desejadas.

Agora que eu tem o shell de um ponto de trabalho, vamos teste unidade-lo. Para uma experiência, executar duas instâncias do programa ao mesmo tempo. Você verá que, inicialmente cada ponto resultará suas próprias informações de pulsação. Após alguns segundos, pontos começará a encontrar uns aos outros e cada trabalho receberá o status dos outros.

Francamente, eu considero ele irritantes e está distraindo para minhas mensagens para ser enviadas para mim. Felizmente, o Blog da equipe PeerChannelFornece o código que preciso. Você verá uma classe bastante útil chamada RemoteOnlyPropagationFilter. Emprestando isso, POSSO, em seguida, adicionar as duas linhas de código a seguir para meu método Worker.Start para garantir que as mensagens que enviam também não acabam sendo enviadas para mim:

PeerNode pn = ((IClientChannel)_clientChannel).GetProperty<PeerNode>();
pn.MessagePropagationFilter = new RemoteOnlyMessagePropagationFilter();

Estas duas linhas de código foram adicionadas antes de começar o timer de pulsação.

Fazer o trabalho de trabalho

Até agora, eu tenho um aplicativo que se comunica apenas entre os colegas de trabalho. Os operadores, na verdade, não fazem qualquer trabalho ainda. Irá corrigir isso, aumentando a interface IWorker, adicionando o seguinte método:

[OperationContract(IsOneWay = true)]
void DoWork(Guid workerId, WorkItem work);

Esse método permite que uma entidade externa informar um operador de mesmo nível para fazer um item de trabalho. Desde o WCF de mensagens são multicast, adicionei um parâmetro para especificar a identificação do trabalho que deverá executar o trabalho. FALAREI sobre a classe de WorkItem em alguns instantes. Mas por ora, a alteração de interface requer-me para atualizar a classe de trabalho, conforme mostrado na Figura 3 . Quando alguém diz um trabalho para fazer algum trabalho, essa solicitação é adicionada a uma fila interna e eventualmente executada na ordem recebida. Para realmente executar o trabalho, adicionei um método chamado MonitorWork para a classe de trabalho. O método Start simplesmente chamará ThreadPool.QueueUserWorkItem, apontando-lo para o método MonitorWork.

A Figura 3 revisão trabalho

public class Worker : IWorker {
  // Omitted for clarity ...
  private Queue<WorkItem> _workQueue;

  public Worker() {
    // Omitted for clarity ...
    _workQueue = new Queue<WorkItem>();
  }

  public void Start() {
    // Omitted for clarity ...
    ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void MonitorWork(object context) {
    if (_workQueue.Count > 0) {
      lock (_workQueue)
        _currentWorkItem = _workQueue.Dequeue();

      DoWorkInternal(_currentWorkItem);
      _currentWorkItem = null;
    }

    if (_state != WorkerState.Offline)
      ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void DoWorkInternal(WorkItem wrk) { ...}

  void IWorker.DoWork(Guid workerId, WorkItem work) {
    if (workerId == _id)
      lock (_workQueue)
        _workQueue.Enqueue(work);
  }
}

Em um alto nível, MonitorWork irá fazer alguma coisa e, em seguida, reagendar si por meio de outra chamada para ThreadPool.QueueUserWorkItem. Ele fará isso até que o estado do trabalho esteja offline, o que acontece quando parar é chamado no objeto de trabalho. Algo acontece é verificar se a fila de trabalho possui nada nela. Se a fila contém itens de trabalho, MonitorWork será dequeue a próxima WorkItem e começar a processá-lo. O comportamento que quero é meu trabalho processar em série seu trabalho, embora ele poderia facilmente ser modificado para processar os itens em paralelo.

Como você ver a partir do código na Figura 3 , não resolveu o trabalho ainda. Eu simplesmente tenha passado isso para o método de DoWorkInternal ainda -para-ser-definido. Portanto, primeiro vamos observar a definição de classe de WorkItem. Como o sistema que eu estou criando para este artigo é a tarefa não reconhece, vou simplesmente definir o meu item de trabalho da seguinte maneira:

[DataContract]
public class WorkItem {
  [DataMember]
  public Guid WorkItemId { get; set; }

  [DataMember]
  public int PretendToWorkSeconds { get; set; }
}

Eu novamente deve usar um atributo DataContract ou MessageContract de forma que o WCF corretamente pode serializar minha classe personalizada. A classe de WorkItem tem um campo identificador, WorkItemId e um campo especificando o número de segundos para feign fazendo o trabalho real (chamado PretendToWorkSeconds).

Agora que um WorkItem é definida, pode processá-lo. Esta é minha implementação do método DoWorkInternal:

private void DoWorkInternal(WorkItem wrk) {
  this._state = WorkerState.Busy;
  Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));
  this._state = WorkerState.Ready;
}

Isso é onde eu normalmente faria meu trabalho significativo (por exemplo, Criando a documentação de instalação do SQL Server 2008 em português (Brasil)). Mas, por questão de neste artigo, simplesmente vou no estado de suspensão do período de tempo especificado.

Finalmente, eu atualizar minha classe WorkStatus com dois novos membros de dados que serão definidos antes de pulsação de um trabalho é enviada. Eles são o identificador do item de trabalho que está sendo executado no momento e uma contagem dos itens de trabalho pendentes nesse ponto de trabalho:

[DataContract]
public class WorkerStatus {
  // Omitted for clarity...

  [DataMember]
  public Guid CurrentWorkItem;

  [DataMember]
  public int PendingWorkItemCount;
}

Agora que meus colegas de trabalho possui alguns dados, eu preciso conhecer acutely cenários que coloque sobre perda de dados. Uma das vantagens e desvantagens de um paradigma de rede descentralizada como ponto a ponto é que existe é não repositório central de dados. Cada ponto é responsável por coletivamente preservando dados do sistema. Portanto, o que acontece se um ponto de trabalho falhar? Especificamente para o meu sistema, o que acontece com o WorkItems na fila?

Cenários de desligamento de mesmo nível agrupar em duas categorias: esperados e inesperados. Para desligamentos uma natureza esperado, preciso lidar com o WorkItems na fila. O paradigma para lidar com esses itens pode variam de acordo com a natureza do aplicativo e dados. Se os dados não não importantes, talvez possa para simplesmente descartar todos os WorkItems na fila. Se o aplicativo é projetado para ser colocada em pausa (ou seja reiniciado automaticamente como um serviço do Windows), talvez os itens para o disco de serialização e lê-los durante a inicialização é a solução correta. Ainda outra opção é simplesmente encaminhar o WorkItems excedente para outros trabalhos antes para desligamento. FALAREI sobre desligamento inesperado neste artigo no contexto do backup de dados.

Ponto do usuário

Até agora, eu tenho um aplicativo habilitado ponto que pode aceitar e processar os itens de trabalho definido. Mas quem cria esses itens de trabalho e as envia para os pontos de trabalho? Um dos recursos poderosos de um sistema ponto a ponto é que qualquer pessoa pode conectar à infra-estrutura comunicação mesmo. Tudo o que é necessário para participar (para não-seguras malhas) é o conhecimento do endereço de malha e o contrato de serviço. Demonstrarei isso adicionando um novo tipo de ponto chamado um ponto de usuário.

A classe para o ponto de usuário é semelhante para o ponto de trabalho discutido anteriormente. O ponto de usuário também irá implementar a interface IWorker, mas não precisa fornecer uma implementação do método DoWork. (Lembre-se pensar ServiceContract interfaces como um contrato de comunicação e não como uma declaração de identidade. Um usuário não é um trabalho, mas ele participa da malha de mesmo nível IWorker e, portanto, implementa a interface.)

O ponto de usuário é interessado em controle de trabalhos, bem como criar novos itens de trabalho e dando-lhes para trabalhos. a Figura 4 mostra um screen dump do aplicativo de usuário. Este aplicativo lista as informações recebidas de mensagens de status do trabalho. No entanto, a funcionalidade que eu quero enfatizar é a criação de WorkItems e, em seguida, atribuí-los para trabalhos.

fig01.gif

A Figura 4 O aplicativos de mesmo nível de usuário

Uma decisão de design precisa ser feita em relação ao como WorkItems são atribuídos a trabalhos. Um método seria dependem da natureza de difusão seletiva do PeerChannel e transmitir uma mensagem para todos os trabalhos de dizer que um novo WorkItem estava disponível. O primeiro trabalho responder se ele está pronto e capazes de tirá-lo vence o direito de ter o novo WorkItem.

Eu não gosto dessa abordagem por vários motivos. Em primeiro lugar, o protocolo de comunicação seria bastante chatty. O usuário deve enviar uma mensagem de "que deseja que um item de trabalho". Trabalhos amenable devem responder com uma mensagem de " irá assumir-". O usuário teria que terminar até com uma mensagem de "fazer o trabalho" para o trabalho vencedor. Também (devido à assíncrona natureza de vários pontos de interação), deve ser dada atenção cuidadosa a todas as condições de corrida potencial que poderiam ocorrer.

Em segundo lugar, seria preciso definir o que significa "pronta e capaz de" para um trabalho. Se todos os trabalhos estão ocupados, não eles respondem? O que um usuário faz se nenhuma resposta é recebida?

Em vez deste método de anúncio de trabalho, eu estou escolher reutilizar as informações fornecidas já de pulsações do trabalho. Essas informações me oferece conhecimento suficiente para calcular o trabalho deve receber um WorkItem recém-criado. Portanto, o algoritmo para escolher o trabalho vencedor não residem no código do trabalho e pode ser alterado independentemente do trabalho. Para simplificar, vou escolher o trabalho para enviar que um WorkItem base os membros de WorkerState, CurrentWorkItem e PendingWorkItemCount dos objetos WorkerStatus. Eis o código por trás do botão criar trabalho:

private void btnNewWork_Click(object sender, EventArgs e) {
  WorkItem wrkItem = new WorkItem();
  wrkItem.WorkItemId = Guid.NewGuid();
  wrkItem.PretendToWorkSeconds = _rnd.Next(15);

  string nextWorker = GetNextWorker();
  _clientChannel.DoWork(nextWorker, wrkItem);

  lock (_workers)
    _workers[nextWorker].PendingWorkItemCount++;
}

Um novo, identificado exclusivamente WorkItem é gerado com um horário selecionado aleatoriamente entre 0 e 15 segundos (_rnd é um campo de nível de classe do tipo aleatório). EU decidir o próximo trabalho para enviar o trabalho para, enviar o trabalho. Eu também manualmente incrementar minha versão em cache de PendingWorkItemCount do trabalho. Isso ajuda a evitar situações em que dois ou mais trabalhos têm as mesmas estatísticas. Se o fizerem, é possível gerar o trabalho mais rápido do que trabalhos enviam status pulsações. Portanto, VOU ajustar Meus estatísticas de trabalho em cache e não unfairly enviar vários itens de trabalho para o mesmo trabalho.

A decisão para selecionar o próximo trabalho recebe o trabalho é a finalidade do método GetNextWorker mostrado aqui:

private WorkerStatus GetNextWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);

  return workers
    .Where<WorkerStatus>(
      w => w.State != WorkerState.Offline
      && w.State != WorkerState.Unknown)
    .OrderBy<WorkerStatus, int>(w => w.PendingWorkItemCount)
    .FirstOrDefault<WorkerStatus>();
}

Graças à novos métodos de extensão nas bibliotecas Microsoft .NET Framework 3.5, eliminando trabalhos nos estados offline e desconhecidos e de encomendas por seus contagens de itens de trabalho pendente é fácil. Uma instrução única (embora wordy) poderá fazer ambos. Eu também retornar o primeiro item na enumeração. Usar o método FirstOrDefault garante que a chamada tiver êxito, mesmo se não funcionários estiverem disponíveis.

Agora vamos executar outra experiência. Iniciar o aplicativo de ponto de usuário e inicie dois aplicativos de trabalho separados. Após alguns segundos, você deverá ver os trabalhos de duas aparecem na lista usuário aplicativo. Agora clique no botão Criar trabalho várias vezes. Se tudo funcionar conforme o esperado, você pode ver que o ponto de usuário deu itens de trabalho para os dois trabalhos. Ele deve tenha feito isso de maneira bastante rodízio. Os trabalhos continuará a processar seus itens de trabalho até que termine.

Spectators

Um requisito de muitos aplicativos é a necessidade de log e rastreamento. Para ajudar ainda mais a flexibilidade de um sistema baseado em P2P da demonstração, vou criar um novo tipo de ponto chamado agente de log. Meu ponto de agente de log receberão eventos relevantes para o sistema. Os eventos únicos que podem ser controlados no momento são aqueles que estão expostos pela interface IWorker: informações de status do trabalho e a atribuição de WorkItems para um trabalho específico. Esse nível de detalhe é insuficiente. Há muitos eventos adicionais que poderiam ser registrados e rastreados.

Embora eu pode acrescentar sobre a nova funcionalidade à interface IWorker, as mensagens enviadas para o controle são irrelevantes para aplicativos de trabalho e o usuário. Por isso, vou criar um novo contrato de serviço chamado ISpectator. (ESTOU usando o termo genérico Spectator em vez de agente de log ou a verificação para enfatizar duas coisas. Primeiro, esse tipo de ponto somente recebe comunicações e não deve gerar qualquer. Em segundo lugar, eu não sou ditando o que um ponto usando esse serviço faz com as informações recebidas; ele somente pode mostrar as informações em uma interface do usuário.) Aqui está a definição do contrato de serviço:

[ServiceContract]
public interface ISpectator {
  [OperationContract(IsOneWay=true)]
  void WorkItemStateChanged(WorkItemStatus status);
}

Como uma demonstração, eu estou declarando apenas um único método chamado WorkItemStateChanged. Um sistema completo provavelmente têm muitos eventos para notificar spectators do. O método de WorkItemStateChanged usa uma classe WorkItemStatus definida aqui:

[DataContract]
public class WorkItemStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public Guid WorkItemId;

  [DataMember]
  public WorkItemState NewState;
}

Esta classe permite que o remetente especificar o trabalho enviar a mensagem, o WorkItem essa mensagem é para e o novo estado do WorkItem. Estados válidos são enfileirados, Executing, concluído e Errored. Como eu tenho certeza de que você percebeu, eu não tenho um evento de WorkerStateChanged equivalente definido. Isso ocorre porque status de trabalho é enviado por meio da mensagem de pulsação e espero spectators para escutar esses. Com o WCF, isso é muito fácil de fazer. Minha classe de serviço simplesmente implementará interfaces IWorker e ISpectator.

A classe de serviço do agente de log pode ser vista na Figura 5 . Não há muita diferença estrutural entre essa classe e o trabalho. O método IWorker.Heartbeat, o agente de log tem o cache típico de dados de status do trabalho. Além disso, se este é o primeiro status para um trabalho ou se o estado é alterado para um trabalho, uma mensagem é gravada para a saída padrão. Da mesma forma, o método ISpectator.WorkItemStateChanged grava uma mensagem de saída padrão. Um produção sistema serão, provavelmente faça algo muito mais interessante, como gravação em um banco de dados log para relatórios personalizados.

Agente de log da Figura 5

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
public class Logger : IWorker, ISpectator {
  private ServiceHost _host;
  private Dictionary<string, WorkerStatus> _workers;

  public Logger() {
    _workers = new Dictionary<string, WorkerStatus>();
  }

  public void Start() {
    _host = new ServiceHost(this);
    _host.Open();
  }

  public void Stop() {
    _host.Close();
  }

  void IWorker.Heartbeat(WorkerStatus status) {
    bool changed = false;

    lock (_workers) { 
      if (_workers.ContainsKey(status.WorkerId)) {
        WorkerStatus s = _workers[status.WorkerId];

        if (s.State != status.State)
          changed = true;
      }
      else
        changed = true;

      _workers[status.WorkerId] = status;
    }

    if (changed)
      LogWorkerStatusChange(status);
  }

  void IWorker.DoWork(string workerId, WorkItem work) {
    //Ignored. This can be assumed by the
    //WorkItemState.Queued status event.
  }

  void ISpectator.WorkItemStateChanged(WorkItemStatus status) {
    Console.WriteLine(
      "Worker: {0}| WorkItem: {1}| State: {2}", 
      status.WorkerId, 
      status.WorkItemId, 
      status.NewState);
  }

  private void LogWorkerStatusChange(WorkerStatus status) {
    Console.WriteLine(
      "Worker {0}| State: {1}", 
      status.WorkerId, 
      status.State);
  }
}

O último item de ação na implementação do agente de log é definir configuração. a Figura 6 mostra a configuração para este serviço WCF. Diferenças perceptíveis são a falta de uma seção de cliente e a adição de um ponto de extremidade de serviço. Como um spectator logicamente é apenas um consumidor de informações, não há nenhum motivo para atuar como um cliente. Adição de um ponto de extremidade para o serviço permite usar a mesma instância de objeto de serviço para IWorker e ISpectator Coletores de mensagem.

Configuração da Figura 6 de agente de log

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="LoggerPeerApplication.Logger">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.ISpectator"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

  </system.serviceModel>
</configuration>

Para concluir a funcionalidade de ISpectator, preciso alterar o ponto de trabalho para fazer as chamadas apropriadas. Isso inclui uma nova empresa do cliente para o arquivo app.config:

<endpoint
  name="ISpectatorClient"
  address="net.p2p://MyPeerApplication"
  binding="netPeerTcpBinding"
  bindingConfiguration="unsecureBinding"
  contract="PeerWcfComm.ISpectator" />

Esse ponto de extremidade tem um novo nome (para que meu construtor ChannelFactory pode encontrar as configurações corretas) e o tipo de contrato de serviço novo ISpectator. <ispectator>Para que as alterações código, simplesmente preciso criar um novo ChannelFactory < ISpectator > e um novo canal cliente ISpectator. Em seguida, alterei o método DoWorkInternal na classe trabalho para a versão um pouco menos aesthetically agradável mostrada na Figura 7 . Basicamente, antes e depois do processamento de item de trabalho em suspensão o thread atual vou chamar o método de WorkItemStateChanged no canal ISpectator, fornecendo o estado apropriado.

Controle de status do trabalho a Figura 7

this._state = WorkerState.Busy;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() { 
    WorkerId = _id, 
    WorkItemId = wrk.WorkItemId, 
    NewState = WorkItemState.Executing 
  });

Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));

this._state = WorkerState.Ready;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() {
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Finished
  });

Compartilhamento de trabalho e de balanceamento de carga

É hora de tentar outra experiência. Novamente, crie dois trabalhos e um único usuário. Ter o usuário gere muita de itens de trabalho para os trabalhos. Agora, enquanto os trabalhos de duas estiver trabalhando fora, inicie uma outra instância de trabalho. De que isso, você verá um comportamento indesejado do meu sistema: um trabalho estiver ocioso, enquanto outros têm muito trabalho a fazer.

Em um aplicativo ponto a ponto, novos pontos podem vir online qualquer momento para participar da malha. Em um cenário de farm de servidor, QUERO que novos pontos para ingressar automaticamente na e imediatamente iniciar processamento a carga de trabalho coletiva.

A primeira alteração fazer é na implementação de método de IWorker.Heartbeat do trabalho. Atualmente, o código simplesmente produz estado de um trabalho para o console. Isso precisa ser substituído com o código que armazenará em cache estados dos colega trabalhos. Eu já fez isso nos pontos de usuário e Spectator e pode principalmente cortar e colar esse código na classe do trabalho.

Agora que o ponto de trabalho pode manter controle de outros trabalhos, POSSO adicionar código para verificar se há condições em que um trabalho deve compartilhar itens de trabalho (encaminhar) para trabalhos ociosos. Tenha alterado o final do método de MonitorWork do trabalho conforme mostrado na Figura 8 .

A Figura 8 atualizados MonitorWork

if (_state != WorkerState.Offline) {
  if (_workQueue.Count > 1) {
    WorkerStatus firstIdleWorker = GetNextIdleWorker();

    if (firstIdleWorker != null) {
      WorkItem workToForward;
      lock (_workQueue)
        workToForward = _workQueue.Dequeue();

       _workerClientChannel.DoWork(
        firstIdleWorker, 
        workToForward);
    }
  }

  ThreadPool.QueueUserWorkItem(MonitorWork);
}

Basicamente, após processar um item de trabalho e antes reprogramação propriamente dito, o método verifica para um nó de trabalho ocioso. (Adicionei o critério para fazer isso somente se eu tenho mais de um item na minha própria fila de trabalho. Isso ocorre porque não faz sentido para encaminhar meu único item de trabalho restantes para outro trabalho.) Se um trabalho ocioso for encontrado, ele dequeues um WorkItem e envia-lo para esse trabalho ocioso. O método GetNextIdleWorker contém a lógica de seleção:

private WorkerStatus GetNextIdleWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);
  return workers
    .Where<WorkerStatus>(
      w =>
        (w.State == WorkerState.Ready
        && w.PendingWorkItemCount == 0))
    .FirstOrDefault<WorkerStatus>();
}

A primeira coisa que faço é bloquear os dados _workers armazenar em cache e obter um instantâneo dos valores WorkerStatus nela. , Em seguida, procure por trabalhos no estado pronto com nenhum trabalho pendente, retornando aquela primeira encontrada.

Backup de WorkItem

Se um WorkItem é movido para um computador e na fila, o que será dele se esse ponto de trabalho for desligado acidentalmente? Uma falha de processo não permite que você salvar os dados após erro. Uma boa redução é fazer backup de cópias de WorkItems em pontos adjacentes. Uma solução de backup completa consiste em duas ações: backup e restauração. Isso não é uma tarefa trivial.

A solução de backup mais simples poderia ser reutilizar o método DoWork. O primeiro parâmetro especifica a identificação do trabalho que deve para fazer o trabalho. No entanto, essa mensagem, na verdade, vai check-out para a malha inteira. Eu poderia aproveitar esse fato e ter todos os pontos que não coincidirem com o operador de determinada identificação de salvar o WorkItem como um backup. O trabalho de destino real dessa mensagem, eventualmente, informa os outros para excluir esse backup.

A desvantagem aqui é que cada operador está usando recursos do cache de backups dos itens de trabalho. Se não existem muitos itens de trabalho e trabalhos podem processá-los muito rapidamente, isso pode ser uma solução viável. No entanto, se houver muitos itens de trabalho no sistema, pode obter muito caro.

Uma solução melhor seria minimizar o número total de backups, mas às custas de alguns extra malha mensagens.

Fazer uma página do artigo do Kevin Hoffman" Como estado de design compartilhamento em uma rede de mesmo nível", Pode fazer backup WorkItems em pontos vizinhos, enviando uma mensagem limitada a uma contagem de salto único. A solicitação de backup e comunicações subseqüentes podem ser como chatty conforme desejado. Como essas mensagens de backup são sem sentido para usuários ou Spectators, não uso o contrato de serviço IWorker. Criei um novo IWorkBackup chamado um:

[ServiceContract]
public interface IWorkBackup {
  [OperationContract(IsOneWay = true)]
  void BackupItem(WorkItemBackupRequest request);

  [OperationContract(IsOneWay = true)]
  void DeleteBackup(Guid workItemId);

  [OperationContract(IsOneWay = true)]
  void RestoringBackup(Guid workerId, Guid workItemId);
}

O primeiro método é chamado por um trabalho que fazer backup de um WorkItem. A segunda é chamada quando um WorkItem foi concluída e outros trabalhos podem excluir suas cópias dele. O último é chamado quando um trabalho determina se um WorkItem de backup devem ser restaurada e que pretende fazer isso.

A mensagem enviada por BackupItem é definida pela classe WorkItemBackupRequest:

[MessageContract]
public class WorkItemBackupRequest {
  [PeerHopCount]
  public int HopCount = 1;

  [MessageBodyMember]
  public WorkItem Item;

  [MessageBodyMember]
  public Guid OriginalWorkerId;
  public DateTime BackupRequestTimeStamp;
  public DateTime RestoreAt;
}

O membro HopCount tem o atributo PeerHopCount, que limita a passagem de mensagem por meio da malha para o número especificado de saltos. Eu já codificado que o valor 1 para que essa mensagem será ser enviada apenas para adjacente (um salto) pontos. Há também dois membros marcados com o atributo MessageBodyMember: item (o item de trabalho será feito backup) e OriginalWorkerId (o operador solicitando o backup). Elas serão enviadas na mensagem SOAP subjacente. Também adicionei dois membros adicionais de controle interno.

Quando um trabalho recebe a mensagem RequestBackup, ele precisa armazenar em cache desse item. Isso é feito facilmente o suficiente, adicionando um novo campo privado para armazenar os backups:

private Dictionary<string, WorkItem> _workItemBackups;

É aqui a implementação do método RequestBackup para um ponto de trabalho:

void IWorkBackup.BackupItem(WorkItemBackupRequest request) {
  request.BackupRequestTimeStamp = DateTime.Now;
  request.RestoreAt = DateTime.MaxValue;

  lock (_workItemBackups)
    _workItemBackups.Add(request.Item.WorkItemId, request);
}

Esse método define a BackupRequestTimeStamp para a hora atual e RestoreAt assume o padrão para DateTime.MaxValue. Esses campos provar ao ser útil mais tarde.

Quando trabalho tiver sido concluído no ponto de trabalho local, ele precisa enviar uma mensagem para todos os pontos que tem potencialmente armazenada em cache um backup do item de trabalho. Ele faz isso chamando o método DeleteBackup e fornecer a identificação do WorkItem que foi concluída.

Observe que eu não limitar esta mensagem para pontos adjacentes como fiz com a mensagem BackupItem. O motivo é que a topologia de malha de mesmo nível pode alterar. Em entre o momento que um backup foi solicitado e o WorkItem é concluída, colegas de trabalho que uma vez estavam adjacentes podem não ser agora. Assim, eu enviar uma mensagem geral para todos os trabalhos para excluir seu backup deste item, se eles tiverem um. Quanto à completude, eis o código em que o ponto de trabalho que implementa DeleteBackup:

void IWorkBackup.DeleteBackup(Guid workItemId) {
  lock (_workItemBackups)
    if (_workItemBackups.ContainsKey(workItemId))
      _workItemBackups.Remove(workItemId);
}

Fazendo backup e remover itens de trabalho são moleza! No entanto, a dificuldade de true surge determinar quando um trabalho de backup item precisa ser restaurado. Preciso determinar quando um item de trabalho precisa ser restaurado e que será restaurá-lo.

Para determinar quando um item de trabalho deve ser restaurado, cada operador deve monitorar periodicamente seu cache de backup. Para isso, adicionei outro System.Threading.Timer chamado _backupMonitor, que é definido no método iniciar para o trabalho:

_backupMonitor = new Timer(
  this.MonitorBackups,
  null,
  new TimeSpan(0),
  new TimeSpan(0, 1, 0));

Isso pode executar em um período muito mais lento que a pulsação, portanto, que definir o timer para acionar a cada minuto.

O método MonitorBackups manipula a lógica para restauração de backups e pode ser visto na Figura 9 . Esse método será iterar através do item de backup e verificar cada uma para a necessidade de ser restaurado. Para fazer isso, em algumas suposições precisará ser feitas. Primeiro, eu tenho uma variável local denominada timeOutThreshold. Isso é a quantidade de tempo de quando eu foi informado fazer backup de um item para quando eu deve esperava para serem processados. Eu codificados para um minuto (embora o ideal é que deve ser configurável) porque que sabe que meus itens de trabalho podem durar não há mais de 15 segundos (isso foi definido por mim no meu aplicativo de usuário). Portanto, exceto no uso de pico, deve ser bastante tempo. (Talvez uma solução melhor para a origem da solicitação de backup determinar esse tempo limite e especificá-lo na mensagem de solicitação. Ele tem conhecimento de sua carga de trabalho e pode dar uma estimativa muito melhor.) Se a hora atual for maior que o WorkItemBackupRequest.BackupRequestTimeStamp mais o timeOutThreshold, vou para investigar ainda mais. EU não considero automaticamente este item de trabalho falharam porque o trabalho original simplesmente pode estar muito ocupado. No entanto, se o tempo expirou, eu passar para verificar o status do trabalho original. Se estiver ocupado ou se com os itens na sua fila de trabalho considerarei é simplesmente ocupado e ainda receberão a este item de trabalho.

A Figura 9 MonitorBackups

private void MonitorBackups(object context) {
  List<WorkItemBackupRequest> backups = 
    new List<WorkItemBackupRequest>();
  TimeSpan timeOutThreshold = 
    new TimeSpan(0, 1, 0); //This should be read in from config.

  lock (_workItemBackups) {
    backups.AddRange(_workItemBackups.Values);

    foreach (WorkItemBackupRequest backup in backups) {
      //If we've passed the RestoreAt time, we need to
      //take on this work item.
      if (backup.RestoreAt < DateTime.Now) {
        lock (_workQueue)
          _workQueue.Enqueue(backup.Item);

        _workItemBackups.Remove(backup.Item.WorkItemId);
      }

      //This work item has not been deleted yet. 
      //This is suspicious so investigate further.
      else if (backup.BackupRequestTimeStamp.Add(timeOutThreshold) >
         DateTime.Now) {

        //Check to see if the original worker is still operating.
        //Regardless, we won't do anything and the backup
        //remains in our cache.
        if (!WorkerIsStillActive(backup.OriginalWorkerId)) {
          //The machine doesn't seem to be actively working.
          //Set a deadline for restoration of the work item.
          backup.RestoreAt = DateTime.Now.Add(timeOutThreshold);

          //Tell the world our intentions to restore this.
          _backupClientChannel.RestoringBackup(
            this._id, backup.Item.WorkItemId);

          Console.WriteLine("Planning to restore '{0}'", 
            backup.Item.WorkItemId);
        }
      }
    }
  }
}

private bool WorkerIsStillActive(Guid workerId) {
  bool active = false;

  lock (_workers) {
    if (_workers.ContainsKey(workerId)) {
      WorkerStatus sts = _workers[workerId];
      if (sts.State == WorkerState.Busy)
        active = true;
      else if (sts.State == WorkerState.Ready && 
        sts.PendingWorkItemCount > 0)
        active = true;
    }
  }

  return active;
}

Se eu receber este ponto e ver que o meu limite assumido expirou e que o trabalho original está responder, começarei o processo de restauração desse item de trabalho. Isso é feito chamando o método IWorkBackup.RestoringBackup para anunciar minha intenção a todos os outros pontos. Também definir o campo de RestoreAt do objeto WorkItemBackupRequest para um período futuro. Embora eu estou reutilizando o mesmo valor TimeSpan, esse tempo futuro é logicamente diferente (e o ideal é que deve ter um ponto de configuração exclusivas). RestoreAt é o tempo em que que irá supor que todos os pontos receberam minha mensagem e tiverem tido tempo para responder, se desejar. Saber mais sobre essa lógica posteriormente. Mas por ora, sabe que defini RestoreAt como o tempo quando na verdade, moverei o WorkItem do meu backup cache para meu fila de trabalho. Na próxima vez o timer MonitorBackups é acionado ele verificará esse valor e funcionar corretamente.

Essa lógica determina quando ocorre uma restauração. Ele é a parte mais fácil para determinar. O trickiest é que deve restaurar um item de trabalho. PeerChannel tenta manter uma malha ideal e a topologia pode alterar como pontos ingressar e deixar. Assim, um único ponto pode ter de dois a sete nós adjacentes a qualquer momento. Se uma solicitação de backup um salto foi enviada a todos eles, eu teria vários trabalhos todos os vying para restaurar o mesmo trabalho, pois todos eles têm a mesma lógica determinar quando um WorkItem deve ser restaurado. Portanto, o que acontece quando eles todas simultaneamente tente e restaurar? Para restaurar, os pontos primeiro devem anunciar sua intenção, chamando o método IWorkBackup.RestoringBackup (veja a Figura 10 ).

A Figura 10 IWorkBackup.RestoringBackup

void IWorkBackup.RestoringBackup(Guid workerId, Guid workItemId) {
  lock (_workItemBackups) {
    //Only proceed if we have this item backed up.
    if (_workItemBackups.ContainsKey(workItemId)) {
      //Get the backup request for this work item
      WorkItemBackupRequest req = _workItemBackups[workItemId];

      //Check to see if we've decided to restore this.
      if (req.RestoreAt != DateTime.MaxValue) {
        //The restorer with the greatest id wins!
        if (this._id.CompareTo(workItemId) < 0)
          _workItemBackups.Remove(workItemId);
      }
      else {
        //We haven't decided to restore it and 
        //someone else wants to. So let them.
        _workItemBackups.Remove(workItemId);
      }
    }
  }
}

Neste ponto, há três cenários como visto de ponto de vista de um trabalho que apenas recebeu uma mensagem RestoringBackup do outro trabalho:

  1. O trabalho está restaurando um item que não tiver armazenado.
  2. O trabalho está restaurando um item que tiver no cache, mas ainda não pretende restaurar.
  3. O trabalho está restaurando um item que estou também com a intenção para restaurar.

Cenário um é simples, como nenhuma ação é necessária. Cenário dois também é simples em que eu simplesmente possa remover o item de backup do nosso cache. Ainda ainda não determinou que ela necessária a restauração, mas alguém tem podem tê-lo.

Ele é o cenário três que requer a quebra de empate. Cada ponto está atuando de forma autônoma e simultaneamente pode anunciar que pretende restaurar um item. As mensagens podem passar uns aos outros no meio físico e, portanto, mais de um operador foi possível localizar propriamente dito no cenário três.

A mágica pergunta é: quem vence? Que restaura e que ignora? A decisão deve ser feita em algo que não é relativa entre pontos (como hora do sistema) e não muda. Decidi que um empate vai para a identificação do mais alto valor trabalho. Uma chamada impartial para Guid.CompareTo determina a vencedora. Com isso no lugar, a vencedora faz a restauração e ignora o perdidos.

Interessantes de mesmo nível

Eu Admita. . . Rede ponto a ponto é divertido. Mas isso não invalidar o fato de que há cenários do mundo real em que sua natureza descentralizada torna um ajuste adequado para aplicativos. Além disso, esses aplicativos não está limitados a bate-papo ou mídia pirateando! Aplicativos de habilitar ponto que requerem o processamento em massa é um ajuste natural, se um farm de aplicativo personalizado ou um cenário de computação de alto desempenho (HPC). Faz a coordenação de trabalho e o processamento em uma escala grande relativamente fácil.

O design do WCF torna simples para implementar um serviço e evolui-lo conforme necessário. Os dados compartilhados e a comunicação de uma malha de ponto a ponto permite que diferentes clientes observar e participar do que está acontecendo. Este artigo fornece uma demonstração de criação de uma plataforma de processamento habilitado ponto onde vários jogadores (de trabalhos, usuários e Spectators) podem funcionam juntas para uma finalidade em comum: Obtendo seu trabalho.

Matt Neely é engenheiro de design de software da Microsoft da equipe de ferramentas de responsável pelo treinamento do usuário do SQL Server. Ele gasta a maior parte seu tempo convencendo seu amigo Caleb ele irá ser mencionado em um artigo futuro e shaving seu cachorro (um goldendoodle) por si mesmo com resultados astonishingly horrific. Ele pode ser contatado em mneely@Microsoft.com.