Este artículo proviene de un motor de traducción automática.

Del mismo nivel divertida

Un proceso de trabajo de Peer-to-Peer aplicación con WCF

Matt Neely

Descarga de código de la Galería de código de MSDN
Examinar el código en línea

En este artículo se describen:
  • Comunicaciones punto a punto básicas
  • Crear el mismo nivel de trabajo
  • El equilibrio de carga y uso compartido de trabajo
  • Elementos de trabajo de copia de seguridad y restauración
En este artículo se utilizan las siguientes tecnologías:
WCF

Contenido

Comunicación del mismo nivel básico
El punto de trabajo
Realizar el trabajo de trabajo
El mismo nivel de usuario
Spectators
Compartir de trabajo y el equilibrio de la carga en la
Copia de seguridad de elementos de trabajo
Del mismo nivel divertida

el grupo tengo una parte de en Microsoft permite nos toma una semana fuera de nuestras tareas regulares para buscar un tema. Recientemente elegirá dedicar a la semana investigar redes de igual a igual (P2P). Esa investigación aumentó en un interés permanente en este nuevo paradigma de red.

Una de las razones por las que posee P2P piqued que el interés se debía a un sistema de generación de documentación internos de mi equipo. Tenemos autores escribir miles de documentos de Ayuda que necesitan para se analiza, transformar, revisarse y compila. Los documentos se pueden compilar en diversos formatos de salida y en varios idiomas cada noche. Para realizar esta operación, tenemos un conjunto de servidores que ejecutan nuestro software de generación.

El mecanismo que se usan actualmente es la arquitectura cliente-servidor típica. Trabajos de generación están en cola en un servidor central y doled fuera como servidores queden libres procesarlas de generación. Nos ocasión un atasco de trabajos en el servidor central o en un servidor de generación con problemas. Muchos miembros de equipo tienen un interés vested en el estado del sistema de compilación, pero el sistema es opaco y difícil de supervisar.

En este artículo es el resultado de una versión P2P de este sistema de compilación de previsión. Explicará emergente diseño de un proceso de trabajo genérico y aplicación de habilitado mismo según la PeerChannel Windows Communication Foundation (WCF). Le guiará la construcción de este sistema al discutir algunas de las decisiones de diseño que deben realizarse en el proceso.

Comunicación del mismo nivel básico

Para este proyecto, comenzaré mediante la definición de Mis compañeros de procesamiento de trabajo principales. Dará a nombre genérico de trabajadores. El plan será que un trabajo por equipo físico (aunque no existen limitaciones técnicas que requiere este).

Cada trabajo será necesario comunicarse y coordinar con otras personas para realizar el trabajo. Para ello, necesita algunos conocimientos acerca de otros trabajadores. ¿Cómo se mi trabajo obtener esta información?

Un enfoque es que cada trabajo sondear cada otro trabajo de su estado. Este mecanismo es complejo que requiere dos fases: detección de trabajo y consultas de trabajo. Además, si cada trabajo hacer ping en cada otro trabajo, podría tener muchos mensajes innecesarios que viajan a la red. Junto con el hecho de que la PeerChannel de WCF es inherentemente multidifusión, haga ping a un solo trabajo realmente enviará un mensaje para cada trabajo. Para n compañeros de trabajo, tendría mensajes n3 volador alrededor. Rechazó este enfoque para un modelo de difusión.

En un modelo de difusión, cada trabajo enviará periódicamente fuera de su estado. LLAMO a éste un mensaje de latido. Otros trabajadores pueden decidir qué hacer con esta información, incluso si omitirlo sólo. YO prefiero este enfoque porque la topología de maya trabajo cambia, puede deducir los nodos procedentes de conexión por la presencia o ausencia de los mensajes de latido y. El descubrimiento de trabajo del mismo nivel es pasivo en lugar de activo. Este modelo también reduce el tráfico de red necesario sólo para upkeep del sistema. Con n compañeros de trabajo desea tener n2 mensajes en este modelo, que es una mejora sobre el primer enfoque.

Normalmente para crear un servicio en WCF, se inicia con la definición de su contrato de servicio. Esto es la interfaz que define la funcionalidad de que un servicio expone a los clientes. En un escenario P2P, cada equipo del mismo nivel actúa como servidor y cliente y el contrato de servicio creado aquí definirá cómo comunican entre sí equipos del mismo nivel. Dada la decisión para difundir el estado de trabajo, puede definir el contrato de servicio:

[ServiceContract]
public interface IWorkerPeer {
  [OperationContract(IsOneWay=true)]
  void Heartbeat(WorkerStatus status);
}

El contrato de servicio IWorkerPeer tiene una operación única, unidireccional denominada Heartbeat. Cuando un trabajo llama a este método, enviará fuera de la instancia de objeto WorkerStatus rellenadas previamente para cada punto de trabajo que participan en la malla.

Ésta es la definición simple de la clase WorkerStatus:

[DataContract]
public class WorkerStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public WorkerState State;
}

Contendrá un identificador único para el trabajo y un valor de enumeración de estado simple. En un sistema más funcional, podría tratarse de un conjunto de complicadas de datos incorpora descriptores adicionales, los contadores de rendimiento, las estadísticas, trabajo (como el porcentaje de elementos de trabajo con error) y más. Como se define, sin embargo, esto mínimamente permitirá cada trabajo saber el estado de cada otro trabajo de la malla.

Los estados que ha definido son desconocido, listo, no disponible y sin conexión. Un estado de desconocido significa que no estoy seguro de estado del trabajo (quizás debido a la ausencia de un latido de ese trabajo en un período de tiempo especificado). Un estado Ready significa que el trabajo está funcionando correctamente y está preparado para tomar en algún trabajo. Un ocupado estado significa que el trabajo está funcionando correctamente, pero ocupado realizar trabajo. Un estado sin conexión significa que el trabajo no está disponible para realizar trabajo. Potencialmente puede el estado enviado desde último latido un trabajo o quizás un trabajo que realiza sin conexión por un administrador de mantenimiento o la depuración.

