Il presente articolo è stato tradotto automaticamente.

Peer-to-peer divertente

Un'elaborazione di lavoro peer-to-peer applicazione con WCF

Matt Neely

Download codice disponibile dalla Raccolta di codice di MSDN
Selezionare il codice in linea

Nell'articolo viene descritto:

  • Base comunicazione peer-to-peer
  • Creare il peer di lavoro
  • Bilanciamento del carico e condivisione di lavoro
  • Backup e ripristino di elementi di lavoro
In questo articolo vengono utilizzate le seguenti tecnologie:
WCF

Contenuto

Base comunicazioni peer
Il peer di lavoro
Effettua l'attività di lavoro
Il peer utente
Spectators
Condivisione del lavoro e bilanciamento del carico
Backup WorkItem
Fun peer

il gruppo sono parte a Microsoft consente di richiedere una settimana disattivare dal nostro compiti per cercare un argomento regolare.Recente deciso di dedicare la settimana ricerca peer-to-peer (P2P) rete.Tale ricerca ballooned in un interesse permanente per questo nuovo paradigma di rete.

Uno dei motivi possiede P2P piqued che mio interesse era a causa di un sistema di generazione di documentazione interna il team.È necessario che gli autori di scrivere migliaia di documenti della Guida in linea che devono essere analizzati, trasformato, corretti e compilato.I documenti possono essere compilati in vari formati di output e in varie lingue ogni notte.Per eseguire questo tra, abbiamo una server farm che eseguono il software di generazione.

Il meccanismo che attualmente utilizzato è l'architettura client-server tipica.I processi di generazione sono in coda su un server centrale e doled uscita come creare il server che si liberi per l'elaborazione.È talvolta verificarsi un backlog di processi nel server centrale o in un misbehaving server di generazione.Molti membri del team presenti un interesse vested lo stato del sistema di generazione, ma il sistema è opaco e difficili da monitorare.

In questo articolo è il risultato della definizione degli obiettivi di una versione P2P di questo sistema di generazione.Verrà discusso di un lavoro generico elaborazione un'applicazione attivata peer basato il PeerChannel Windows Communication Foundation (WCF) e struttura emergente.È possibile facilitare la costruzione del sistema durante la discussione parte le decisioni di progettazione devono essere apportate nel processo.

Base comunicazioni peer

Per questo progetto, inizierà definendo il peer di elaborazione di lavoro principale.È possibile verrà assegnare loro il nome generico del processi di lavoro.Il piano sarà dispongono di un lavoro per computer fisici (ma non esistono limitazioni tecniche che richiedono questo).

Ogni lavoro dovrà comunicare e coordinare con altri utenti a scopo di lavoro.A tale scopo, è necessario conoscenza su altri processi di lavoro.Così come il lavoro verrà ottenere queste informazioni?

Un approccio consiste ogni lavoro polling ogni lavoro per stato.Questo meccanismo è complesso come richiede due fasi: individuazione di lavoro e lavoro durante la ricerca.Inoltre, se ogni lavoro dovesse ping di ogni altro lavoro, sarebbe stato molti messaggi non necessari viaggio della rete.Insieme al fatto che il PeerChannel WCF è implicitamente multicast, ping un unico lavoro verrà effettivamente inviare un messaggio ogni lavoro.Per n colleghi di lavoro, si farebbe dispone di messaggi n3 volanti intorno.È stato rifiutato questo approccio per un modello di trasmissione.

In un modello trasmissione ogni lavoro invierà periodicamente relativo stato.VIENE chiamato questo un messaggio di heartbeat.Altri processi di lavoro possibile decidere che cosa fare con queste informazioni, anche se sono solo ignorarla.Preferisco questo approccio perché come cambia la topologia del reticolo di lavoro, è possibile dedurre nodi provenienti in linea e dalla presenza o assenza di heartbeat messaggi non in linea.Individuazione del peer di lavoro è passivo anziché attivo.Questo modello ridotta a icona anche il traffico di rete necessario solo per upkeep di sistema.Con n colleghi di lavoro sarebbe ho n2 messaggi in questo modello, rappresenta un miglioramento rispetto al primo approccio.

Per creare un servizio in WCF, è in genere partire con definisce il contratto di assistenza.Questa è l'interfaccia che definisce la funzionalità di che un servizio espone ai client.In uno scenario P2P, ogni peer funge da server e client e il contratto di assistenza creato qui verrà definito come peer comunicano tra loro.Data la decisione di trasmissione dello stato di lavoro, È possibile definire il contratto di assistenza:

[ServiceContract]
public interface IWorkerPeer {
  [OperationContract(IsOneWay=true)]
  void Heartbeat(WorkerStatus status);
}

Il contratto di assistenza IWorkerPeer ha un'operazione unidirezionale, singola denominata heartbeat. Quando un lavoro chiama questo metodo, invierà fuori istanza dell'oggetto WorkerStatus predefinita ogni altri peer lavoro partecipano il reticolo.

Ecco la definizione della classe WorkerStatus semplice:

[DataContract]
public class WorkerStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public WorkerState State;
}

Esso conterrà un identificatore univoco per il lavoro e un valore di enumerazione dello stato semplice. In un sistema più funzionale, potrebbe trattarsi un elaborate set di dati che includa descrittori aggiuntivi, i contatori delle prestazioni statistiche di lavoro (ad esempio la percentuale di elementi di lavoro non riuscita) e più. Come definito, tuttavia, questa operazione minime consentirà ogni lavoro conoscere lo stato di ogni lavoro del reticolo.

Gli stati che sono stati definiti sono sconosciuto, pronto, occupato e non in linea. Uno stato di sconosciuto indica che non sono che dello stato del lavoro (ad esempio per la mancanza di un heartbeat da tale lavoro entro un periodo di tempo specificato). Uno stato di preparazione significa che il lavoro funzioni correttamente ed è pronto a assumono un lavoro. Un impostato lo stato indica che il lavoro sta funzionando correttamente, ma occupato questo lavoro. Uno stato non in linea indica che il lavoro non è disponibile per il lavoro. È potenzialmente possibile stato inviato da finale heartbeat un lavoro o forse un lavoro che viene eseguita in modalità non in linea da un amministratore per manutenzione o il debug.

