June 2009

Volume 24 Number 06

Peer Fun - A Peer-To-Peer Work Processing App With WCF

By Matt Neely | June 2009

This article discusses:

  • Basic peer-to-peer communication
  • Creating the Worker peer
  • Load balancing and work sharing
  • Backing up and restoring work items
This article uses the following technologies:
WCF

Contents

Basic Peer Communication
The Worker Peer
Making the Worker Work
The User Peer
Spectators
Load Balancing and Work Sharing
WorkItem Backup
Peer Fun

The group I'm a part of at Microsoft lets us take a week off from our regular duties to research a topic. I recently elected to spend my week researching peer-to-peer (P2P) networking. That research ballooned into a permanent interest in this new networking paradigm.

One of the reasons P2P piqued my interest was due to an internal documentation build system my team owns. We have authors writing thousands of help documents that need to be parsed, transformed, patched, and compiled. The documents can be compiled into various output formats and in various languages every night. To accomplish this feat, we have a server farm running our build software.

The mechanism we currently use is your typical client-server architecture. Build jobs are queued on a central server and doled out as build servers become free to process them. We occasionally encounter a backlog of jobs on the central server or a misbehaving build server. Many team members have a vested interest in the status of the build system, but the system is opaque and difficult to monitor.

This article is the result of envisioning a P2P version of this build system. I will discuss the emerging design of a generic work processing and peer-enabled application based on the Windows Communication Foundation (WCF) PeerChannel. I'll lead you through the construction of this system while discussing some of the design decisions that must be made in the process.

Basic Peer Communication

For this project, I'll start by defining my main work-processing peers. I'll give them the generic name of Workers. The plan will be to have one Worker per physical machine (although there are no technical limitations requiring this).

Each Worker will need to communicate and coordinate with others to accomplish work. In order to do this, it needs to have some knowledge about other Workers. So how will my Worker get this information?

One approach is to have each Worker poll every other Worker for its state. This mechanism is complex as it requires two phases: Worker discovery and Worker querying. In addition, if each Worker were to ping every other Worker, you'd have many unneeded messages traveling your network. And because the WCF PeerChannel is inherently multicast, pinging a single Worker really sends a message to every Worker. For n Worker peers, I'd have on the order of n³ messages flying around. I rejected this approach in favor of a broadcast model.

In a broadcast model, every Worker will periodically send out its status. I call this a heartbeat message. Other Workers can decide what to do with this information, even if they just ignore it. I prefer this approach because as the Worker mesh topology changes, you can infer nodes coming online and offline by the presence or absence of heartbeat messages. Worker peer discovery is passive instead of active. This model also minimizes the network traffic needed just for system upkeep. With n Worker peers I'd have on the order of n² messages in this model, which is an improvement over the first approach.
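In code, this passive discovery amounts to remembering when each Worker was last heard from and treating a stale entry as Unknown. Here's a minimal sketch of that idea; the five-second threshold and the HeartbeatTracker name are my own assumptions, not part of the article's code:

```csharp
using System;
using System.Collections.Generic;

// Sketch: infer a Worker's availability from heartbeat recency.
// The 5-second threshold is an arbitrary assumption for illustration.
public class HeartbeatTracker
{
    private readonly Dictionary<Guid, DateTime> _lastSeen =
        new Dictionary<Guid, DateTime>();
    private readonly TimeSpan _timeout = TimeSpan.FromSeconds(5);

    // Call this whenever a heartbeat arrives.
    public void RecordHeartbeat(Guid workerId, DateTime receivedAt)
    {
        _lastSeen[workerId] = receivedAt;
    }

    // Returns true if the Worker's heartbeat is stale, meaning its
    // state should be treated as Unknown.
    public bool IsStale(Guid workerId, DateTime now)
    {
        DateTime seen;
        if (!_lastSeen.TryGetValue(workerId, out seen))
            return true; // never heard from this Worker
        return (now - seen) > _timeout;
    }
}
```

A peer would call RecordHeartbeat from its Heartbeat handler and sweep IsStale on a timer.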

To create a service in WCF, you usually start with defining your service contract. This is the interface that defines the functionality a service exposes to clients. In a P2P scenario, each peer acts as both server and client, and the service contract created here will define how peers communicate with each other. Given the decision to broadcast Worker status, I can define my service contract:

[ServiceContract]
public interface IWorker
{
    [OperationContract(IsOneWay = true)]
    void Heartbeat(WorkerStatus status);
}

My IWorker service contract has a single, one-way operation called Heartbeat. When a Worker calls this method, it will send out the pre-populated WorkerStatus object instance to every other Worker peer participating in my mesh.

Here's the simple definition of the WorkerStatus class:

[DataContract]
public class WorkerStatus
{
    [DataMember]
    public Guid WorkerId;

    [DataMember]
    public WorkerState State;
}

It will contain a unique identifier for the Worker and a simple state enumeration value. In a more functional system, this could be an elaborate set of data incorporating extra descriptors, performance counters, Worker statistics (such as percent of failed work items), and more. As defined, though, this will minimally allow each Worker to know the status of every other Worker in the mesh.

The states I've defined are Unknown, Ready, Busy, and Offline. A state of Unknown means that I'm not sure of the Worker's state (perhaps due to the absence of a heartbeat from that Worker within a specified period of time). A Ready state means the Worker is functioning correctly and is ready to take on some work. A Busy state means the Worker is functioning correctly, but busy doing work. An Offline state means that the Worker is not available to do work. This could potentially be the state sent out from a Worker's final heartbeat or perhaps a Worker that is taken offline by an administrator for maintenance or debugging.
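The enumeration declaration itself isn't shown in the article, so here is one plausible sketch of it; the member order and attributes are my assumptions (the EnumMember attribute makes each value serializable by WCF's data contract serializer):

```csharp
using System.Runtime.Serialization;

// Sketch of the WorkerState enumeration described above; the exact
// declaration is an assumption, as it isn't shown in the article.
[DataContract]
public enum WorkerState
{
    [EnumMember] Unknown,
    [EnumMember] Ready,
    [EnumMember] Busy,
    [EnumMember] Offline
}
```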

The Worker Peer

To make a functional Worker peer application, I need to expose and consume my service. So I created a simple console application and gave it a class named Worker as seen in Figure 1. In my Worker class, I added Start and Stop methods to handle the setup and teardown of the exposed service (which receives messages) as well as the client channel (for sending messages). I also use a System.Threading.Timer instance to periodically call the method SendHeartbeat. SendHeartbeat instantiates a WorkerStatus object, populates it with the appropriate data, and then sends it to all peers. As a service, when the IWorker.Heartbeat method gets called, it will simply output the message content to standard output.

Figure 1 The Worker Class

namespace WorkerPeerApplication
{
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    public class Worker : IWorker
    {
        private ServiceHost _host;
        private ChannelFactory<IWorker> _factory;
        private IWorker _workerClientChannel;
        private Guid _id;
        private volatile WorkerState _state;
        private Timer _heartbeat;

        public Worker()
        {
            _id = Guid.NewGuid();
            _state = WorkerState.Offline;
        }

        public void Start()
        {
            _host = new ServiceHost(this);
            _host.Open();

            _factory = new ChannelFactory<IWorker>("IWorkerClient");
            _workerClientChannel = _factory.CreateChannel();

            _state = WorkerState.Ready;
            _heartbeat = new Timer(this.SendHeartbeat, null,
                new TimeSpan(0), // Time should really be read in from config.
                new TimeSpan(0, 0, 1));
        }

        public void Stop()
        {
            _state = WorkerState.Offline;
            _heartbeat.Dispose();
            this.SendHeartbeat(null);
            _factory.Close();
            _host.Close();
        }

        private void SendHeartbeat(object context)
        {
            WorkerStatus ws = new WorkerStatus();
            ws.WorkerId = _id;
            ws.State = _state;
            _workerClientChannel.Heartbeat(ws);
        }

        void IWorker.Heartbeat(WorkerStatus status)
        {
            Console.WriteLine("{0}: {1}", status.WorkerId, status.State);
        }
    }
}

For completion, I need a Main method and the appropriate WCF configuration, which I put in the application configuration file. The Main method can be seen in the code download, and the configuration I use is shown in Figure 2.

Figure 2 Worker Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <services>
      <service name="WorkerPeerApplication.Worker">
        <host>
          <baseAddresses>
            <add baseAddress="net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>
        <endpoint address=""
                  binding="netPeerTcpBinding"
                  bindingConfiguration="unsecureBinding"
                  contract="PeerWcfComm.IWorker" />
      </service>
    </services>
    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None" />
          <resolver mode="Pnrp" />
        </binding>
      </netPeerTcpBinding>
    </bindings>
    <client>
      <endpoint name="IWorkerClient"
                address="net.p2p://MyPeerApplication"
                binding="netPeerTcpBinding"
                bindingConfiguration="unsecureBinding"
                contract="PeerWcfComm.IWorker" />
    </client>
  </system.serviceModel>
</configuration>

You can see that I define both the service the peer exposes and the peer's client-side configuration. The service configuration specifies the type of the service as well as its base address and endpoint. I'm using an unsecure binding and the Peer Naming Resolution Protocol (PNRP) for resolving peer names. The client configuration is almost a mirror of the service. The only noticeable difference is that I gave the client endpoint a name so that the ChannelFactory constructor can easily access the desired configuration.

Now that I have the shell of a Worker peer, let's test drive it. For an experiment, run two instances of this program at the same time. You'll see that at first each peer will output its own heartbeat information. After a few seconds, peers will begin to find each other and each Worker will receive status from the others.

Frankly, I consider it annoying and distracting for my own messages to be sent back to me. Thankfully, the PeerChannel team blog provides the code I need. There you'll find a very useful class called RemoteOnlyMessagePropagationFilter. Borrowing this, I can then add the following two lines of code to my Worker.Start method to ensure that the messages I send out don't also end up being sent back to me:

PeerNode pn = ((IClientChannel)_workerClientChannel).GetProperty<PeerNode>();
pn.MessagePropagationFilter = new RemoteOnlyMessagePropagationFilter();

I added these two lines just before starting the heartbeat timer.

Making the Worker Work

So far, I have an application that merely communicates among Worker peers. Workers don't actually do any work yet. I'll remedy this by augmenting the IWorker interface, adding the following method:

[OperationContract(IsOneWay = true)]
void DoWork(Guid workerId, WorkItem work);

This method allows an external entity to tell a Worker peer to do an item of work. Since the WCF messaging is multicast, I added a parameter to specify the ID of the Worker that is to perform the work. I'll discuss the WorkItem class in a moment. But for now, the interface change requires me to update the Worker class as seen in Figure 3. When someone tells a Worker to do some work, this request is added to an internal queue and eventually executed in the order received. To actually perform the work, I added a method called MonitorWork to the Worker class. The Start method will simply call ThreadPool.QueueUserWorkItem, pointing it to the MonitorWork method.

Figure 3 Revised Worker

public class Worker : IWorker
{
    // Omitted for clarity ...
    private Queue<WorkItem> _workQueue;

    public Worker()
    {
        // Omitted for clarity ...
        _workQueue = new Queue<WorkItem>();
    }

    public void Start()
    {
        // Omitted for clarity ...
        ThreadPool.QueueUserWorkItem(MonitorWork);
    }

    private void MonitorWork(object context)
    {
        if (_workQueue.Count > 0)
        {
            lock (_workQueue)
                _currentWorkItem = _workQueue.Dequeue();
            DoWorkInternal(_currentWorkItem);
            _currentWorkItem = null;
        }

        if (_state != WorkerState.Offline)
            ThreadPool.QueueUserWorkItem(MonitorWork);
    }

    private void DoWorkInternal(WorkItem wrk) { ... }

    void IWorker.DoWork(Guid workerId, WorkItem work)
    {
        if (workerId == _id)
            lock (_workQueue)
                _workQueue.Enqueue(work);
    }
}

At a high level, MonitorWork will do something and then reschedule itself via another call to ThreadPool.QueueUserWorkItem. It will do this until the state of the Worker is Offline, which happens when Stop is called on the Worker object. The something it does is to check whether the work queue has anything in it. If the queue contains work items, MonitorWork will dequeue the next WorkItem and begin processing it. The behavior I want is for my Worker to serially process its work, although it could easily be modified to process items in parallel.

As you see from the code in Figure 3, I have not addressed what work is yet. I've merely passed this to the yet-to-be-defined DoWorkInternal method. So let's first look at the WorkItem class definition. As the system that I'm building for this article is task-agnostic, I'm going to simply define my work item as follows:

[DataContract]
public class WorkItem
{
    [DataMember]
    public Guid WorkItemId { get; set; }

    [DataMember]
    public int PretendToWorkSeconds { get; set; }
}

I must again use a DataContract or MessageContract attribute so that WCF can correctly serialize my custom class. The WorkItem class has an identifier field, WorkItemId, and a field specifying the number of seconds to feign doing actual work (named PretendToWorkSeconds).

Now that a WorkItem is defined, I can process it. Here is my DoWorkInternal method implementation:

private void DoWorkInternal(WorkItem wrk)
{
    this._state = WorkerState.Busy;
    Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));
    this._state = WorkerState.Ready;
}

This is where I would normally do my meaningful work (for example, building the SQL Server 2008 setup documentation in Brazilian Portuguese). But for the sake of this article, I'm just going to sleep the specified time period.

Finally, I update my WorkerStatus class with two new data members that will be set before a Worker's heartbeat is sent. They are the identifier of the work item currently being executed and a count of the pending work items on that Worker peer:

[DataContract]
public class WorkerStatus
{
    // Omitted for clarity ...

    [DataMember]
    public Guid CurrentWorkItem;

    [DataMember]
    public int PendingWorkItemCount;
}

Now that my Worker peers have some data, I have to be acutely aware of scenarios that bring about data loss. One of the tradeoffs of a decentralized networking paradigm like P2P is that there is no central repository for data. Each peer is responsible for collectively preserving the system's data. So what happens if a Worker peer goes down? Specifically for my system, what happens to the queued WorkItems?

I group peer shutdown scenarios into two categories: expected and unexpected. For shutdowns of an expected nature, I need to deal with the queued WorkItems. The paradigm for handling these items can vary depending on the nature of the application and data. If the data isn't important, I might be able to simply discard all the queued WorkItems. If the application is designed to be paused (or to restart automatically like a Windows service), perhaps serializing the items to disk and reading them upon startup is the correct solution. Yet another option is to simply forward the surplus WorkItems to other Workers prior to shutdown. I'll discuss unexpected shutdown later in this article in the context of data backup.
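For the serialize-to-disk option, the DataContractSerializer can round-trip the queue with little ceremony, since WorkItem is already a data contract. Here is a sketch under my own naming assumptions (WorkQueuePersistence and its methods are illustrative, not from the article); the WorkItem class is repeated so the snippet stands alone:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;

// WorkItem as defined earlier in the article.
[DataContract]
public class WorkItem
{
    [DataMember] public Guid WorkItemId { get; set; }
    [DataMember] public int PretendToWorkSeconds { get; set; }
}

// Sketch: persist queued WorkItems on an expected shutdown and
// reload them at startup. The class and method names are my own.
public static class WorkQueuePersistence
{
    public static void Save(Queue<WorkItem> queue, string path)
    {
        var serializer = new DataContractSerializer(typeof(List<WorkItem>));
        using (FileStream fs = File.Create(path))
            serializer.WriteObject(fs, new List<WorkItem>(queue));
    }

    public static Queue<WorkItem> Load(string path)
    {
        if (!File.Exists(path))
            return new Queue<WorkItem>();
        var serializer = new DataContractSerializer(typeof(List<WorkItem>));
        using (FileStream fs = File.OpenRead(path))
            return new Queue<WorkItem>((List<WorkItem>)serializer.ReadObject(fs));
    }
}
```

A Worker's Stop method could call Save, and Start could re-enqueue whatever Load returns.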

The User Peer

Thus far, I have a peer-enabled application that can accept and process defined work items. But who creates these work items and sends them to the Worker peers? One of the powerful features of a P2P system is that anyone can hook into the same communication infrastructure. All that is needed to participate (for unsecure meshes) is knowledge of the mesh address and the service contract. I'll demonstrate this by adding a new peer type called a User peer.

The class for the User peer is similar to the Worker peer discussed earlier. The User peer will also implement the IWorker interface, but doesn't need to provide an implementation of the DoWork method. (Remember to think of ServiceContract interfaces as a communication contract and not as an identity declaration. A User is not a Worker, but it participates in the IWorker peer mesh and thus implements that interface.)

The User peer is interested in tracking Workers as well as creating new work items and giving them to Workers. Figure 4 shows a screenshot of the User application. This application lists out the information received from Worker status messages. However, the functionality I want to focus on is the creation of WorkItems and then assigning them to Workers.

Figure 4 The User Peer Application

A design decision needs to be made regarding how WorkItems are assigned to Workers. One method would be to rely on the multicast nature of PeerChannel and broadcast a message to all Workers saying that a new WorkItem was available. The first Worker to respond that it is ready and able to take it wins the right to have the new WorkItem.

I don't like this approach for several reasons. First, the communication protocol would be quite chatty. The User would send out a "Who Wants a Work Item" message. Amenable Workers would respond with an "I'll Take It" message. The User would then have to finish up with a "Do the Work" message to the winning Worker. Also (due to the asynchronous nature of multiple, interacting peers), careful attention must be given to all potential race conditions that would arise.

Second, I'd have to define what "ready and able" means for a Worker. If all Workers are busy, do they not respond? What does a User do if no response is received?

Instead of this work-advertisement method, I'm choosing to reuse the information already provided from Worker heartbeats. This information gives me sufficient knowledge to compute which Worker should receive a newly created WorkItem. Thus, the algorithm for choosing the winning Worker doesn't reside in the Worker's code and can be altered independently from the Worker. For simplicity, I'm going to choose the Worker to send a WorkItem based upon the WorkerState, CurrentWorkItem, and PendingWorkItemCount members of the WorkerStatus objects. Here's the code behind the Create Work button:

private void btnNewWork_Click(object sender, EventArgs e)
{
    WorkItem wrkItem = new WorkItem();
    wrkItem.WorkItemId = Guid.NewGuid();
    wrkItem.PretendToWorkSeconds = _rnd.Next(15);

    WorkerStatus nextWorker = GetNextWorker();
    if (nextWorker == null)
        return; // no Worker is currently available

    _clientChannel.DoWork(nextWorker.WorkerId, wrkItem);
    lock (_workers)
        _workers[nextWorker.WorkerId].PendingWorkItemCount++;
}

A new, uniquely identified WorkItem is generated with a randomly selected time between 0 and 15 seconds (_rnd is a class-level field of type Random). I decide the next worker to send the work to, then send the work. I also manually increment my cached version of the Worker's PendingWorkItemCount. This helps avoid situations where two or more Workers have the same stats. If they do, it is possible to generate work faster than Workers send out status heartbeats. So I'll adjust my cached Worker stats and not unfairly send lots of work items to the same Worker.
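To see why the local adjustment matters, consider two Workers whose cached counts are tied. Incrementing the cached count after each assignment makes successive picks alternate instead of piling onto one Worker. Here is a self-contained sketch of that effect; the names are mine, not the article's:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch: why the User bumps its cached PendingWorkItemCount.
// Without the increment, every assignment made between heartbeats
// would pick the same Worker; with it, assignments spread out.
public static class AssignmentDemo
{
    public static List<Guid> AssignWork(
        Dictionary<Guid, int> pendingCounts, int itemsToAssign)
    {
        var assignedTo = new List<Guid>();
        for (int i = 0; i < itemsToAssign; i++)
        {
            // Pick the Worker with the fewest pending items,
            // mirroring GetNextWorker's OrderBy.
            Guid next = pendingCounts.OrderBy(p => p.Value).First().Key;
            assignedTo.Add(next);
            pendingCounts[next]++; // the local adjustment
        }
        return assignedTo;
    }
}
```

Assigning four items against two tied Workers yields two assignments each, since each pick raises the chosen Worker's count above the other's.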

Selecting the next Worker to receive work is the job of the GetNextWorker method shown here:

private WorkerStatus GetNextWorker()
{
    List<WorkerStatus> workers = new List<WorkerStatus>();
    lock (_workers)
        workers.AddRange(_workers.Values);

    return workers
        .Where(w => w.State != WorkerState.Offline &&
                    w.State != WorkerState.Unknown)
        .OrderBy(w => w.PendingWorkItemCount)
        .FirstOrDefault();
}

Thanks to new extension methods in the Microsoft .NET Framework 3.5 libraries, eliminating Workers in the offline and unknown states and ordering by their pending work item counts is easy. A single (though wordy) statement can do both. I also return the first item in the enumeration. Using the FirstOrDefault method ensures the call succeeds even if no workers are available.

Now let's run another experiment. Start the User peer application and then start two separate Worker apps. After a few seconds, you should see the two Workers appear in the User application list. Now click the Create Work button several times. If everything works as expected, you can see that the User peer gave work items to both Workers. It should have done this in a fairly round-robin fashion. The Workers will continue processing their work items until finished.

Spectators

A requirement of many applications is the need for logging and tracing. To help further showcase the flexibility of a P2P-based system, I'm going to create a new peer type called Logger. My Logger peer will receive events pertinent to the system. The only events that could be currently tracked are those that are exposed by the IWorker interface: Worker status information and the assignment of WorkItems to a particular Worker. This level of detail is insufficient. There are many additional events that could be logged and tracked.

Although I could tack on new functionality to the IWorker interface, the messages sent for tracking are inconsequential to both Worker and User applications. Therefore, I'm going to create a new service contract called ISpectator. (I'm using the generic term Spectator instead of Logger or Reporter to emphasize two things. First, this peer type only receives communications and shouldn't generate any. Second, I am not dictating what a peer using this service does with the information received; it might only show the information in a UI.) Here's the service contract definition:

[ServiceContract]
public interface ISpectator
{
    [OperationContract(IsOneWay = true)]
    void WorkItemStateChanged(WorkItemStatus status);
}

As a demonstration, I'm declaring just a single method called WorkItemStateChanged. A full-featured system will likely have many events to notify spectators of. The WorkItemStateChanged method takes a WorkItemStatus class defined here:

[DataContract]
public class WorkItemStatus
{
    [DataMember]
    public Guid WorkerId;

    [DataMember]
    public Guid WorkItemId;

    [DataMember]
    public WorkItemState NewState;
}

This class allows the sender to specify the Worker sending the message, the WorkItem this message is for, and the new state of the WorkItem. Valid states are Queued, Executing, Finished, and Errored. As I'm sure you noticed, I don't have an equivalent WorkerStateChanged event defined. That's because Worker status is sent via the heartbeat message and I expect spectators to listen for these. With WCF, this is terribly easy to do. My service class will simply implement both IWorker and ISpectator interfaces.
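Like WorkerState, the WorkItemState enumeration isn't declared anywhere in the article; given the states listed, a plausible declaration is the following sketch (the attributes and member order are my assumptions):

```csharp
using System.Runtime.Serialization;

// Sketch of the WorkItemState enumeration; the exact declaration
// is an assumption, as it isn't shown in the article.
[DataContract]
public enum WorkItemState
{
    [EnumMember] Queued,
    [EnumMember] Executing,
    [EnumMember] Finished,
    [EnumMember] Errored
}
```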

The Logger service class can be seen in Figure 5. There isn't much structural difference between this class and the Worker. In the IWorker.Heartbeat method, the Logger has the typical caching of Worker status data. Additionally, if this is the first status seen for a Worker or if the state has changed for a Worker, then a message is written to standard output. Likewise, the ISpectator.WorkItemStateChanged method writes a message to standard output. A production system will likely do something much more interesting, like writing to a logging database for custom reports.

Figure 5 Logger

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class Logger : IWorker, ISpectator
{
    private ServiceHost _host;
    private Dictionary<Guid, WorkerStatus> _workers;

    public Logger()
    {
        _workers = new Dictionary<Guid, WorkerStatus>();
    }

    public void Start()
    {
        _host = new ServiceHost(this);
        _host.Open();
    }

    public void Stop()
    {
        _host.Close();
    }

    void IWorker.Heartbeat(WorkerStatus status)
    {
        bool changed = false;
        lock (_workers)
        {
            if (_workers.ContainsKey(status.WorkerId))
            {
                WorkerStatus s = _workers[status.WorkerId];
                if (s.State != status.State)
                    changed = true;
            }
            else
                changed = true;

            _workers[status.WorkerId] = status;
        }

        if (changed)
            LogWorkerStatusChange(status);
    }

    void IWorker.DoWork(Guid workerId, WorkItem work)
    {
        // Ignored. This can be assumed by the
        // WorkItemState.Queued status event.
    }

    void ISpectator.WorkItemStateChanged(WorkItemStatus status)
    {
        Console.WriteLine("Worker: {0}| WorkItem: {1}| State: {2}",
            status.WorkerId, status.WorkItemId, status.NewState);
    }

    private void LogWorkerStatusChange(WorkerStatus status)
    {
        Console.WriteLine("Worker {0}| State: {1}",
            status.WorkerId, status.State);
    }
}

The last action item in implementing the Logger is to set up configuration. Figure 6 shows the configuration for this WCF service. Noticeable differences are the lack of a client section and the addition of a second service endpoint. Since a spectator is logically only a consumer of information, there is no reason to act like a client. Adding an endpoint to the service allows us to use the same service object instance for IWorker and ISpectator message sinks.

Figure 6 Configuration for Logger

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <services>
      <service name="LoggerPeerApplication.Logger">
        <host>
          <baseAddresses>
            <add baseAddress="net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>
        <endpoint address=""
                  binding="netPeerTcpBinding"
                  bindingConfiguration="unsecureBinding"
                  contract="PeerWcfComm.IWorker" />
        <endpoint address=""
                  binding="netPeerTcpBinding"
                  bindingConfiguration="unsecureBinding"
                  contract="PeerWcfComm.ISpectator" />
      </service>
    </services>
    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None" />
          <resolver mode="Pnrp" />
        </binding>
      </netPeerTcpBinding>
    </bindings>
  </system.serviceModel>
</configuration>

To complete the ISpectator functionality, I need to alter the Worker peer to make the appropriate calls. This includes a new client endpoint to the app.config:

<endpoint name="ISpectatorClient"
          address="net.p2p://MyPeerApplication"
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.ISpectator" />

This endpoint has a new name (so that my ChannelFactory constructor can find the correct settings) and the type of the new ISpectator service contract. For the code changes, I simply need to create a new ChannelFactory<ISpectator> and a new ISpectator client channel. I then changed the DoWorkInternal method on the Worker class to the slightly less aesthetically pleasing version shown in Figure 7. Essentially, before and after work item processing (sleeping the current thread) I'm going to call the WorkItemStateChanged method on the ISpectator channel, giving the appropriate state.

Figure 7 Work Status Tracking

this._state = WorkerState.Busy;
_spectatorClientChannel.WorkItemStateChanged(new WorkItemStatus()
{
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Executing
});

Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));

this._state = WorkerState.Ready;
_spectatorClientChannel.WorkItemStateChanged(new WorkItemStatus()
{
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Finished
});

Load Balancing and Work Sharing

It's time to try another experiment. Again, create two Workers and a single User. Have the User generate a lot of work items for the Workers. Now, while the two Workers are working away, launch a third Worker instance. From this, you'll see an undesired behavior of my system: one Worker is idle while others have much work to do.

In a P2P application, new peers can come online at any time to participate in the mesh. In a server farm scenario, I want new peers to automatically join in and immediately start processing the collective workload.

The first change to make is in the Worker's IWorker.Heartbeat method implementation. Currently, the code simply outputs a Worker's state to the console. This needs to be replaced with code that will cache fellow Workers' states. I did this in the User and Spectator peers and can mostly cut and paste that code into the Worker class.

Now that the Worker peer can keep track of other Workers, I can add code to check for conditions where a Worker should share (forward) work items to idle Workers. I've altered the end of the Worker's MonitorWork method as shown in Figure 8.

Figure 8 Updated MonitorWork

if (_state != WorkerState.Offline)
{
    if (_workQueue.Count > 1)
    {
        WorkerStatus firstIdleWorker = GetNextIdleWorker();
        if (firstIdleWorker != null)
        {
            WorkItem workToForward;
            lock (_workQueue)
                workToForward = _workQueue.Dequeue();
            _workerClientChannel.DoWork(firstIdleWorker.WorkerId, workToForward);
        }
    }

    ThreadPool.QueueUserWorkItem(MonitorWork);
}

Basically, after processing a work item and before rescheduling itself, the method checks for an idle Worker node. (I added the condition to do this only if I have more than one item in my own work queue. This is because it makes no sense to forward my only remaining work item to another Worker.) If an idle Worker is found, it dequeues a WorkItem and sends it to that idle Worker. The GetNextIdleWorker method contains the selection logic:

private WorkerStatus GetNextIdleWorker()
{
    List<WorkerStatus> workers = new List<WorkerStatus>();
    lock (_workers)
        workers.AddRange(_workers.Values);

    return workers
        .Where(w => w.State == WorkerState.Ready &&
                    w.PendingWorkItemCount == 0)
        .FirstOrDefault();
}

The first thing I do is lock the _workers data cache and get a snapshot of the WorkerStatus values in it. I then look for Workers in the Ready state with no work pending, returning the first one found.

WorkItem Backup

If a WorkItem is moved to a machine and queued, what becomes of it if that Worker peer goes down accidentally? A process crash doesn't permit you to save data upon error. A good mitigation is to back up copies of WorkItems to adjacent peers. A complete backup solution consists of two actions: backup and restore. This isn't a trivial task.

The simplest backup solution would be to reuse the DoWork method. The first parameter specifies the ID of the Worker that is supposed to do the work. However, that message actually goes out to the entire mesh. I could take advantage of this fact and have all peers that do not match the given worker ID save the WorkItem as a backup. The actual targeted Worker of that message would eventually tell the others to delete that backup.

The tradeoff here is that every Worker is using up resources caching backups of work items. If there aren't many work items and Workers can process them pretty quickly, this might be a viable solution. However, if there are many work items in the system, it might get prohibitively expensive.

A better solution would minimize the total number of backups, but at the expense of some extra mesh messages.

Taking a page from Kevin Hoffman's article "How to Design State Sharing in a Peer Network," I can back up WorkItems to neighboring peers by sending a message limited to a single hop count. The backup request and subsequent communications can be as chatty as desired. Since these backup messages are meaningless to Users or Spectators, I won't use the IWorker service contract. I've created a new one called IWorkBackup:

[ServiceContract]
public interface IWorkBackup
{
    [OperationContract(IsOneWay = true)]
    void BackupItem(WorkItemBackupRequest request);

    [OperationContract(IsOneWay = true)]
    void DeleteBackup(Guid workItemId);

    [OperationContract(IsOneWay = true)]
    void RestoringBackup(Guid workerId, Guid workItemId);
}

The first method is called by a Worker wishing to back up a WorkItem. The second is called when a WorkItem has been completed and other Workers can delete their copies of it. The last is called when a Worker determines that a backed-up WorkItem should be restored and it intends to do so.

The message sent by BackupItem is defined by the WorkItemBackupRequest class:

[MessageContract]
public class WorkItemBackupRequest
{
    [PeerHopCount]
    public int HopCount = 1;

    [MessageBodyMember]
    public WorkItem Item;

    [MessageBodyMember]
    public Guid OriginalWorkerId;

    public DateTime BackupRequestTimeStamp;
    public DateTime RestoreAt;
}

The HopCount member has the PeerHopCount attribute, which limits the message traversal through the mesh to the specified number of hops. I've hardcoded it to the value 1 so that this message will be sent only to adjacent (one hop) peers. There are also two members marked with the MessageBodyMember attribute: Item (the work item to be backed up) and OriginalWorkerId (the worker requesting the backup). These will be sent in the underlying SOAP message. I've also added two additional members for internal tracking.

When a Worker receives the BackupItem message, it needs to cache that item. This is done easily enough by adding a new private field to store the backups:

private Dictionary<Guid, WorkItemBackupRequest> _workItemBackups;

Here is the implementation of the BackupItem method for a Worker peer:

void IWorkBackup.BackupItem(WorkItemBackupRequest request)
{
    request.BackupRequestTimeStamp = DateTime.Now;
    request.RestoreAt = DateTime.MaxValue;
    lock (_workItemBackups)
        _workItemBackups.Add(request.Item.WorkItemId, request);
}

This method sets the BackupRequestTimeStamp to the current time, and RestoreAt is defaulted to DateTime.MaxValue. These fields prove to be useful later.

Once work has been completed on the local Worker peer, it needs to send out a message to all peers that have potentially cached a backup of the work item. It does this by calling the DeleteBackup method and providing the ID of the WorkItem that has been completed.

Notice that I don't limit this message to adjacent peers like I did with the BackupItem message. The reason is that the peer mesh topology can change. In between the time that a backup was requested and the WorkItem is completed, Worker peers that were once adjacent may not be so now. Thus, I send out a general message to all Workers to delete their backup of this item, if they have one. For completeness, here's the code in the Worker peer that implements DeleteBackup:

void IWorkBackup.DeleteBackup(Guid workItemId)
{
    lock (_workItemBackups)
        if (_workItemBackups.ContainsKey(workItemId))
            _workItemBackups.Remove(workItemId);
}
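To see the whole backup lifecycle end to end, here is a small self-contained sketch that simulates it in memory. The PeerBackupCache type and the explicit list of peers are my own stand-ins for real Worker peers; in the actual application these calls travel over the PeerChannel proxy, and DeleteBackup is a single mesh-wide broadcast rather than a loop.

```csharp
using System;
using System.Collections.Generic;

// Illustrative stand-in for a Worker's backup cache (my own type, not from
// the article). An explicit list of peers simulates the mesh.
class PeerBackupCache
{
    private readonly Dictionary<Guid, string> _backups =
        new Dictionary<Guid, string>();

    public void BackupItem(Guid id, string payload)
    {
        lock (_backups) _backups[id] = payload;
    }

    public void DeleteBackup(Guid id)
    {
        lock (_backups) _backups.Remove(id);
    }

    public bool HasBackup(Guid id)
    {
        lock (_backups) return _backups.ContainsKey(id);
    }
}

class BackupLifecycleDemo
{
    static void Main()
    {
        // Two peers adjacent to the original Worker.
        List<PeerBackupCache> adjacentPeers = new List<PeerBackupCache>
        {
            new PeerBackupCache(),
            new PeerBackupCache()
        };
        Guid itemId = Guid.NewGuid();

        // One-hop backup request: only adjacent peers cache the item.
        foreach (PeerBackupCache p in adjacentPeers)
            p.BackupItem(itemId, "build job payload");

        // The work completes, so DeleteBackup floods to everyone.
        foreach (PeerBackupCache p in adjacentPeers)
            p.DeleteBackup(itemId);

        foreach (PeerBackupCache p in adjacentPeers)
            Console.WriteLine(p.HasBackup(itemId)); // prints False twice
    }
}
```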

Backing up and removing work items is a breeze! The true difficulty is restoration: I need to determine both when a backed-up work item should be restored and who will restore it.

To determine when a work item should be restored, each Worker must periodically monitor its backup cache. For this, I've added another System.Threading.Timer called _backupMonitor, which gets set in the Start method for the Worker:

_backupMonitor = new Timer(
    this.MonitorBackups, null,
    new TimeSpan(0), new TimeSpan(0, 1, 0));

This can execute at a much slower period than the heartbeat, so I set the timer to fire every minute.

The method MonitorBackups handles the logic for restoring backups and can be seen in Figure 9. This method iterates through the backed-up items and checks each one for the need to be restored. In order to do this, some assumptions need to be made.

First, I have a local variable named timeOutThreshold. This is the amount of time from when I was told to back up an item to when I should expect it to be processed. I hardcoded this to one minute (though it should ideally be configurable) because I know that my work items can last no longer than 15 seconds (a limit I defined in my User application). Thus, except in peak usage, this should be plenty of time. (A better solution might be for the backup request originator to determine this timeout and specify it in the request message; it has knowledge of its own workload and could give a much better estimate.)

If the current time is greater than the WorkItemBackupRequest.BackupRequestTimeStamp plus the timeOutThreshold, I'm going to investigate further. I won't automatically consider this work item to have failed because the original worker might simply be very busy. So once the time has expired, I move on to checking the status of the original worker. If it is either Busy or Ready with items in its work queue, I'll assume it is merely busy and will still get to this work item.

Figure 9 MonitorBackups

private void MonitorBackups(object context)
{
    List<WorkItemBackupRequest> backups = new List<WorkItemBackupRequest>();
    TimeSpan timeOutThreshold = new TimeSpan(0, 1, 0); // This should be read in from config.

    lock (_workItemBackups)
    {
        backups.AddRange(_workItemBackups.Values);
        foreach (WorkItemBackupRequest backup in backups)
        {
            // If we've passed the RestoreAt time, we need to
            // take on this work item.
            if (backup.RestoreAt < DateTime.Now)
            {
                lock (_workQueue)
                    _workQueue.Enqueue(backup.Item);
                _workItemBackups.Remove(backup.Item.WorkItemId);
            }
            // This work item has not been deleted within the expected
            // window. This is suspicious, so investigate further.
            else if (backup.BackupRequestTimeStamp.Add(timeOutThreshold) < DateTime.Now)
            {
                // Check to see if the original worker is still operating.
                // Either way, nothing moves yet and the backup
                // remains in our cache.
                if (!WorkerIsStillActive(backup.OriginalWorkerId))
                {
                    // The machine doesn't seem to be actively working.
                    // Set a deadline for restoration of the work item.
                    backup.RestoreAt = DateTime.Now.Add(timeOutThreshold);

                    // Tell the world our intentions to restore this.
                    _backupClientChannel.RestoringBackup(this._id, backup.Item.WorkItemId);
                    Console.WriteLine("Planning to restore '{0}'", backup.Item.WorkItemId);
                }
            }
        }
    }
}

private bool WorkerIsStillActive(Guid workerId)
{
    bool active = false;
    lock (_workers)
    {
        if (_workers.ContainsKey(workerId))
        {
            WorkerStatus sts = _workers[workerId];
            if (sts.State == WorkerState.Busy)
                active = true;
            else if (sts.State == WorkerState.Ready && sts.PendingWorkItemCount > 0)
                active = true;
        }
    }
    return active;
}

If I get to this point and see that both my assumed timeout has expired and that the original worker is unresponsive, I'll start the process of restoring that work item. This is done by calling the IWorkBackup.RestoringBackup method to announce my intention to all of the other peers. I also set the RestoreAt field of the WorkItemBackupRequest object to a future time. Although I am reusing the same TimeSpan value, this future time is logically different (and should ideally have a unique configuration point). RestoreAt is the time at which I will assume that all other peers have received my message and have had time to respond, if desired. More on this logic later. But for now, know that I set RestoreAt to the time when I will actually move the WorkItem from my backup cache to my work queue. The next time the MonitorBackups timer fires it will check this value and act appropriately.
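The two-phase timing can be seen in miniature with fixed clock values. This is a toy sketch of my own: the dates are arbitrary, and the threshold mirrors the hardcoded one minute from MonitorBackups.

```csharp
using System;

// Toy timeline for the two-phase restore. Phase one stamps RestoreAt when the
// original worker appears dead; phase two, on a later MonitorBackups tick,
// restores once RestoreAt has passed. Clock values are fixed for illustration.
class RestoreTimelineDemo
{
    static void Main()
    {
        TimeSpan timeOutThreshold = new TimeSpan(0, 1, 0);
        DateTime firstTick = new DateTime(2009, 6, 1, 12, 0, 0);

        // Phase one: announce the intent and set the deadline.
        DateTime restoreAt = firstTick.Add(timeOutThreshold);
        Console.WriteLine(restoreAt < firstTick);  // False: don't restore yet

        // Phase two: a later timer tick finds the deadline has passed.
        DateTime laterTick = firstTick.AddMinutes(1.5);
        Console.WriteLine(restoreAt < laterTick);  // True: move to work queue
    }
}
```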

This logic determines when a restoration occurs, which is the easier question. The trickier one is who should restore a work item. PeerChannel tries to maintain an optimal mesh, and the topology can change as peers join and leave. As such, a single peer can have from two to seven adjacent nodes at any time. If a one-hop backup request is sent to all of them, I'd have several Workers vying to restore the same work, since they all share the same logic for determining when a WorkItem should be restored. So what happens when they all simultaneously try to restore? In order to restore, the peers must first announce their intention by calling the IWorkBackup.RestoringBackup method (see Figure 10).

Figure 10 IWorkBackup.RestoringBackup

void IWorkBackup.RestoringBackup(Guid workerId, Guid workItemId)
{
    lock (_workItemBackups)
    {
        // Only proceed if we have this item backed up.
        if (_workItemBackups.ContainsKey(workItemId))
        {
            // Get the backup request for this work item.
            WorkItemBackupRequest req = _workItemBackups[workItemId];

            // Check to see if we've decided to restore this.
            if (req.RestoreAt != DateTime.MaxValue)
            {
                // The restorer with the greatest id wins!
                if (this._id.CompareTo(workerId) < 0)
                    _workItemBackups.Remove(workItemId);
            }
            else
            {
                // We haven't decided to restore it and
                // someone else wants to. So let them.
                _workItemBackups.Remove(workItemId);
            }
        }
    }
}

At this point, there are three scenarios as seen from the viewpoint of a Worker that just received a RestoringBackup message from another Worker:

  1. The Worker is restoring an item I don't have cached.
  2. The Worker is restoring an item I have cached but don't yet intend to restore.
  3. The Worker is restoring an item I am also intending to restore.

Scenario one is simple, as no action is needed. Scenario two is also simple in that I can merely remove the backed-up item from my cache. I haven't yet determined that it needs restoring, but someone else has, so they can have it.

It is scenario three that requires tie-breaking. Each peer is acting autonomously and could simultaneously announce that they intend to restore an item. The messages could pass each other on the wire and thus more than one Worker could find itself in scenario three.

The magic question is: Who wins? Who restores and who ignores? The decision must be based on something that isn't relative between peers (as system clocks are) and doesn't change. I decided that a tie goes to the highest-valued Worker ID. An impartial call to Guid.CompareTo determines the winner. With this in place, the winner does the restore and the loser ignores.
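To make the determinism concrete, here is a tiny self-contained sketch. The IWinRestore helper and the fixed Guids are mine, for illustration only; the point is that both contenders evaluate the identical comparison and reach opposite conclusions, so exactly one peer restores regardless of message ordering.

```csharp
using System;

// Sketch of the tie-break rule: both contenders run the same comparison,
// so exactly one wins. Guids are fixed purely for illustration.
class RestoreTieBreakDemo
{
    static bool IWinRestore(Guid myId, Guid rivalWorkerId)
    {
        // Highest Worker ID wins; Guid.CompareTo is a total order
        // that every peer evaluates identically.
        return myId.CompareTo(rivalWorkerId) > 0;
    }

    static void Main()
    {
        Guid workerA = new Guid("00000001-0000-0000-0000-000000000000");
        Guid workerB = new Guid("00000002-0000-0000-0000-000000000000");

        Console.WriteLine(IWinRestore(workerA, workerB)); // False: A backs off
        Console.WriteLine(IWinRestore(workerB, workerA)); // True:  B restores
    }
}
```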

Peer Fun

I admit it . . . P2P networking is fun. But this doesn't negate the fact that there are real-world scenarios where its decentralized nature makes it a good fit for applications. Furthermore, these applications aren't limited to chat or media pirating! Peer-enabling an application that requires bulk processing is a natural fit, whether in a custom application farm or a High-Performance Computing (HPC) scenario. It makes work coordination and processing on a large scale relatively easy.

The design of WCF makes it easy to implement a service and evolve it as needed. The shared data and communication of a P2P mesh allow different clients to observe and participate in what's going on. This article demonstrated how to create a peer-enabled processing platform where multiple players (Workers, Users, and Spectators) can all function together for a common purpose: getting your work done.

Matt Neely is a Software Design Engineer at Microsoft on the SQL Server User Education Tools team. He spends the majority of his time convincing his friend Caleb he'll be mentioned in an upcoming article and shaving his dog (a goldendoodle) by himself with astonishingly horrific results. He can be reached at mneely@microsoft.com.