El punto de trabajo

Para que una aplicación del mismo nivel de trabajo funcional, es necesario exponer y consumir el servicio. Por lo que crea una aplicación de consola simple y le asigna una clase denominada trabajo como se ve en la figura 1 . En mi clase de trabajo, agregan iniciar y detener métodos para controlar el programa de instalación y desmontaje del servicio expuesto (que recibe los mensajes), así como el canal de cliente (para el envío de mensajes). Una instancia System.Threading.Timer utilizar también para llamar periódicamente al método SendHeartbeat. SendHeartbeat crea una instancia de un objeto WorkerStatus, lo rellena con los datos apropiados y, a continuación, se envía a todos los equipos del mismo nivel. Como un servicio, cuando obtiene llama al método IWorker.Heartbeat, se simplemente resultado el contenido del mensaje salida estándar.

Figura 1 la clase de trabajo

namespace WorkerPeerApplication {
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

  public class Worker : IWorker {
    private ServiceHost _host;
    private ChannelFactory<IWorker> _factory;
    private IWorker _workerClientChannel;

    private Guid _id;
    private volatile WorkerState _state;

    private Timer _heartbeat;

    public Worker() {
      _id = Guid.NewGuid();
      _state = WorkerState.Offline;
    }

    public void Start()  {
      _host = new ServiceHost(this);
      _host.Open();

      _factory = new ChannelFactory<IWorker>("IWorkerClient");
      _workerClientChannel = _factory.CreateChannel();

      _state = WorkerState.Ready;
      _heartbeat = new Timer(
        this.SendHeartbeat,
        null,
        new TimeSpan(0),
        //Time should really be read in from config.
        new TimeSpan(0, 0, 1)); 
    }

    public void Stop() {
      _state = WorkerState.Offline;
      _heartbeat.Dispose();
      this.SendHeartbeat(null);
      _factory.Close();
      _host.Close();
    }

    private void SendHeartbeat(object context) {
      WorkerStatus ws = new WorkerStatus();
      ws.WorkerId = _id;
      ws.State = _state;

      _workerClientChannel.Heartbeat(ws);
    }

    void IWorker.Heartbeat(WorkerStatus status) {
      Console.WriteLine(
        "{0}: {1}", 
        status.WorkerId, 
        status.State);
    }
  }
}

Para finalizar, necesitan un método Main y la configuración de WCF apropiada, que coloca en el archivo de configuración de la aplicación. El método Main puede verse en la descarga de código, y la configuración de que uso se muestra en la figura 2 .

La Figura 2 configuración del trabajo

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="WorkerPeerApplication.Worker">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint           
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

    <client>
      <endpoint
        name="IWorkerClient"
        address="net.p2p://MyPeerApplication"
        binding="netPeerTcpBinding"
        bindingConfiguration="unsecureBinding"
        contract="PeerWcfComm.IWorker" />
    </client>

  </system.serviceModel>
</configuration>

Puede ver que definen tanto el servicio que expone el mismo nivel y configuración del lado cliente del mismo nivel. La configuración del servicio especifica el tipo del servicio, así como su dirección base y el extremo. ESTOY utilizando un enlace no seguro y el mismo nivel nombres resolución protocolo (PNRP) para resolver los nombres de mismo nivel. La configuración del cliente es casi un reflejo del servicio. La diferencia notable sólo es que asignó el extremo de cliente de un nombre para que el constructor ChannelFactory pueda tener acceso fácilmente a la configuración deseada.

Ahora que tiene el shell de un punto de trabajo, vamos a prueba de unidad. Para un experimento, ejecutar dos instancias de este programa al mismo tiempo. Verá que en primer lugar cada punto salida su propia información de latido. Después de unos segundos, compañeros comenzará a buscar entre sí y cada trabajo recibirá el estado de los demás.

Francamente, considere molestia y distraer para que mis propias los mensajes se envían al me. Por suerte, el Blog del equipo PeerChannelproporciona el código que se necesita. Verá una clase muy útil denominada RemoteOnlyPropagationFilter. Préstamo esto, pueden, a continuación, agregue las dos líneas siguientes de código al mi Worker.Start método para asegurarse de que los mensajes que envían no también terminan enviados a mí:

PeerNode pn = ((IClientChannel)_clientChannel).GetProperty<PeerNode>();
pn.MessagePropagationFilter = new RemoteOnlyMessagePropagationFilter();

Estas dos líneas de código se agregaron antes de comenzar el temporizador de latido.

Realizar el trabajo de trabajo

Hasta ahora, tiene una aplicación que simplemente se comunica entre compañeros de trabajo. Los trabajadores realmente no realizar cualquier trabajo todavía. Le solucionar esto por contribuye a aumentar la interfaz IWorker, agregar el siguiente método:

[OperationContract(IsOneWay = true)]
void DoWork(Guid workerId, WorkItem work);

Este método permite que una entidad externa indicar un interlocutor de trabajo para realizar un elemento de trabajo. Desde el WCF de mensajería se multidifusión, agrega un parámetro para especificar el IDENTIFICADOR del trabajo que va a realizar el trabajo. HABLARÉ sobre la clase de elementos de trabajo en un momento. Pero por ahora, el cambio en la interfaz requiere que actualizar la clase de trabajo mientras los ve en la figura 3 . Cuando un usuario indica un trabajo realizar algún trabajo, esta solicitud se agrega a una cola interna y, finalmente, se ejecutan en el orden recibido. Para realizar realmente el trabajo, agregan un método denominado MonitorWork a la clase de trabajo. El método Start simplemente llamará ThreadPool.QueueUserWorkItem, hacia el método MonitorWork.

La figura 3 revisado trabajo