Il peer di lavoro

Per rendere un'applicazione di peer lavoro funzionale, È necessario esporre e utilizzare il servizio. Pertanto È creata un'applicazione console semplice e assegnato una classe denominata lavoro come illustrato nella Figura 1 . Nella mia classe di lavoro, è stato aggiunto Start e Stop metodi per gestire il setup e dell'eliminazione del servizio esposto (che riceve i messaggi), nonché il canale di client (per l'invio di messaggi). È possibile inoltre utilizzare un'istanza System.Threading.Timer per chiamare periodicamente il metodo SendHeartbeat. SendHeartbeat crea un'istanza di un oggetto WorkerStatus si compila con i dati appropriati e quindi inviata a tutti i colleghi. Come servizio, quando ottiene chiamato il metodo IWorker.Heartbeat, esso verrà semplicemente output il contenuto del messaggio all'output standard.

Figura 1 la classe di lavoro

namespace WorkerPeerApplication {
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

  public class Worker : IWorker {
    private ServiceHost _host;
    private ChannelFactory<IWorker> _factory;
    private IWorker _workerClientChannel;

    private Guid _id;
    private volatile WorkerState _state;

    private Timer _heartbeat;

    public Worker() {
      _id = Guid.NewGuid();
      _state = WorkerState.Offline;
    }

    public void Start()  {
      _host = new ServiceHost(this);
      _host.Open();

      _factory = new ChannelFactory<IWorker>("IWorkerClient");
      _workerClientChannel = _factory.CreateChannel();

      _state = WorkerState.Ready;
      _heartbeat = new Timer(
        this.SendHeartbeat,
        null,
        new TimeSpan(0),
        //Time should really be read in from config.
        new TimeSpan(0, 0, 1)); 
    }

    public void Stop() {
      _state = WorkerState.Offline;
      _heartbeat.Dispose();
      this.SendHeartbeat(null);
      _factory.Close();
      _host.Close();
    }

    private void SendHeartbeat(object context) {
      WorkerStatus ws = new WorkerStatus();
      ws.WorkerId = _id;
      ws.State = _state;

      _workerClientChannel.Heartbeat(ws);
    }

    void IWorker.Heartbeat(WorkerStatus status) {
      Console.WriteLine(
        "{0}: {1}", 
        status.WorkerId, 
        status.State);
    }
  }
}

Per il completamento, È necessario un metodo Main e la configurazione di WCF appropriata, è possibile inserire nel file di configurazione dell'applicazione. Il metodo Main è visibile nel download del codice e la configurazione di che utilizzo viene visualizzata nella Figura 2 .

Configurazione di lavoro nella Figura 2

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="WorkerPeerApplication.Worker">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint           
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

    <client>
      <endpoint
        name="IWorkerClient"
        address="net.p2p://MyPeerApplication"
        binding="netPeerTcpBinding"
        bindingConfiguration="unsecureBinding"
        contract="PeerWcfComm.IWorker" />
    </client>

  </system.serviceModel>
</configuration>

Si noterà che è possibile definire sia il servizio che espone il peer e configurazione lato client del peer. La configurazione del servizio specifica il tipo del servizio, nonché relativo indirizzo di base e l'endpoint. SI utilizza un'associazione non protetta e il peer nomi protocollo PNRP (Resolution) per la risoluzione dei nomi di peer. La configurazione del client è praticamente un mirror del servizio. La differenza solo evidente è che È assegnato l'endpoint client un nome che il costruttore di ChannelFactory di accedere facilmente la configurazione desiderata.

Ora che dispone di shell di un peer di lavoro, si unità test. Per un esperimento esecuzione due istanze di questo programma nello stesso momento. Si noterà che a inizialmente ogni peer verrà output le proprie informazioni di heartbeat. Dopo alcuni secondi i peer inizierà trovare tra loro e ogni lavoro riceveranno lo stato dagli altri.

Frankly, è possibile considerare annoying e modificare per i messaggi personali essere inviati nuovamente a me. Per fortuna, il Blog del team PeerChannelFornisce il codice che È necessario. Si noterà una classe molto utile denominata RemoteOnlyPropagationFilter. Prende in prestito questo, POSSO quindi aggiungere due righe di codice al metodo Worker.Start per garantire che i messaggi che è possibile inviare non inoltre costretti inviati a me:

PeerNode pn = ((IClientChannel)_clientChannel).GetProperty<PeerNode>();
pn.MessagePropagationFilter = new RemoteOnlyMessagePropagationFilter();

Queste due righe di codice sono state aggiunte prima di è possibile avviare il timer di heartbeat.

Effettua l'attività di lavoro

Finora, ho un'applicazione che comunica semplicemente fra i peer di lavoro. Worker non effettivamente eseguire ancora alcuna operazione. Sarà risolvere questa per aumentare l'interfaccia IWorker, aggiunta il metodo riportato di seguito:

[OperationContract(IsOneWay = true)]
void DoWork(Guid workerId, WorkItem work);

Questo metodo consente di un'entità esterna indicare un peer di lavoro a scopo di un elemento di lavoro. Dopo il WCF messaggistica viene multicast, aggiunto un parametro per specificare l'ID di lavoro che deve eseguire le operazioni. La classe WorkItem verrà illustrato più avanti. Ma per ora, la modifica di interfaccia richiede di Aggiorna la classe di lavoro, come illustrato nella Figura 3 . Quando un utente indica un lavoro alcune operazioni, la richiesta viene aggiunto a una coda interna e infine eseguita in ordine ricevute. Per eseguire effettivamente il lavoro, è stato aggiunto un metodo denominato MonitorWork alla classe di lavoro. Il metodo Start chiamerà semplicemente ThreadPool.QueueUserWorkItem, scegliere il metodo MonitorWork.