public class Worker : IWorker {
  // Omitted for clarity ...
  private Queue<WorkItem> _workQueue;

  public Worker() {
    // Omitted for clarity ...
    _workQueue = new Queue<WorkItem>();
  }

  public void Start() {
    // Omitted for clarity ...
    ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void MonitorWork(object context) {
    if (_workQueue.Count > 0) {
      lock (_workQueue)
        _currentWorkItem = _workQueue.Dequeue();

      DoWorkInternal(_currentWorkItem);
      _currentWorkItem = null;
    }

    if (_state != WorkerState.Offline)
      ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void DoWorkInternal(WorkItem wrk) { ...}

  void IWorker.DoWork(Guid workerId, WorkItem work) {
    if (workerId == _id)
      lock (_workQueue)
        _workQueue.Enqueue(work);
  }
}

En un nivel alto, MonitorWork se hacer algo y, a continuación, volver a programar propio mediante otra llamada a ThreadPool.QueueUserWorkItem. Lo hará Esto hasta que el estado del trabajo sin conexión, lo que ocurre cuando se llama al punto en el objeto de trabajo. Algo que lo hace es comprobar si la cola de trabajo tiene algo en él. Si la cola contiene los elementos de trabajo, MonitorWork se dequeue los elementos de trabajo siguiente y empezar a procesar se. El comportamiento deseado es mi trabajo procesar serialmente su trabajo, aunque se podría ser modificar fácilmente para procesar los elementos en paralelo.

Como se ver desde el código en la figura 3 , no han tratan todavía es el trabajo. Simplemente esto ha pasado al método DoWorkInternal todavía -a-se-definido. Por lo que primero veamos la definición de clase de elementos de trabajo. Como el sistema que estoy creando para este artículo es independiente de tareas, voy a definir simplemente el elemento de trabajo siguiente:

[DataContract]
public class WorkItem {
  [DataMember]
  public Guid WorkItemId { get; set; }

  [DataMember]
  public int PretendToWorkSeconds { get; set; }
}

Debe volver a utilizar un atributo DataContract o MessageContract para que WCF correctamente puede serializar la clase personalizada. La clase de elementos de trabajo tiene un campo de identificador, WorkItemId y un campo que especifica el número de segundos que feign realizar trabajo real (denominado PretendToWorkSeconds).

Ahora que se define un elementos de trabajo, puede procesar. Ésta es la implementación del método DoWorkInternal:

private void DoWorkInternal(WorkItem wrk) {
  this._state = WorkerState.Busy;
  Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));
  this._state = WorkerState.Ready;
}

Esto es donde normalmente podría realizar mi trabajo significativo (por ejemplo, la creación de la documentación de instalación de SQL Server 2008 en portugués de Brasil). Pero por motivos de este artículo, me sólo va a modo de suspensión el período de tiempo especificado.

Por último, actualizar mi clase WorkStatus con dos miembros de datos nuevos que se establecerá antes de enviar de latido un trabajo. Son el identificador del elemento de trabajo que se está ejecutando actualmente y un recuento de los elementos de trabajo pendientes en ese mismo nivel de trabajo:

[DataContract]
public class WorkerStatus {
  // Omitted for clarity...

  [DataMember]
  public Guid CurrentWorkItem;

  [DataMember]
  public int PendingWorkItemCount;
}

Ahora que Mis compañeros de trabajo tienen algunos datos, se debe conocer perfectamente escenarios que poner sobre la pérdida de datos. Uno de los equilibrios de un paradigma de red descentralizado como, P2P es hay no es repositorio central para los datos. Cada equipo del mismo nivel es responsable de hace referencia colectivamente como conservar datos del sistema. Por lo tanto ¿qué ocurre si un punto de trabajo deja de funcionar? Específicamente para mi sistema, ¿qué ocurre con la cola WorkItems?

Agrupar escenarios de cierre del mismo nivel en dos categorías: esperada y inesperado. Para cierres de una naturaleza esperado, es necesario tratar la WorkItems en cola. El paradigma para controlar estos elementos puede variar en función de la naturaleza de la aplicación y los datos. Si los datos no son importantes, posible que pueda para simplemente descartar todo los WorkItems en cola. Si la aplicación está diseñada para estar pausado (o para reiniciar automáticamente como un servicio de Windows), quizás serializar los elementos que el disco y leerlos al inicio es la solución correcta. Todavía otra opción es simplemente reenviar el WorkItems sobrantes a otros trabajadores antes para apagar. HABLARÉ sobre cierre inesperado posterior de este artículo en el contexto de copia de seguridad de los datos.

El mismo nivel de usuario

Hasta ahora, tiene una aplicación de habilitado del mismo nivel que puede aceptar y procesar los elementos de trabajo definido. ¿Pero que crea estos elementos trabajo y las envía a los compañeros de trabajo? Una de las características eficaces de un sistema P2P es que cualquiera puede enlazar a la misma infraestructura de comunicaciones. Todo lo que se necesita para participar (para mallas no seguros) es conocimiento de la dirección de malla y el contrato de servicio. Mostraré esto al agregar un nuevo tipo de mismo nivel denominado un punto de usuario.

La clase para el mismo nivel de usuario es similar al trabajo del mismo nivel que se ha indicado anteriormente. El mismo nivel de usuario también implementan la interfaz de IWorker, pero no necesario proporcionar una implementación del método DoWork. (No olvide considerar las interfaces de ServiceContract como un contrato de comunicación y no como una declaración de identidad. Un usuario no es un trabajo, pero participa de la malla de mismo nivel IWorker y, por tanto, implementa dicha interfaz.)

El mismo nivel de usuario está interesado seguimiento trabajadores, así como crear nuevos elementos de trabajo y les da a los trabajadores. la figura 4 muestra una captura de pantalla de la aplicación de usuario. Esta aplicación se muestra la información recibido a mensajes de estado de trabajo. Sin embargo, la funcionalidad que want to focus on es la creación de WorkItems y, a continuación, asignarlos a trabajadores.

fig01.gif

La figura 4 la aplicación del mismo nivel de usuario

Una decisión de diseño debe realizarse con respecto a cómo se asignan WorkItems a trabajadores. Un método sería dependen de la naturaleza de multidifusión de PeerChannel y difunda un mensaje a todos los trabajadores diciendo que estaba disponible un elementos de trabajo de nuevo. El primer trabajo para responder que es capaz de llevarlo y listo prevalece el derecho a tener los elementos de trabajo nuevo.

No me gusta este enfoque por varias razones. En primer lugar, el protocolo de comunicaciones sería bastante familiares. El usuario podría enviar un mensaje de "que desea que un elemento de trabajo". Trabajadores amenable debe responder con un mensaje va realizar es". El usuario, a continuación, tiene que finalizar copia con un mensaje "hace el trabajo" para el trabajo ganador. También (debido a la naturaleza asincrónica de varios equipos del mismo nivel de interacción), atención debe concederse a todos los posibles condiciones de carrera que surgirían.

En segundo lugar, que tiene que definir qué medio "listo y pueden" para un trabajo. ¿Si todos los trabajadores están ocupados, no responden? ¿Qué es un usuario hacer si no se recibe ninguna respuesta?

En lugar de este método anuncio de trabajo, me elegir reutilizar la información ya proporcionada por los latidos de trabajo. Esta información proporciona me conocimientos suficientes para calcular el trabajo debe recibir un elementos de trabajo recién creado. Por lo tanto, el algoritmo para elegir el trabajo ganador no reside en código el trabajo y puede modificarse independientemente desde el trabajo. Por motivos de simplicidad, voy a elegir el trabajo para enviar que un elementos de trabajo basado en los miembros WorkerState, CurrentWorkItem y PendingWorkItemCount de los objetos WorkerStatus. Este es el código detrás del botón de trabajo crear:

private void btnNewWork_Click(object sender, EventArgs e) {
  WorkItem wrkItem = new WorkItem();
  wrkItem.WorkItemId = Guid.NewGuid();
  wrkItem.PretendToWorkSeconds = _rnd.Next(15);

  string nextWorker = GetNextWorker();
  _clientChannel.DoWork(nextWorker, wrkItem);

  lock (_workers)
    _workers[nextWorker].PendingWorkItemCount++;
}

Un elementos de trabajo nueva, identificado de forma exclusiva se genera con un tiempo seleccionado aleatoriamente entre 0 y 15 segundos (_rnd es un campo de nivel de clase de tipo aleatorio). SE decide el siguiente trabajo para enviar el trabajo que, a continuación, enviar el trabajo. También manualmente incrementará la versión en caché de PendingWorkItemCount el trabajo. Esto ayuda a evitar situaciones donde los trabajadores de dos o más tienen la misma estadística. Si lo hacen, es posible generar trabajo más rápidamente que trabajadores envían los latidos de estado. Por lo que le ajustar Mis estadísticas de trabajo almacenado en caché y enviar no carga muchos elementos de trabajo para el mismo trabajo.

La decisión para seleccionar el siguiente trabajo recibe trabajo es el propósito del método GetNextWorker que se muestra aquí:

private WorkerStatus GetNextWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);

  return workers
    .Where<WorkerStatus>(
      w => w.State != WorkerState.Offline
      && w.State != WorkerState.Unknown)
    .OrderBy<WorkerStatus, int>(w => w.PendingWorkItemCount)
    .FirstOrDefault<WorkerStatus>();
}

Gracias a nuevos métodos de extensión en las bibliotecas de Microsoft .NET Framework 3.5, eliminando trabajadores en los estados sin conexión y desconocidos y ordenar por sus conteos de elemento de trabajo pendiente es fácil. Una instrucción única (aunque wordy) puede hacer ambas cosas. También devuelven el primer elemento de la enumeración. Uso del método FirstOrDefault, se garantiza que la llamada se realiza correctamente incluso si no los trabajadores están disponibles.

Ahora vamos a ejecutar otro experimento. Iniciar la aplicación del mismo nivel de usuario y comience a dos aplicaciones de trabajo independientes. Después de unos segundos, debería ver los trabajadores de dos aparecen en la lista de aplicaciones de usuario. Haga clic ahora en el botón de trabajo crear varias veces. Si todo funciona según lo previsto, puede ver que el mismo nivel de usuario asignó los elementos de trabajo para trabajadores de ambos. Debe hecho esto de forma bastante turnos. Los trabajadores seguirá procesando los elementos de trabajo hasta que termine.

Spectators

Un requisito de muchas aplicaciones es la necesidad de registro y seguimiento. Para ayudar aún más la flexibilidad de un sistema P2P, en función del modelo, voy a crear un nuevo denominado registrador de tipo de punto. Mi punto de registrador recibirá los eventos pertinentes para el sistema. Los eventos sólo que podría ser registrados actualmente son aquellos que están expuestos por la interfaz IWorker: información de estado de trabajo y la asignación de WorkItems para un trabajo determinado. Este nivel de detalle es insuficiente. Hay muchos eventos adicionales que podría ser iniciados y realiza un seguimiento.

Aunque puede agregar en la nueva funcionalidad a la interfaz IWorker, los mensajes enviados de seguimiento son inconsequential a aplicaciones de trabajo y usuarios. Por lo tanto, voy a crear un nuevo contrato de servicio denominado ISpectator. (Me usar el término genérico Spectator en lugar de registrador o Reporter para destacar dos cosas. Primero, este tipo de punto sólo recibe las comunicaciones y no debería generar ninguno. En segundo lugar, no estoy dictar lo que hace un interlocutor con este servicio con la información recibida; es posible que sólo aparece la información en una interfaz de usuario.) Ésta es la definición de contrato de servicio:

[ServiceContract]
public interface ISpectator {
  [OperationContract(IsOneWay=true)]
  void WorkItemStateChanged(WorkItemStatus status);
}

Como una demostración, me declarar sólo un método único llamado WorkItemStateChanged. Un sistema completo tendrá probablemente muchos eventos para notificar spectators de. El método de WorkItemStateChanged toma una clase WorkItemStatus definida a continuación:

[DataContract]
public class WorkItemStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public Guid WorkItemId;

  [DataMember]
  public WorkItemState NewState;
}

Esta clase permite al remitente especificar el trabajo enviar el mensaje, este mensaje es para los elementos de trabajo y el nuevo estado de los elementos de trabajo. Estados válidos son en cola, Executing, fin y Errored. Como estoy seguro de que observado, no tiene un equivalente evento de WorkerStateChanged definido. Esto es porque el estado de trabajo se envía mediante el mensaje de latido y espero spectators para escuchar de estos. Con WCF, esto es muy fácil de hacer. Mi clase de servicio simplemente va implementar interfaces tanto IWorker y ISpectator.

La clase de servicio registrador puede verse en la figura 5 . No hay mucha diferencia estructural entre esta clase y el trabajo. En el método IWorker.Heartbeat, el registrador tiene la típica de caché de datos de estado de trabajo. Además, si éste es el primer estado visto para un trabajo o si ha cambiado el estado de un trabajo, un mensaje se escribe en salida estándar. Del mismo modo, el método de ISpectator.WorkItemStateChanged escribe un mensaje de salida estándar. Un no es probable que de producción del sistema se algo mucho más interesante, como escribir en una base de datos el registro para informes personalizados.

La figura 5 registrador

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
public class Logger : IWorker, ISpectator {
  private ServiceHost _host;
  private Dictionary<string, WorkerStatus> _workers;

  public Logger() {
    _workers = new Dictionary<string, WorkerStatus>();
  }

  public void Start() {
    _host = new ServiceHost(this);
    _host.Open();
  }

  public void Stop() {
    _host.Close();
  }

  void IWorker.Heartbeat(WorkerStatus status) {
    bool changed = false;

    lock (_workers) { 
      if (_workers.ContainsKey(status.WorkerId)) {
        WorkerStatus s = _workers[status.WorkerId];

        if (s.State != status.State)
          changed = true;
      }
      else
        changed = true;

      _workers[status.WorkerId] = status;
    }

    if (changed)
      LogWorkerStatusChange(status);
  }

  void IWorker.DoWork(string workerId, WorkItem work) {
    //Ignored. This can be assumed by the
    //WorkItemState.Queued status event.
  }

  void ISpectator.WorkItemStateChanged(WorkItemStatus status) {
    Console.WriteLine(
      "Worker: {0}| WorkItem: {1}| State: {2}", 
      status.WorkerId, 
      status.WorkItemId, 
      status.NewState);
  }

  private void LogWorkerStatusChange(WorkerStatus status) {
    Console.WriteLine(
      "Worker {0}| State: {1}", 
      status.WorkerId, 
      status.State);
  }
}

El último elemento de acción de implementar el registrador es establecer la configuración. la figura 6 muestra la configuración de este servicio WCF. Diferencias apreciables son la falta de una sección de cliente y la adición de un extremo de servicio. Puesto que un spectator lógica es sólo un consumidor de información, no hay ninguna razón para que actúe como un cliente. Agregar un extremo al servicio permite nos utilizar la misma instancia de objeto de servicio para IWorker y ISpectator receptores de mensajes.

Configuración de la figura 6 de registrador

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="LoggerPeerApplication.Logger">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.ISpectator"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

  </system.serviceModel>
</configuration>

Para completar la funcionalidad de ISpectator, es necesario modificar el interlocutor de trabajo para realizar las llamadas adecuadas. Esto incluye un nuevo extremo de cliente a la app.config:

<endpoint
  name="ISpectatorClient"
  address="net.p2p://MyPeerApplication"
  binding="netPeerTcpBinding"
  bindingConfiguration="unsecureBinding"
  contract="PeerWcfComm.ISpectator" />

Este extremo tiene un nombre nuevo (de forma que mi constructor ChannelFactory pueda encontrar la configuración correcta) y el tipo del contrato de servicio ISpectator nuevo. <ispectator>Para los cambios de código, simplemente necesita para crear un nuevo ChannelFactory < ISpectator > y un nuevo canal de cliente ISpectator. Cambia después el método DoWorkInternal de la clase de trabajo a la versión ligeramente menos aesthetically consiga que se muestra en la figura 7 . Esencialmente, antes y después de procesamiento de elemento de trabajo (suspendido el subproceso actual) voy a llamar al método WorkItemStateChanged en el canal ISpectator, lo que el estado adecuado.

Seguimiento de estado de trabajo en la figura 7

this._state = WorkerState.Busy;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() { 
    WorkerId = _id, 
    WorkItemId = wrk.WorkItemId, 
    NewState = WorkItemState.Executing 
  });

Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));

this._state = WorkerState.Ready;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() {
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Finished
  });

Compartir de trabajo y el equilibrio de la carga en la

Es el momento para probar otro experimentar. De nuevo, crear dos trabajadores y un único usuario. Tiene el usuario generar una gran cantidad de elementos de trabajo para los trabajadores. Ahora, mientras los trabajadores de dos están trabajando fuera, iniciar una tercera instancia de trabajo. De esto, verá un comportamiento no deseado de mi sistema: un trabajo está inactivo, mientras que otros usuarios tienen mucho trabajo por hacer.

En una aplicación P2P, nuevos compañeros pueden proceder en línea en cualquier momento para participar en la malla. En un escenario de conjunto de servidores de servidor, me gustaría nuevos compañeros para combinar automáticamente en e iniciar inmediatamente la carga de trabajo colectivo de procesamiento.