Nella figura 3 revisione lavoro

public class Worker : IWorker {
  // Omitted for clarity ...
  private Queue<WorkItem> _workQueue;

  public Worker() {
    // Omitted for clarity ...
    _workQueue = new Queue<WorkItem>();
  }

  public void Start() {
    // Omitted for clarity ...
    ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void MonitorWork(object context) {
    if (_workQueue.Count > 0) {
      lock (_workQueue)
        _currentWorkItem = _workQueue.Dequeue();

      DoWorkInternal(_currentWorkItem);
      _currentWorkItem = null;
    }

    if (_state != WorkerState.Offline)
      ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void DoWorkInternal(WorkItem wrk) { ...}

  void IWorker.DoWork(Guid workerId, WorkItem work) {
    if (workerId == _id)
      lock (_workQueue)
        _workQueue.Enqueue(work);
  }
}

A un alto livello, MonitorWork verrà eseguire un'operazione e riprogrammare stesso tramite un'altra chiamata a ThreadPool.QueueUserWorkItem. Questa operazione verrà eseguita fino a ottenere lo stato del lavoro non in linea si verifica quando viene chiamato Stop sull'oggetto lavoro. Un che caso consiste nel controllare se la coda di lavoro contiene nulla. Se la coda contiene gli elementi di lavoro, MonitorWork verrà annullamento dell'accodamento WorkItem successivo e iniziare l'elaborazione. Il comportamento desiderato è per il lavoro elaborare in modo seriale il lavoro, anche se Impossibile essere modificato facilmente per elaborare gli elementi in parallelo.

Come si notare dal codice nella Figura 3 , dispone non risolto il lavoro è ancora. È stato semplicemente passato questo al ancora -a-da-definito DoWorkInternal metodo. Pertanto, prima esaminiamo la definizione della classe WorkItem. Il sistema che attiva creazione per questo articolo è agnostico attività, verrà semplicemente definire mio elemento di lavoro nel modo seguente:

[DataContract]
public class WorkItem {
  [DataMember]
  public Guid WorkItemId { get; set; }

  [DataMember]
  public int PretendToWorkSeconds { get; set; }
}

Più necessario utilizzare un attributo DataContract o MessageContract in modo che WCF correttamente può serializzare la classe personalizzata. La classe WorkItem dispone di un campo dell'identificatore, WorkItemId e un campo che specifica il numero di secondi di feign questo lavoro effettivo (denominato PretendToWorkSeconds).

Ora che è definito un WorkItem, È possibile elaborare. Di seguito è mia implementazione di metodo DoWorkInternal:

private void DoWorkInternal(WorkItem wrk) {
  this._state = WorkerState.Busy;
  Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));
  this._state = WorkerState.Ready;
}

Si tratta in cui in genere farebbe mio lavoro significativo (ad esempio, creare la documentazione del programma di installazione di SQL Server 2008 in portoghese brasiliano). Ma per ragioni di questo articolo appena verrà alla modalità sospensione il periodo di tempo specificato.

Infine, è possibile aggiornare la classe WorkStatus con due membri dei dati nuovi che verranno impostati prima che venga inviato heartbeat un lavoro. Sono l'identificatore dell'elemento di lavoro attualmente in esecuzione e un conteggio degli elementi di lavoro in sospeso su quel peer di lavoro:

[DataContract]
public class WorkerStatus {
  // Omitted for clarity...

  [DataMember]
  public Guid CurrentWorkItem;

  [DataMember]
  public int PendingWorkItemCount;
}

Ora che i colleghi di lavoro dispone di alcuni dati, devo acutely conoscenza di scenari in cui riportare perdita di dati. Uno dei compromessi di un paradigma di rete decentralizzato come P2P è che non vi è alcun archivio centrale per i dati. Ogni peer è responsabile della collettivamente conservando i dati del sistema. Pertanto, cosa succede se un peer lavoro interrotta? In particolare per il sistema, che cosa accade a elementi di lavoro in coda?

È possibile raggruppare gli scenari di arresto di peer in due categorie: previsti e imprevisti. Per arresti una natura previsto, È necessario affrontare elementi di lavoro in coda. Il paradigma per la gestione di questi elementi può variano a seconda della natura dell'applicazione e dati. Se i dati non sono importanti, È possibile semplicemente eliminare tutti elementi di lavoro in coda. Se l'applicazione è progettata per essere sospeso (o per riavviare automaticamente come un servizio di Windows), ad esempio gli elementi da un disco di serializzazione e la lettura all'avvio è la soluzione corretta. Ancora un'altra opzione consiste nell'semplicemente inoltrare elementi di lavoro surplus per altri processi di lavoro prima di arresto. Tratterò arresto imprevisto più avanti in questo articolo nel contesto di backup dei dati.

Il peer utente

Finora, è possibile avere un'applicazione di attivata peer che può accettare ed elaborare gli elementi di lavoro definito. Ma chi crea questi elementi di lavoro e li invia i peer di lavoro? Uno delle potenti funzionalità di un sistema P2P è che chiunque può hook stessa infrastruttura di comunicazione. È necessario per partecipare a (per non protetta mesh) è conoscenza dell'indirizzo del reticolo e il contratto di assistenza. Illustrerò questo aggiungendo un nuovo tipo di peer denominato peer di un utente.

La classe per il peer utente è simile al peer lavoro discusso in precedenza. Il peer utente verrà inoltre implementare l'interfaccia di IWorker, ma non necessario fornire un'implementazione del metodo DoWork. (Ricordare pensare di interfacce ServiceContract come un contratto di comunicazione e non come una dichiarazione di identità. Un utente non è un lavoro, ma fa parte del reticolo di peer IWorker e pertanto implementa tale interfaccia.)