El primer cambio que está en la implementación del método IWorker.Heartbeat el trabajo. Actualmente, el código simplemente envía el estado de un trabajo a la consola. Esto debe reemplazarse con código que se almacenará en caché estados de tipo trabajadores. Hizo en los extremos de usuario y Spectator y puede principalmente cortar y pegar código en la clase de trabajo.

Ahora que el interlocutor de trabajo puede realizar un seguimiento de otros trabajadores, puede agregar código para comprobar condiciones donde un trabajo debe compartir elementos de trabajo (delantero) para trabajadores de la inactividad. Ha modificado el final del MonitorWork método el trabajo tal como se muestra en la figura 8 .

Figura 8 actualizar MonitorWork

if (_state != WorkerState.Offline) {
  if (_workQueue.Count > 1) {
    WorkerStatus firstIdleWorker = GetNextIdleWorker();

    if (firstIdleWorker != null) {
      WorkItem workToForward;
      lock (_workQueue)
        workToForward = _workQueue.Dequeue();

       _workerClientChannel.DoWork(
        firstIdleWorker, 
        workToForward);
    }
  }

  ThreadPool.QueueUserWorkItem(MonitorWork);
}

Básicamente, después de procesar un elemento de trabajo y antes de reprogramar propio, el método comprueba si un nodo de trabajo inactivo. (Agrega la condición para hacer esto únicamente si tiene más de un elemento en mi propia cola de trabajo. Esto es porque no tiene mucho sentido reenviar mi único elemento de trabajo restante a otro trabajo.) Si se encuentra un trabajo inactivo, dequeues un elementos de trabajo y lo envía a ese trabajo inactivo. El método GetNextIdleWorker contiene la lógica de selección:

private WorkerStatus GetNextIdleWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);
  return workers
    .Where<WorkerStatus>(
      w =>
        (w.State == WorkerState.Ready
        && w.PendingWorkItemCount == 0))
    .FirstOrDefault<WorkerStatus>();
}

Lo primero que hacer es bloquear los datos _workers almacenar en caché y obtener una instantánea de los valores de WorkerStatus en él. , A continuación, busque trabajadores en el estado listo con ningún trabajo pendiente, devuelve el primer uno que se encuentra.

Copia de seguridad de elementos de trabajo

¿Si un elementos de trabajo se mueve a un equipo y en cola, se convierte lo en de ella si ese mismo trabajo queda inactivo accidentalmente? Un bloqueo de proceso no permitirá que guardar los datos tras error. Una mitigación buena es realizar copias de copias de WorkItems de seguridad en compañeros adyacentes. Una solución de copia de seguridad completa consta de dos acciones posibles: copia de seguridad y restauración. Esto no es una tarea trivial.

La solución de copia de seguridad más simple sería volver a utilizar el método DoWork. El primer parámetro especifica el IDENTIFICADOR de trabajo en el que se supone que para realizar el trabajo. Sin embargo, dicho mensaje realmente va fuera a la malla completa. Podría aprovechar las ventajas de este hecho y tiene todos los compañeros que no coinciden con el trabajo determinado ID guardar los elementos de trabajo como una copia de seguridad. El trabajo de destino real de ese mensaje, finalmente, podría indicar los otros para eliminar esa copia de seguridad.

El equilibrio aquí es que cada trabajo está utilizando los recursos de almacenamiento en caché copias de seguridad de los elementos de trabajo. Si no hay muchos elementos de trabajo y trabajadores pueden procesarlos muy rápidamente, esto puede ser una solución viable. Sin embargo, si hay muchos elementos de trabajo en el sistema, podría obtener prohibitivo.

Una solución mejor podría reducir el número total de las copias de seguridad, pero a expensas de algunas adicional de mensajes de la malla.

Tomar una página de artículo de Antonio Couto" Cómo el estado de diseño de uso compartido en una red de mismo nivel", Pueden realizar copias de seguridad WorkItems en vecinos iguales al enviar un mensaje limitado a un número de saltos único. La solicitud de copia de seguridad y las comunicaciones siguientes pueden ser como familiares como desee. Puesto que no tiene sentidos para los usuarios o Spectators estos mensajes de copia de seguridad, no uso el contrato de servicio IWorker. Ha creado un nuevo IWorkBackup llamado uno:

[ServiceContract]
public interface IWorkBackup {
  [OperationContract(IsOneWay = true)]
  void BackupItem(WorkItemBackupRequest request);

  [OperationContract(IsOneWay = true)]
  void DeleteBackup(Guid workItemId);

  [OperationContract(IsOneWay = true)]
  void RestoringBackup(Guid workerId, Guid workItemId);
}

El primer método llaman a un trabajo deseen de elementos de un trabajo de copia de seguridad. Se llama a la segunda cuando se ha completado un elementos de trabajo y otros trabajadores pueden eliminar sus copias de él. La última vez se llama cuando un trabajo determina que una copia de elementos de trabajo de seguridad se debería restaurar y pretende hacerlo.

El mensaje enviado por BackupItem se define mediante la clase WorkItemBackupRequest:

[MessageContract]
public class WorkItemBackupRequest {
  [PeerHopCount]
  public int HopCount = 1;

  [MessageBodyMember]
  public WorkItem Item;

  [MessageBodyMember]
  public Guid OriginalWorkerId;
  public DateTime BackupRequestTimeStamp;
  public DateTime RestoreAt;
}

El miembro HopCount tiene el atributo PeerHopCount, lo que limita el recorrido de mensaje a través de la malla al número de saltos especificado. Ha codificado que el valor 1 para que este mensaje se enviarse compañeros sólo a adyacentes (un salto). También hay dos los miembros marcados con el atributo MessageBodyMember: elemento (el elemento de trabajo para crear una copia de seguridad) y OriginalWorkerId (el trabajador que solicita la copia de seguridad). Se enviará el mensaje SOAP subyacente. También he agregado dos miembros adicionales para un seguimiento interno.

Cuando un trabajo recibe el mensaje RequestBackup, necesita la caché de ese elemento. Esto se hace bastante fácil agregando un nuevo campo privado para almacenar las copias de seguridad:

private Dictionary<string, WorkItem> _workItemBackups;

Aquí es la implementación del método RequestBackup para un punto de trabajo:

void IWorkBackup.BackupItem(WorkItemBackupRequest request) {
  request.BackupRequestTimeStamp = DateTime.Now;
  request.RestoreAt = DateTime.MaxValue;

  lock (_workItemBackups)
    _workItemBackups.Add(request.Item.WorkItemId, request);
}

Este método establece la BackupRequestTimeStamp a la hora actual, y se predeterminados RestoreAt para DateTime.MaxValue. Estos campos resultar útil más adelante.

Una vez se ha completado el trabajo en el interlocutor de trabajo local, debe enviar un mensaje a todos los compañeros que posiblemente han almacenado en caché una copia de seguridad de que el elemento de trabajo. Para ello, llamando al método DeleteBackup y proporcionar el IDENTIFICADOR de los elementos de trabajo que se ha completado.

Observe que no limite este mensaje para compañeros adyacentes como hice con el mensaje BackupItem. El motivo es que puede cambiar la topología de maya del mismo nivel. Entre el momento que se ha solicitado una copia de seguridad y los elementos de trabajo se complete, compañeros de trabajo que eran una vez adyacentes no se pueden ahora. Por lo tanto, enviar un mensaje de general a todos los trabajadores para eliminar su copia de seguridad de este artículo, si tienen una. Para la finalización, aquí es el código en el interlocutor de trabajo que implementa DeleteBackup:

void IWorkBackup.DeleteBackup(Guid workItemId) {
  lock (_workItemBackups)
    if (_workItemBackups.ContainsKey(workItemId))
      _workItemBackups.Remove(workItemId);
}

Realizar copias de seguridad y quitar elementos de trabajo son esta técnica de implementación resulte muy sencilla de otros proveedores podido. Sin embargo, la dificultad true surge para determinar cuando un trabajo de copia de seguridad necesita elemento que se va a restaurar. Necesita determinar tanto cuando un elemento de trabajo necesita que se va a restaurar y que restaurarlo.

Para determinar cuándo un elemento de trabajo debe restaurarse, cada trabajo debe supervisar periódicamente su caché de copia de seguridad. Para ello, he agregado otro System.Threading.Timer denominado _backupMonitor, que obtiene establecer en el método Start para el trabajo:

_backupMonitor = new Timer(
  this.MonitorBackups,
  null,
  new TimeSpan(0),
  new TimeSpan(0, 1, 0));

Esto puede ejecutar en un período mucho más lento que el latido, por lo que establece el temporizador para desencadenar cada minuto.

El método MonitorBackups controla la lógica para restaurar copias de seguridad y pueden verse en la figura 9 . Este método recorrer el elemento de copia de seguridad y comprobar cada uno de la necesidad de restaurar. Para ello, algunos supuestos debe realizarse. En primer lugar, hay una variable local denominada timeOutThreshold. Esto es la cantidad de tiempo al que se dijo realizar copia de un elemento al que debe espera que se van a procesar. SE codifica esta opción para una minuto (aunque lo ideal es que debe ser configurable) ya sabe que los elementos de trabajo pueden durar no de 15 segundos (esto se ha definido por mí en mi aplicación de usuario). Por lo tanto, excepto en uso de recursos asignadas, debe ser gran cantidad de tiempo. (Puede ser una solución mejor para que el autor de solicitud de copia de seguridad determinar este tiempo de espera y especifique en el mensaje de solicitud. Se tiene conocimiento de su carga de trabajo y puede proporcionar una estimación mucho mejor.) Si la hora actual es mayor que el WorkItemBackupRequest.BackupRequestTimeStamp además la timeOutThreshold, voy a investigar más. Automáticamente no tenerse en cuenta este elemento de trabajo han fallado porque el trabajo original podría ser simplemente realmente ocupado. Sin embargo, si el tiempo ha caducado, pase para comprobar el estado de la trabajo original. Si es no disponible o listo con elementos de su cola de trabajo le Supongamos está ocupado simplemente y se sigan sometidos a este elemento de trabajo.

Figura 9 MonitorBackups

private void MonitorBackups(object context) {
  List<WorkItemBackupRequest> backups = 
    new List<WorkItemBackupRequest>();
  TimeSpan timeOutThreshold = 
    new TimeSpan(0, 1, 0); //This should be read in from config.

  lock (_workItemBackups) {
    backups.AddRange(_workItemBackups.Values);

    foreach (WorkItemBackupRequest backup in backups) {
      //If we've passed the RestoreAt time, we need to
      //take on this work item.
      if (backup.RestoreAt < DateTime.Now) {
        lock (_workQueue)
          _workQueue.Enqueue(backup.Item);

        _workItemBackups.Remove(backup.Item.WorkItemId);
      }

      //This work item has not been deleted yet. 
      //This is suspicious so investigate further.
      else if (backup.BackupRequestTimeStamp.Add(timeOutThreshold) >
         DateTime.Now) {

        //Check to see if the original worker is still operating.
        //Regardless, we won't do anything and the backup
        //remains in our cache.
        if (!WorkerIsStillActive(backup.OriginalWorkerId)) {
          //The machine doesn't seem to be actively working.
          //Set a deadline for restoration of the work item.
          backup.RestoreAt = DateTime.Now.Add(timeOutThreshold);

          //Tell the world our intentions to restore this.
          _backupClientChannel.RestoringBackup(
            this._id, backup.Item.WorkItemId);

          Console.WriteLine("Planning to restore '{0}'", 
            backup.Item.WorkItemId);
        }
      }
    }
  }
}