Il peer utente è interessato verifica dei processi di lavoro, nonché creazione di nuovi elementi di lavoro e consentendo di processi di lavoro. Nella figura 4 è illustrata una schermata dell'applicazione utente. Questa applicazione sono elencate le informazioni ricevuto da messaggi di stato del lavoro. Tuttavia, la funzionalità che si desidera evidenziare è la creazione di elementi di lavoro e quindi assegnarli ai processi di lavoro.

fig01.gif

Nella figura 4 l'applicazione di peer utente

Una decisione deve avvenire relative alle modalità di assegnazione elementi di lavoro a processi di lavoro. Un metodo, è possibile trasmettere un messaggio che informa che non è disponibile un nuovo WorkItem tutti i processi di lavoro e si basano sulla natura multicast di PeerChannel. Il lavoro prima di rispondere che sia pronto e in grado di richiedere prevale il diritto di avere WorkItem nuovo.

Questo approccio non mi piace per diversi motivi. In primo luogo, il protocollo di comunicazione sarebbe piuttosto chatty. L'utente si invia un messaggio "che desidera che un elemento di lavoro". Amenable processi di lavoro potrebbe rispondere con un messaggio " verrà prendere È". Quindi sarebbe stato completato un messaggio "attività lavoro" lavoro vincente l'utente. Inoltre (a causa della asincrona natura del peer di interazione più), attenzione necessario da assegnare tutte le potenziali race condition che potrebbe verificarsi.

In secondo luogo, sarebbe necessario definire quali significa "pronto e in grado di" per un lavoro. Se tutti i processi di lavoro sono occupati, non sono rispondono? Un utente scopo se non viene ricevuta nessuna risposta

Invece di questo metodo annuncio di lavoro, sono scelta riutilizzare le informazioni già fornite da heartbeat di lavoro. Queste informazioni consente mi conoscenza sufficiente per calcolare quale lavoro deve essere assegnato un WorkItem appena creato. Di conseguenza, l'algoritmo per la scelta lavoro vincente non risiedono nel codice di lavoro e può essere modificato in modo indipendente dal lavoro. Per motivi di semplicità, verrà per scegliere il lavoro per inviare che un WorkItem base i membri WorkerState, CurrentWorkItem e PendingWorkItemCount degli oggetti WorkerStatus. Ecco il codice dietro il pulsante Crea attività:

private void btnNewWork_Click(object sender, EventArgs e) {
  WorkItem wrkItem = new WorkItem();
  wrkItem.WorkItemId = Guid.NewGuid();
  wrkItem.PretendToWorkSeconds = _rnd.Next(15);

  string nextWorker = GetNextWorker();
  _clientChannel.DoWork(nextWorker, wrkItem);

  lock (_workers)
    _workers[nextWorker].PendingWorkItemCount++;
}

Viene generato un WorkItem nuovo, in modo univoco identificato con un tempo selezionato casualmente compreso tra 0 e 15 secondi (_rnd è un campo a livello di classe di tipo casuale). È possibile decidere il successivo processo di lavoro per inviare il lavoro, quindi inviare il lavoro. È possibile incrementare manualmente la versione memorizzata nella cache di PendingWorkItemCount il lavoro. In questo modo evitare situazioni in cui due o più processi di lavoro dispone statistiche stessi. Caso, è possibile generare lavoro più rapidamente rispetto a processi di lavoro invia heartbeat di stato. Pertanto verrà regolare gli statistiche di lavoro memorizzata nella cache e inviare non unfairly molti elementi di lavoro di lavoro stesso.

La decisione per la selezione di lavoro successivo per la ricezione di lavoro è lo scopo del metodo GetNextWorker illustrato di seguito:

private WorkerStatus GetNextWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);

  return workers
    .Where<WorkerStatus>(
      w => w.State != WorkerState.Offline
      && w.State != WorkerState.Unknown)
    .OrderBy<WorkerStatus, int>(w => w.PendingWorkItemCount)
    .FirstOrDefault<WorkerStatus>();
}

Grazie a metodi di estensione nuovo nelle librerie di Microsoft .NET Framework 3.5, eliminazione di processi di lavoro indica lo stato non in linea e sconosciuto e l'ordinamento per i conteggi di elemento di lavoro in sospeso è semplice. Effettuare una singola (sebbene wordy) istruzione entrambe le operazioni. Inoltre possibile restituire il primo elemento nell'enumerazione. L'utilizzo del metodo FirstOrDefault assicura che la chiamata ha esito positivo anche se non dipendenti sono disponibili.

Ora si eseguire esperimento un altro. Avviare l'applicazione di peer utente e quindi avviare due applicazioni di lavoro separate. Dopo alcuni secondi, verrà visualizzato i due processi di lavoro visualizzato nell'elenco dell'applicazione utente. Fare clic sul lavoro Crea pulsante più volte. Se tutto funziona come previsto, consente di verificare che il peer utente assegnato gli elementi di lavoro per processi di lavoro di entrambi. È necessario stato fatto in modo abbastanza predefinito round robin. I processi di lavoro continuerà l'elaborazione i relativi elementi di lavoro fino al completamento.

Spectators

Un requisito di molte applicazioni è la necessità di registrazione e analisi. Per facilitare ulteriormente la flessibilità di un sistema di base P2P showcase, verrà per creare un nuovo tipo di peer denominato logger. Il peer logger riceveranno eventi pertinenti al sistema. Gli eventi solo che potrebbero essere attualmente rilevati sono quelli che sono esposti dall'interfaccia IWorker: le informazioni sullo stato del lavoro e l'assegnazione di elementi di lavoro per un determinato lavoro. Questo livello di dettaglio è insufficiente. Esistono numerosi eventi aggiuntivi che possono essere registrati e tiene traccia.

Sebbene È possibile aggiungere nuove funzionalità all'interfaccia IWorker, i messaggi inviati per la verifica sono irrilevante per le applicazioni lavoro e utente. Di conseguenza, verrà per creare un nuovo contratto di assistenza denominato ISpectator. (STO utilizzando il termine generico spettatore invece di logger o reporter per evidenziare due operazioni. Innanzitutto, questo tipo di peer solo riceve comunicazioni e non dovrebbe generare uno. In secondo luogo, è possibile non mi dettatura cosa un peer utilizzando questo servizio con le informazioni ricevute; potrebbe solo visibile le informazioni in un'interfaccia utente.) Ecco la definizione del contratto di servizio:

[ServiceContract]
public interface ISpectator {
  [OperationContract(IsOneWay=true)]
  void WorkItemStateChanged(WorkItemStatus status);
}

Come una dimostrazione, è dichiarazione solo un singolo metodo denominato WorkItemStateChanged. Un sistema completo avrà probabilmente molti eventi per notificare spectators di. Il metodo WorkItemStateChanged accetta una classe WorkItemStatus definita di seguito:

[DataContract]
public class WorkItemStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public Guid WorkItemId;

  [DataMember]
  public WorkItemState NewState;
}

Questa classe consente al mittente specificare lavoro invio il messaggio, WorkItem questo messaggio è destinato e il nuovo stato del WorkItem. Gli stati validi sono in coda, Executing, completato e Errored. Come sono che è possibile notare, è possibile non dispongono di un equivalente evento WorkerStateChanged definito. Ciò avviene perché lo stato del lavoro viene inviato tramite il messaggio di heartbeat e previsto spectators per l'ascolto questi. WCF, questo è così facile da eseguire. La classe di servizio implementerà semplicemente interfacce IWorker e ISpectator.

Classe del servizio logger può essere visualizzata nella Figura 5 . Non c'è molta differenza strutturale tra questa classe e il lavoro. Nel metodo IWorker.Heartbeat, il logger è il caching tipico di dati di stato del lavoro. Inoltre, se questo è lo stato primo visualizzato per un lavoro o se lo stato è cambiato per un lavoro, un messaggio viene scritto output standard. Analogamente, il metodo ISpectator.WorkItemStateChanged scrive un messaggio all'output standard. Una produzione sistema verranno probabilmente eseguire un'operazione molto più interessante, come la scrittura di un database di registrazione per i report personalizzati.

Nella figura 5 logger

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
public class Logger : IWorker, ISpectator {
  private ServiceHost _host;
  private Dictionary<string, WorkerStatus> _workers;

  public Logger() {
    _workers = new Dictionary<string, WorkerStatus>();
  }

  public void Start() {
    _host = new ServiceHost(this);
    _host.Open();
  }

  public void Stop() {
    _host.Close();
  }

  void IWorker.Heartbeat(WorkerStatus status) {
    bool changed = false;

    lock (_workers) { 
      if (_workers.ContainsKey(status.WorkerId)) {
        WorkerStatus s = _workers[status.WorkerId];

        if (s.State != status.State)
          changed = true;
      }
      else
        changed = true;

      _workers[status.WorkerId] = status;
    }

    if (changed)
      LogWorkerStatusChange(status);
  }

  void IWorker.DoWork(string workerId, WorkItem work) {
    //Ignored. This can be assumed by the
    //WorkItemState.Queued status event.
  }

  void ISpectator.WorkItemStateChanged(WorkItemStatus status) {
    Console.WriteLine(
      "Worker: {0}| WorkItem: {1}| State: {2}", 
      status.WorkerId, 
      status.WorkItemId, 
      status.NewState);
  }

  private void LogWorkerStatusChange(WorkerStatus status) {
    Console.WriteLine(
      "Worker {0}| State: {1}", 
      status.WorkerId, 
      status.State);
  }
}

L'ultimo elemento di azione di implementazione il logger consiste nell'impostare configurazione. Nella figura 6 viene illustrata la configurazione per questo servizio WCF. Notevoli differenze sono la mancanza di una sezione di client e l'aggiunta di un endpoint del servizio. Poiché un spettatore logicamente è solo un consumer di informazioni, non è alcun motivo di agire come un client. Aggiungere un endpoint del servizio consente di utilizzare la stessa istanza di oggetto servizio per IWorker e ISpectator sink di messaggi.

Nella figura 6 configurazione per logger

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="LoggerPeerApplication.Logger">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.ISpectator"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

  </system.serviceModel>
</configuration>

Per completare la funzionalità ISpectator, È necessario modificare il peer di lavoro per effettuare chiamate appropriate. Include un nuovo endpoint client per il file app.config:

<endpoint
  name="ISpectatorClient"
  address="net.p2p://MyPeerApplication"
  binding="netPeerTcpBinding"
  bindingConfiguration="unsecureBinding"
  contract="PeerWcfComm.ISpectator" />

Questo endpoint dispone di un nuovo nome (in modo che il costruttore di ChannelFactory può trovare le impostazioni corrette) e il tipo di contratto di assistenza ISpectator nuovo. <ispectator>Per le modifiche del codice, È necessario semplicemente creare un nuovo ChannelFactory < ISpectator > e un nuovo canale client ISpectator. Il metodo DoWorkInternal sulla classe lavoro in seguito modificato alla versione leggermente inferiore aesthetically accattivanti illustrato nella Figura 7 . Essenzialmente, prima e dopo l'elaborazione di elemento di lavoro (inattivo il thread corrente) verrà per chiamare il metodo WorkItemStateChanged sul canale ISpectator fornendo lo stato appropriato.

Verifica stato attività nella figura 7

this._state = WorkerState.Busy;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() { 
    WorkerId = _id, 
    WorkItemId = wrk.WorkItemId, 
    NewState = WorkItemState.Executing 
  });

Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));

this._state = WorkerState.Ready;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() {
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Finished
  });

Condivisione del lavoro e bilanciamento del carico

È possibile provare un'altra esperimento. Anche in questo caso, creare due processi di lavoro e un singolo utente. Dispone l'utente generare molti elementi di lavoro per i processi di lavoro. A questo punto, mentre due processi di lavoro lavora stoccaggio, avviare una terza istanza di lavoro. Da ciò, verrà visualizzato un comportamento indesiderato del mio sistema: un lavoro è inattivo mentre altri sono molto operazioni da eseguire.

In un'applicazione P2P, nuovi peer possono portate in linea in qualsiasi momento per partecipare al reticolo. In uno scenario di farm del server, È necessario nuovo peer per unire automaticamente in e avviare immediatamente l'elaborazione del carico di lavoro collettiva.

La prima modifica per rendere è implementazione del metodo IWorker.Heartbeat il lavoro. Attualmente, il codice genera semplicemente lo stato di un lavoro alla console. Questo deve essere sostituito con il codice che memorizza nella cache stati degli fellow processi di lavoro. Ciò accadeva in peer utente e spettatore e può generalmente tagliare e incollare tale codice nella classe di lavoro.

Il peer di lavoro può tenere traccia delle altri processi di lavoro, È possibile aggiungere codice per verificare condizioni in cui un lavoro dovrà condividere elementi di lavoro (Avanti) per processi di lavoro inattivi. È stato modificato fine del metodo MonitorWork il lavoro come illustrato nella Figura 8 .

Nella figura 8 aggiornato MonitorWork

if (_state != WorkerState.Offline) {
  if (_workQueue.Count > 1) {
    WorkerStatus firstIdleWorker = GetNextIdleWorker();

    if (firstIdleWorker != null) {
      WorkItem workToForward;
      lock (_workQueue)
        workToForward = _workQueue.Dequeue();

       _workerClientChannel.DoWork(
        firstIdleWorker, 
        workToForward);
    }
  }

  ThreadPool.QueueUserWorkItem(MonitorWork);
}

In sostanza, dopo l'elaborazione di un elemento di lavoro e prima di riprogrammazione stesso, il metodo verifica per un nodo lavoro inattivo. (Aggiunta la condizione di eseguire questa operazione solo se ho più di una voce nella coda proprio lavoro. Questa è perché è opportuno non inoltrare il solo elemento di lavoro rimanente a un altro lavoro.) Se viene trovato un lavoro inattivo, dequeues un WorkItem e la invia a tale lavoro inattivo. Il metodo GetNextIdleWorker contiene la logica di selezione:

private WorkerStatus GetNextIdleWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);
  return workers
    .Where<WorkerStatus>(
      w =>
        (w.State == WorkerState.Ready
        && w.PendingWorkItemCount == 0))
    .FirstOrDefault<WorkerStatus>();
}

La prima cosa che faccio è blocco dati _workers memorizzare nella cache e ottenere uno snapshot dei valori WorkerStatus. È possibile quindi cercare processi di lavoro nello stato pronto senza lavoro in sospeso, restituisce il primo elemento trovato.

Backup WorkItem

Se un WorkItem viene spostato in un computer e in coda, cosa diventa di esso se quel peer lavoro accidentalmente? Un arresto anomalo del processo non consentono di salvare i dati all'errore. Una buona attenuazione è backup copie di elementi di lavoro su adiacenti peer. Una soluzione di backup completa è costituito da due azioni: backup e ripristino. Non è un'attività semplice.

La soluzione di backup più semplice, è possibile riutilizzare il metodo DoWork. Il primo parametro specifica l'ID di lavoro che dovrebbe per eseguire il lavoro. Tuttavia, tale messaggio effettivamente passa out del reticolo intero. Potrebbe sfruttare questo fatto e sono tutti i colleghi che non corrispondono al lavoro specificato ID di salvare il WorkItem come una copia di backup. Il lavoro effettivo destinazione del messaggio sarebbe infine informare gli altri L'utenti per eliminare tale backup.

Il compromesso è che ogni lavoro utilizza le risorse di cache di backup di elementi di lavoro. Se non vi sono molti elementi di lavoro e processi di lavoro possano elaborarli molto rapidamente, può essere una soluzione accettabile. Tuttavia, se non esistono molti elementi di lavoro nel sistema, potrebbe visualizzato troppo costosa.

Una soluzione migliore potrebbe ridurre il numero totale di backup, ma a discapito della alcuni aggiuntivi reticolo messaggi.

Richiede una pagina di articolo Kevin Hoffman" Come struttura stato di condivisione in una rete peer", Il possibile backup elementi di lavoro per adiacenti peer inviando un messaggio di limitata a un numero di hop singolo. La richiesta di backup e la comunicazioni successive possono essere come chatty come desiderato. Poiché i messaggi di backup non sono a utenti o Spectators significativi, è possibile non utilizzare il contratto di assistenza IWorker. HO creato un nuovo IWorkBackup chiamato uno:

[ServiceContract]
public interface IWorkBackup {
  [OperationContract(IsOneWay = true)]
  void BackupItem(WorkItemBackupRequest request);

  [OperationContract(IsOneWay = true)]
  void DeleteBackup(Guid workItemId);

  [OperationContract(IsOneWay = true)]
  void RestoringBackup(Guid workerId, Guid workItemId);
}

Il primo metodo viene chiamato da un lavoro che desiderano eseguire il backup un WorkItem. La seconda viene chiamata quando un WorkItem è stata completata e altri processi di lavoro è possibile eliminare le copie di esso. L'ultima viene chiamato quando un lavoro rileva che un WorkItem di backup deve essere ripristinato e intende farlo.

I messaggi inviati Dall'BackupItem sono definito dalla classe WorkItemBackupRequest:

[MessageContract]
public class WorkItemBackupRequest {
  [PeerHopCount]
  public int HopCount = 1;

  [MessageBodyMember]
  public WorkItem Item;

  [MessageBodyMember]
  public Guid OriginalWorkerId;
  public DateTime BackupRequestTimeStamp;
  public DateTime RestoreAt;
}