private bool WorkerIsStillActive(Guid workerId) {
  bool active = false;

  lock (_workers) {
    if (_workers.ContainsKey(workerId)) {
      WorkerStatus sts = _workers[workerId];
      if (sts.State == WorkerState.Busy)
        active = true;
      else if (sts.State == WorkerState.Ready && 
        sts.PendingWorkItemCount > 0)
        active = true;
    }
  }

  return active;
}

Si se llegar a este punto y vea que tanto el tiempo de espera supuesto ha caducado y que el trabajo original es no responde, comenzaré el proceso de restauración de ese elemento de trabajo. Para ello, llamar al método IWorkBackup.RestoringBackup para anunciar mi intención a todos los otros compañeros. También establezca el campo de RestoreAt del objeto WorkItemBackupRequest a una hora futura. Aunque estoy reutilizar el mismo valor TimeSpan, este momento futuro es lógicamente diferente (y lo ideal es que debe tener un punto único de configuración). RestoreAt es el momento en el que se supone que todos los demás compañeros han recibido el mensaje y han tenido tiempo para responder, si lo desea. Obtener más información sobre esta lógica más adelante. Pero por ahora, sabe que establezca RestoreAt el tiempo cuando moverá realmente los elementos de trabajo de la caché copia de seguridad a la cola de trabajo. La próxima vez que el temporizador MonitorBackups se desencadena se comprueba si este valor y actuar correctamente.

Esta lógica determina cuando se produce una restauración. Es la parte más sencilla para determinar. La más complicada es que debe restaurar un elemento de trabajo. PeerChannel intenta mantener una malla óptimo y la topología puede cambiar a medida iguales unirse y dejan. Como tal, puede tener un único punto de dos a siete nodos adyacentes en cualquier momento. Si se envió una solicitud de copia de seguridad de un salto a todas ellas, podría tengo trabajadores varios vying todo para restaurar el mismo trabajo, dado que todos ellos tienen la misma lógica determinar cuando deben restaurarse un elementos de trabajo. ¿Por lo que qué sucede cuando intente todos simultáneamente y restaurar? Para restaurar, los extremos en primer lugar deben anunciar su intención mediante una llamada al método IWorkBackup.RestoringBackup (consulte la figura 10 ).

Figura 10 IWorkBackup.RestoringBackup

void IWorkBackup.RestoringBackup(Guid workerId, Guid workItemId) {
  lock (_workItemBackups) {
    //Only proceed if we have this item backed up.
    if (_workItemBackups.ContainsKey(workItemId)) {
      //Get the backup request for this work item
      WorkItemBackupRequest req = _workItemBackups[workItemId];

      //Check to see if we've decided to restore this.
      if (req.RestoreAt != DateTime.MaxValue) {
        //The restorer with the greatest id wins!
        if (this._id.CompareTo(workItemId) < 0)
          _workItemBackups.Remove(workItemId);
      }
      else {
        //We haven't decided to restore it and 
        //someone else wants to. So let them.
        _workItemBackups.Remove(workItemId);
      }
    }
  }
}

En este momento, existen tres situaciones como se ve desde el punto de vista de un trabajo que sólo ha recibido un mensaje RestoringBackup desde otro trabajo:

  1. El trabajo está restaurando un elemento no ya almacenado en caché.
  2. El trabajo está restaurando un elemento en la memoria caché pero aún no desea restaurar.
  3. El trabajo está restaurando un elemento que también me intending para restaurar.

Escenario uno es simple ya no es necesaria ninguna acción. Escenario dos es también sencilla que únicamente puede quitar el elemento copia de seguridad de nuestro caché. Aún no ha determinado que necesita restaurar, pero alguien tiene por lo que pueden tener.

Es escenario tres que requiere desempate. Cada equipo del mismo nivel está actuando autonomously y podría anunciar al mismo tiempo que va a restaurar un elemento. Los mensajes podría pasar entre sí en el cable y, por tanto, más de un trabajo pudo encontrar propio en escenario tres.

¿La cuestión mágica es: que gana? ¿Que restaura y que pasa por alto? La decisión debe realizarse en algo que no es relativa entre iguales (como la hora de un sistema) y no cambia. Decidió que una corbata va el más alto valiosos ID de trabajo. Una llamada impartial Guid.CompareTo determina el ganador. Teniendo esto en su lugar, el ganador realiza la restauración y omite el perdedor.

Del mismo nivel divertida

Admitir. . . Redes P2P es divertido. Pero esto no perder el hecho de que hay escenarios del mundo real que su naturaleza descentralizada hace un buen ajuste para las aplicaciones. Además, estas aplicaciones no están limitadas a charla o medios pirating! Las aplicaciones de habilitación del mismo nivel que requieren procesamiento masivo es un ajuste natural, si un conjunto de aplicaciones personalizadas o un escenario de informática de alto rendimiento (HPC). Facilita la coordinación de trabajo y el procesamiento a gran escala relativamente fácil.

El diseño de WCF facilita trivial para implementar un servicio y evolucionan, según sea necesario. Los datos compartidos y comunicación de una malla P2P permite a observar y participar en lo que sucede en los clientes diferentes. En este artículo se ofrece una demostración de creación de una plataforma de procesamiento habilitado el punto donde varios jugadores (trabajadores, los usuarios y Spectators) puedan todo funcionando conjuntamente para un propósito común: obtener su trabajo.

Matt Neely es ingeniero de diseño de software de Microsoft en el equipo de herramientas de aprendizaje del usuario de SQL Server. Emplea la mayoría de su tiempo convencer su amigo Caleb se le mencionado en un próximo artículo y shaving su perro (goldendoodle) por su propia con resultados astonishingly horrific. Puede ponerse en mneely@Microsoft.com.