Il membro HopCount dispone l'attributo PeerHopCount, che limita l'attraversamento messaggio attraverso il reticolo al numero specificato di hop. È stato codificato per il valore 1 in modo che questo messaggio verrà inviato a adiacente (un hop) peer. Sono disponibili anche due membri contrassegnati con l'attributo MessageBodyMember: elemento (l'elemento di lavoro eseguire il backup e OriginalWorkerId (lavoro richiede il backup. Questi verrà inviati nel messaggio SOAP sottostante. È anche stato aggiunto due ulteriori membri per la tracciabilità interna.

Quando un lavoro riceve il messaggio RequestBackup, è necessario memorizzare nella cache tale elemento. A tale scopo abbastanza facilmente, aggiungendo un nuovo campo privato per archiviare i backup:

private Dictionary<string, WorkItem> _workItemBackups;

Di seguito è l'implementazione del metodo RequestBackup per un peer di lavoro:

void IWorkBackup.BackupItem(WorkItemBackupRequest request) {
  request.BackupRequestTimeStamp = DateTime.Now;
  request.RestoreAt = DateTime.MaxValue;

  lock (_workItemBackups)
    _workItemBackups.Add(request.Item.WorkItemId, request);
}

Questo metodo imposta il BackupRequestTimeStamp l'ora corrente e RestoreAt impostazione predefinita viene ricavata per DateTime.MaxValue. Questi campi provare a essere utile in seguito.

Lavoro completato sul peer di lavoro locale, è necessario inviare un messaggio a tutti i colleghi che sono potenzialmente memorizzati una copia di backup di elemento di lavoro. Ciò avviene chiamando il metodo DeleteBackup e fornendo l'ID di WorkItem che è stata completata.

Si noti che è possibile non limitare questo messaggio adiacenti peer come HO fatto con il messaggio BackupItem. Il motivo è possibile modificare la topologia del reticolo di peer. Nell'intervallo di tempo che è stata richiesta una copia di backup e il WorkItem viene completata, che intercorre tra colleghi di lavoro una volta sono adiacenti possono non essere a questo punto. Di conseguenza, è possibile inviare un messaggio generico per tutti i processi di lavoro per eliminare i backup di questo articolo, se hanno una. Per completezza, ecco il codice nel peer di lavoro che implementa DeleteBackup:

void IWorkBackup.DeleteBackup(Guid workItemId) {
  lock (_workItemBackups)
    if (_workItemBackups.ContainsKey(workItemId))
      _workItemBackups.Remove(workItemId);
}

Backup e la rimozione di elementi di lavoro è un breeze! Tuttavia, la difficoltà true si verifica nel determinare quando una produzione backup articolo deve essere ripristinato. È necessario determinare sia un elemento di lavoro deve essere ripristinato e che verrà ripristinarlo.

Per determinare quando un elemento di lavoro deve essere ripristinato, ogni lavoro periodicamente necessario monitorare la cache di backup. A tal fine, È stato aggiunto un altro System.Threading.Timer denominato _backupMonitor che Ottiene impostata nel metodo di avvio per il lavoro:

_backupMonitor = new Timer(
  this.MonitorBackups,
  null,
  new TimeSpan(0),
  new TimeSpan(0, 1, 0));

Questo può eseguire in un periodo più lento rispetto l'heartbeat, pertanto è possibile impostare il timer per generare ogni minuto.

Il metodo MonitorBackups gestisce la logica per il ripristino dei backup e può essere visualizzato nella Figura 9 . Questo metodo verrà scorrere l'elemento di backup e verificare ciascuna di esse per la necessità di essere ripristinato. Per tale scopo, è necessario effettuare alcune ipotesi. In primo luogo, si dispone di una variabile locale denominata timeOutThreshold. Questa è la quantità di tempo da quando È stato detto backup un elemento quando necessario previsto per l'elaborazione. È codificato per un minuto (anche se dovrebbe preferibilmente essere configurabile) poiché noto che gli elementi di lavoro possono durare non più di 15 secondi (questo è stato definito da me nella mia applicazione utente). Di conseguenza, tranne in Utilizzo di picco, deve essere moltissime ora. (Potrebbe essere una soluzione migliore per il mittente della richiesta di backup determinare questo timeout e specificarlo nel messaggio di richiesta. Viene ha una conoscenza del carico di lavoro e può fornire una stima molto migliore.) Se l'ora corrente è maggiore il WorkItemBackupRequest.BackupRequestTimeStamp oltre il timeOutThreshold, verrà per analizzare ulteriormente. Automaticamente non è consigliabile elemento di lavoro non riusciti perché il processo di lavoro originale potrebbe semplicemente effettivamente occupato. Tuttavia, se il tempo è scaduto, è possibile spostare su a controllare lo stato del lavoro originali. Se è impostato su non disponibile o pronto con elementi nella coda di lavoro verrà si presuppone semplicemente occupata e verrà comunque a questo elemento di lavoro.

Nella Figura 9 MonitorBackups

private void MonitorBackups(object context) {
  List<WorkItemBackupRequest> backups = 
    new List<WorkItemBackupRequest>();
  TimeSpan timeOutThreshold = 
    new TimeSpan(0, 1, 0); //This should be read in from config.

  lock (_workItemBackups) {
    backups.AddRange(_workItemBackups.Values);

    foreach (WorkItemBackupRequest backup in backups) {
      //If we've passed the RestoreAt time, we need to
      //take on this work item.
      if (backup.RestoreAt < DateTime.Now) {
        lock (_workQueue)
          _workQueue.Enqueue(backup.Item);

        _workItemBackups.Remove(backup.Item.WorkItemId);
      }

      //This work item has not been deleted yet. 
      //This is suspicious so investigate further.
      else if (backup.BackupRequestTimeStamp.Add(timeOutThreshold) >
         DateTime.Now) {

        //Check to see if the original worker is still operating.
        //Regardless, we won't do anything and the backup
        //remains in our cache.
        if (!WorkerIsStillActive(backup.OriginalWorkerId)) {
          //The machine doesn't seem to be actively working.
          //Set a deadline for restoration of the work item.
          backup.RestoreAt = DateTime.Now.Add(timeOutThreshold);

          //Tell the world our intentions to restore this.
          _backupClientChannel.RestoringBackup(
            this._id, backup.Item.WorkItemId);

          Console.WriteLine("Planning to restore '{0}'", 
            backup.Item.WorkItemId);
        }
      }
    }
  }
}

private bool WorkerIsStillActive(Guid workerId) {
  bool active = false;

  lock (_workers) {
    if (_workers.ContainsKey(workerId)) {
      WorkerStatus sts = _workers[workerId];
      if (sts.State == WorkerState.Busy)
        active = true;
      else if (sts.State == WorkerState.Ready && 
        sts.PendingWorkItemCount > 0)
        active = true;
    }
  }

  return active;
}

Se si Introduzione a questo punto e visualizzare sia il timeout utilizzata è scaduto e che il processo di lavoro originale è che non risponde, È necessario avviare il processo di ripristino di tale elemento di lavoro. Questa operazione viene eseguita chiamando il metodo IWorkBackup.RestoringBackup per annunciare l'intenzione a tutti gli altri peer. È possibile inoltre impostare il campo RestoreAt dell'oggetto WorkItemBackupRequest a un'ora futura. Anche se è riutilizzare lo stesso valore di TimeSpan, questa volta futura è diversa dal punto di vista logico (e preferibilmente deve disporre di un punto di configurazione univoche). RestoreAt è l'ora in cui verrà si presuppone che tutti i peer hanno ricevuto il messaggio e hanno avuto il tempo di rispondere, se necessario. Ulteriori informazioni su questa logica. Ma per ora, sapere che È impostata RestoreAt sull'ora quando effettivamente sposterà il WorkItem dalla cache backup alla coda di lavoro. La volta successiva viene generato il timer MonitorBackups verrà controllare questo valore e funzionano correttamente.

Questa logica determina quando si verifica un ripristino. È la parte più semplice per determinare. Il trickiest è che necessario ripristinare un elemento di lavoro. PeerChannel tenta di mantenere un reticolo ottimale e la topologia può variare via peer join e lasciare. Di conseguenza, un unico peer può disporre da due a sette nodi adiacenti in qualsiasi momento. Se una richiesta di backup di un hop è stata inviata a tutti, sarebbe ho più processi di lavoro vying tutti per ripristinare il lavoro stesso poiché essi avere la stessa logica determina quando deve essere ripristinato un WorkItem. Pertanto, cosa succede quando sono tutte contemporaneamente la prova e il ripristino? Per ripristinare, i peer devono innanzitutto annunciare gli intenzione chiamando il metodo IWorkBackup.RestoringBackup (vedere la Figura 10 ).

Nella figura 10 IWorkBackup.RestoringBackup

void IWorkBackup.RestoringBackup(Guid workerId, Guid workItemId) {
  lock (_workItemBackups) {
    //Only proceed if we have this item backed up.
    if (_workItemBackups.ContainsKey(workItemId)) {
      //Get the backup request for this work item
      WorkItemBackupRequest req = _workItemBackups[workItemId];

      //Check to see if we've decided to restore this.
      if (req.RestoreAt != DateTime.MaxValue) {
        //The restorer with the greatest id wins!
        if (this._id.CompareTo(workItemId) < 0)
          _workItemBackups.Remove(workItemId);
      }
      else {
        //We haven't decided to restore it and 
        //someone else wants to. So let them.
        _workItemBackups.Remove(workItemId);
      }
    }
  }
}

A questo punto, esistono tre scenari come visto dal punto di vista di un lavoro messaggio RestoringBackup appena ricevuto da un altro lavoro:

  1. Il lavoro sta ripristinando un elemento non sono memorizzati nella cache.
  2. Il lavoro sta ripristinando un elemento È sono memorizzati ma ancora non intende ripristinare.
  3. Il lavoro sta ripristinando un elemento che sono inoltre intendendo per ripristinare.

Scenario 1 è semplice non è necessaria nessuna azione. Scenario 2 è anche semplice in quanto semplicemente possibile rimuovere l'elemento di backup dalla cache. Ancora non determinato che è necessario il ripristino, ma un altro utente ha quindi possono includere.

È lo scenario 3 che richiede la risoluzione di conflitti. Ogni peer che agisce autonomamente e contemporaneamente potrebbe annunciare che si intende ripristinare un elemento. I messaggi possibile passare tra loro in rete e pertanto più lavoro riuscito a trovare stesso nello scenario 3.

La domanda chiave è: che wins? Che ripristina e che ignora? La decisione deve essere apportata in qualcosa che non è relativo tra i peer (come ora di un sistema) e non viene modificato. HO deciso che un valore passa l'massimo valutato ID lavoro. Una chiamata impartial Guid.CompareTo determina il vincitore. Questo punto, il vincitore non il ripristino e Ignora l'ignorato.

Fun peer

È stato ammetterlo. . . Rete P2P è divertente. Ma questo non negare il fatto che esistono scenari reali in cui la natura decentralizzata rende un buon adattamento per le applicazioni. Inoltre, queste applicazioni non limitate a chat o supporto pirating! Le applicazioni di attivazione peer che richiedono l'elaborazione di massa è un adattamento naturale una farm personalizzata dell'applicazione o di uno scenario HPC (prestazioni alta utenti). Consente di coordinamento del lavoro e l'elaborazione su larga scala relativamente semplice.

La struttura di WCF rende semplice per implementare un servizio e si evolvono esigenze. I dati condivisi e la comunicazione di un reticolo P2P consente diversi client di osservare e partecipare a cosa sta succedendo. In questo articolo viene fornita una dimostrazione di creazione di una piattaforma di elaborazione attivata peer in cui più lettori (processi di lavoro, gli utenti e Spectators possibile funzione tutti insieme per uno scopo comune: il lavoro eseguito.

Matt Neely è possibile è un Software Design Engineer presso Microsoft nel team di strumenti di formazione utenti di SQL Server. Trascorre la maggior parte dei convincing suo amico ha verrà essere menzionato nell'articolo imminente Caleb e shaving suo cane un goldendoodle da si con risultati astonishingly horrific. È possibile contattarlo all' mneely@Microsoft.com.