TPL
针对任务并行执行的横向扩展能力
Microsoft .NET Framework 4 中引入的任务并行库 (TPL) 能够让应用程序开发者创建解决方案,以利用多核计算机中的并行处理能力。 但是在许多情况下,垂直扩展的能力(添加多个核心)受到多种因素的约束,包括成本和主机托管限制。 在这种情况下,如果需要扩展能力,则可以将数据处理在服务器阵列中分配;云托管就是这样的例子。 在本文中,我将描述一套概念性解决方案的主要方面(包括实施),通过使用 .NET Framework 4.5 的多项新功能完成上述情景。
基本前提条件
我将要描述的方法要求使用除 TPL 以外的多种技术,包括:
- 任务并行库 (TPL)
- Windows Communication Foundation (WCF)
- 托管可扩展性框架 (MEF)
请注意,我仅在我要试图解决的问题中讨论这些方面。 我假设你非常了解这些技术。
远程任务客户端、任务协调器及任务执行节点
远程任务客户端是客户端侧层,将隐藏由于使用分布式环境的语义而产生的复杂性。 远程任务客户端直接与任务协调器交互,然后协调器成为底层基础结构的入口点。 总体而言,任务协调器具有以下属性:
- 任务协调器是与客户端进行通讯的唯一联络点。
- 协调器公开必要的服务,以请求在可扩展的平台上执行任务,以及取消特定任务。
- 协调器处理任务执行请求的限制和队列,支持环境的健康操作。
任务执行节点是任务运行流程的主机。 由 TPL 执行的任务的实际实施驻留在任务执行节点。
以下是这些逻辑层和主要方面及信息流:
- 远程任务客户端请求执行一项或多项任务。
- 任务协调器向任务执行节点提交请求。
- 任务执行节点执行任务并更新任务协调器中每项请求的状态。
- 任务协调器使用每项请求的执行结果,更新客户端。
- 任务执行节点驻留在负载平衡器的后面,因此可以按需要添加更多节点,从而能够实现水平扩展。
图 1 显示了逻辑层和信息流。
图 1 水平扩展任务
注意任务执行节点如何更新任务协调器,然后任务协调器更新远程任务客户端。 我将要描述一项基于客户端与任务协调器之间双向通讯的实施,以及基于任务协调器与任务执行节点之间双向通讯的实施。 根据 WCF 定义,这表示使用双工通道,使任务执行节点回调任务协调器,然后任务协调器执行相同操作以更新客户端。 我将展示使用 WebSockets 实现这种双向通讯方法。 WebSockets 传输是作为 .NET Framework 4.5 的一个新绑定来实施的,可在 Windows 8 上使用。 有关此绑定的详情,请访问 bit.ly/SOLNiU。
客户端与任务协调器
既然已经理解了三大逻辑层——远程任务客户端、任务协调器及任务执行节点,现在让我们开始讨论远程任务客户端的实施。 注意当我在本文中使用“客户端”一词,我是指远程任务客户端。
如上所述,客户端的价值主张就是隐藏底层部件复杂性的能力。 一方面,客户端通过提供 API 来实现这一点,给人一种在本地执行任务的印象,尽管事实上可能在其他地方执行。 图 2 中的代码显示了 RemoteTaskClient 类的公共方法。
图 2 RemoteTaskClient 类的公共方法
public class RemoteTaskClient<TResult> : IDisposable
{
public void AddRequest(string typeName,
string[] parameters, CancellationToken tk)
{...}
public void AddRequest(string typeName, string[] parameters)
{...}
public Task<TResult>[] SubmitRequests()
{...}
public RemoteTaskClient(string taskCoodinatorEndpointAddress)
{...}
public void Dispose()
{...}
}
您可以使用 AddRequest 方法添加远程执行的请求。 对于每项请求,您需要指定 typeName(这是实际实施的类型,包含一个委托,基础结构将此委托当作 TPL 任务进行远程运行)及相关参数。 然后您可以通过 SubmitRequest 方法提交请求。 提交请求的结果是 TPL 任务阵列,每个请求一个任务。 这种方法允许您将所产生的 TPL 任务当作本地任务来管理。 例如,您可以提交多个请求,然后等待请求完成,例如:
using (var c = new RemoteTaskClient<int>("..."))
{
c.AddRequest("...", null);
c.AddRequest("...", null);
var ts = c.SubmitRequests();
Task.WaitAll(ts);
foreach (var t in ts)
Console.WriteLine(t.Result);
}
在详细讨论 RemoteTaskClient 的实施之前,让我们看看任务协调器公开的服务操作和数据约定。 在回顾 RemoteTaskClient 的实施之前理解这些约定将为您提供额外的上下文知识,因为客户端的实施依赖于这些服务。
图 3 中的代码显示了任务协调器向客户端公开的服务操作。 通过 SubmitRequest 操作,客户端能够请求执行一项或多项 TPL 任务。 客户端也可以通过 CancelTask 操作,请求取消某项尚未完成的 TPL 任务。 注:UpdateStatus 操作是一项回调。 通过在客户端侧实施这种回调约定,任务协调器将更新客户端的状态。
图 3 服务操作
[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskCoordinator
{
[OperationContract(IsOneWay = true)]
void SubmitRequest(List<STask> stask);
[OperationContract]
bool CancelTask(string Id);
}
public interface ITaskUpdateCallback
{
[OperationContract (IsOneWay = true)]
void UpdateStatus(string id, STaskStatus status, string result);
}
让我们看看代表了任务执行请求的数据约定。 这是客户端发送给任务协调器的数据实体,然后任务协调器将向任务执行节点提交请求,而请求就在任务执行节点执行。 图 4 中显示的 STask 类模拟了一项任务执行请求。 客户端使用 STaskTypeName 和 STaskParameters 属性,可以设置想要执行的任务类型及相关参数。 任务协调器将使用属性 ID 作为唯一标识符,逻辑层可以使用此 ID 将请求与系统中运行的实际 TPL 任务进行关联。
图 4 STask 类
[DataContract]
public class STask
{
[DataMember]
public string Id
{ get; set; }
[DataMember]
public string STaskTypeName
{ get; set; }
[DataMember]
public string[] STaskParameters
{ get; set; }
}
现在让我们回到 RemoteTaskClient,讨论我如何计划将本地 TPL 任务与任务执行节点中的执行结果进行关联。 TPL 有一个很方便的类,TaskCompletionSource<TResult>,我可以用它来创建一个 TPL 任务并控制其生命周期。 此机制让我标记特定任务何时完成、取消或出错。 其中的意义在于每项进入任务执行节点(通过任务协调器)的请求必须与一个 TaskCompletionSource 实例相关联。 为此,我实施了 ClientRequestInfo 类,如图 5 所示。
图 5 ClientRequestInfo 类
internal class ClientRequestInfo<TResult>
{
internal STask TaskExecutionRequest
{ get; set; }
internal TaskCompletionSource<TResult> CompletionSource
{ get; set; }
internal ClientRequestInfo(string typeName, string[] args)
{
TaskExecutionRequest = new STask()
{Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
STaskParameters = args };
CompletionSource = new TaskCompletionSource<TResult>();
}
}
图 6 显示了此类构造函数的实施。
图 6 ClientRequestInfo 构造函数
ITaskCoordinator _client;
ConcurrentDictionary<string, ClientRequestInfo<TResult>>
_requests = new ConcurrentDictionary<string,
ClientRequestInfo<TResult>>();
public RemoteTaskClient(string taskCoordinatorEndpointAddress)
{
var factory = new DuplexChannelFactory<ITaskCoordinator>
(new InstanceContext(new CallbackHandler<TResult>(_requests)),
new NetHttpBinding(),
new EndpointAddress(taskCoordinatorEndpointAddress));
_client = factory.CreateChannel();
((IClientChannel)_client).Open();
}
注意:我正在打开通向任务协调器的双工通道,并创建 CallbackHandler 类型的回调实例。 CallbackHandler 收到 parameter _requests,里面包含 ClientRequestInfo 实例。 基本原理是 _requests 字典装有客户端请求的所有活动实例(以及与其相关的 TaskCompletionSource 实例),而 CallbackHandler 将处理来自任务协调器的更新。 因为多个服务请求将更新 _requests 字典,所以我需要保证线程安全性,因此需要创建此实例作为 ConcurrentDictionary 的实例。
图 7 显示了 CallbackHandler 类的实施。
图 7 CallbackHandler 类
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
public void UpdateStatus(string id, STaskStatus status, Object result)
{
ClientRequestInfo<TResult> info;
if (_requests.TryRemove(id, out info))
{
switch (status)
{
case STaskStatus.
Completed: info.CompletionSource.SetResult(
(TResult)result);
break;
case STaskStatus.Canceled:
info.CompletionSource.SetCanceled();
break;
case STaskStatus.Faulted:
info.CompletionSource.SetException(
(Exception)result);
break;
}
}
}
internal CallbackHandler(ConcurrentDictionary<string,
ClientRequestInfo<TResult>> requests)
{
requests = requests;
}
}
接下来,让我们看看 AddRequest 和 SubmitRequest 方法的实施,如图 8 所示。
图 8 AddRequest 和 SubmitRequest 方法
public void AddRequest(string typeName, string[] parameters,
CancellationToken tk)
{
var info = new ClientRequestInfo<TResult>(typeName, args);
_buffer.Add(info);
tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
{
_buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
}
public Task<TResult>[] SubmitRequests()
{
if (_buffer.Count == 0)
return null;
var req = _buffer.Select((r) =>
{
_requests.TryAdd(r.TaskExecutionRequest.Id, r);
return r.TaskExecutionRequest;
});
_client.SubmitRequest(req.ToList<STask>());
var ret = _buffer.Select(r =>
r.CompletionSource.Task).ToArray<Task<TResult>>();
_buffer.Clear();
return ret;
}
跟踪客户端请求
如上节所示,客户端仅与任务协调器进行交互,因此任务协调器的责任是处理客户端的请求,然后用 TPL 任务的执行结果更新客户端。 对于客户端,这需要以某种形式保留原来的请求。 还需要记录相应的回调实例(允许与客户端进行通讯);通向与连接相关的任务执行节点的通道(正如下文所示,在取消情景中需要);唯一标识符,将所有与任务执行节点单一调用相关的任务执行请求进行分组(以确定何时不再需要此通道);以及执行的状态和结果。 图 9 显示了 STaskInfo 类的定义,此实体将保存此信息。 此外,我还使用一个 ConcurrentDictionary<TKey,TValue> 实例,作为持久性机制。
图 9 STaskInfo 和 CoordinatorContext 类
public class STaskInfo
{
public string ExecutionRequestId
{ get; set; }
public STask ClientRequest
{ get; set; }
public ITaskUpdateCallback CallbackChannel
{ get; private set; }
public ITaskExecutionNode ExecutionRequestChannel
{ get; set; }
public STaskInfo(ITaskUpdateCallback callback)
{
CallbackChannel = callback;
}
}
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo>
_submissionTracker =
new ConcurrentDictionary<string, STaskInfo>();
...
}
最后,注意 _submissionTracker 包含在 CoordinatorContext 类中。 我将使用这个类来实施任务协调器的主要功能。
处理客户端请求
任务协调器是客户端的唯一入口点,即任务协调器必须能够处理尽可能多的客户端请求,同时防止任务执行节点饱和(在资源方面)。 这并非像看起来那么容易。 为了更好地解释潜在的变化,让我们看看一个简单的解决方案:
- 任务协调器公开服务操作,客户端通过服务操作提交任务执行请求。
- 任务协调器向任务执行节点提交这些请求以进行执行,并记录这些请求,即持续保持这种状态。
图 10 显示了此提交过程的基本实施。
图 10 实施提交过程
public class TaskCoordinatorService : ITaskCoordinator
{
...
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.SendTasksToTaskHandler(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
{
var clientFactory = //Client factory creation logic..
var channel = clientFactory.CreateChannel();
foreach (var stask in stasks)
_submissionTracker.TryAdd(stask.Id, stask);
try
{
((IClientChannel)channel).Open();
channel.Start(stasks);
}
catch (CommunicationException ex)
{
// Error handling and logging ...
}
finally
{
if (((IClientChannel)channel).State != CommunicationState.Faulted)
((IClientChannel)channel).Close();
}
}
...
}
然而,这个简单的实施在某些情景下并不顺利:
- 如果客户端在一个请求中提交大量任务,所有任务将最终进入一个任务执行节点,导致不能均匀利用可用资源(假设有多个任务执行节点可用)。
- 在峰值负载情景下,如果执行 TPL 任务的数量超过了这些资源可以处理的限值,那么系统可能会耗尽任务执行节点中的可用资源。 当作为 TPL 任务执行的任务绑定到特定资源(例如内存),那么在峰值情况下会增加系统无响应的风险,便会出现这种情况。
限制
解决此类难题的一种方法是“管理”经过系统的任务执行请求。 在这种情况下,您可以将任务协调器看作是限制控制器。 但是在讨论限制流程之前,让我们回顾一下限制的语义,我将使用限制语义连同限制流程一起降低这些风险。
可以通过对任务协调器在一个请求中向任务执行节点提交的任务执行请求数量设置上限,降低第一个情景的风险。 我将此限制称为 maxSTasksPerRequest。 使用此方法,负载平衡器算法将能够在可用的任务执行节点之间进行负载的平衡分布。
第二个情景更具挑战。 一个貌似合理的解决方案是对任务执行节点执行的任务数设定上限。 我将此限制称为 maxNumberOfTasks。
除了此限制之外,此解决方案还可以设置另一个限制,以根据类型限制所执行任务的数量。 为了解释这种方法有用的原因,让我们考虑一种情景,即任务执行节点部署了两种类型的任务 T1 和 T2。 T1 是绑定 CPU,T2 是绑定磁盘 I/O。 在此情景中,客户端提交执行 T1 任务请求的吞吐量更有可能受到活动任务(受到相同类型约束的局限)的影响,因此 T1 任务数量越多,影响越大。 因为 T2 任务受不同约束的局限,因为对 T1 任务的影响并不相同。 能够按类型限制任务执行表明我可以控制在任何特定时间可以运行多少个 T1 任务,以便最大程度利用 CPU 资源,以及总体吞吐量。 我将此限制称为 maxNumberOfTasksByType。
队列和限制
理解了限制的语义以及限制如何能够有效地保持任务执行节点的健康操作之后,让我们看看当达到限制指定的限值时会发生什么情况,即实际的限制流程。
一种办法是引发例外情况。 然而,这会影响解决方案的总体吞吐量,因为客户端会产生检查特定错误或故障然后重新提交请求的开销,直至任务协调器成功处理请求为止。 一种替代方法是使用服务器侧队列,暂时不向客户端发出请求,一个类似监视器的进程(提交器进程)定期读取队列的请求,然后提交给任务执行节点。 我将使用提交器流程执行实际的限制,因为提交器考虑以下的规则来读取请求队列:
- 对可以取消 maxSTasksPerRequest 队列的请求数量设置上限。
- 如果达到 maxNumberOfTasks 限制,则停止取消队列请求,请求队列将保持现状。
- 如果达到 maxNumberOfTasksByType 限制,则取消队列,然后重新将请求加入请求队列中。 将请求重新加入队列可以继续处理其他类型的任务。 这种战略为队列中所有任务的执行提供了平等的机会。 但是在某些情况下,您可以考虑使用优先级队列。 您可以在 bit.ly/NF0xQq找到很好的参考材料。
图 11 阐明了这一过程。
图 11 提交过程
我将描述此过程的实施,首先显示 SubmitRequest 服务操作的代码(请看 图 12),此服务操作收到客户端的请求后,将请求重新加入请求队列。
图 12 SubmitRequest 服务操作
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
{
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
{
var callback =
OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
foreach (var stask in stasks)
_requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
}
...
}
接下来,让我们看看提交器流程的实施,如图 13 所示。
图 13 提交器实施
public static class CoordinatorContext
{
...
static CoordinatorContext()
{
Submitter(...);
}
private static async void Submitter(int interval)
{
while (true)
{
await Task.Delay(interval);
SendTaskRequestToTaskExecutionNode(
GetTasksFromRequestQ());
}
}
...
}
在 图 12 和 图 13中,您可以看到服务操作将请求重新加入(写入)队列,然后把提交器任务从请求队列中取消队列(读取)。 在此情景中,您需要确保底层数据结构——队列——是线程安全的。 幸运的是,有一个专为此目的而建的类 ConcurrentQueue<T>。 因此我将使用此类型的一个实例作为请求的底层存储库。
public static class CoordinatorContext
{
...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
new ConcurrentQueue<STaskInfo>();
...
}
现在,让我们回顾一下 GetTasksFromRequestQ 方法的实施,此方法在执行间隔期间读取任务。 正是通过这种方法实施限制流程,并应用我之前描述的限制。 图 14 显示了这种流程的实施。
图 14 GetTasksFromRequestQ 实施
public static class CoordinatorContext
{
...internal static List<STaskInfo> GetTasksFromRequestQ()
{
var ret = new List<STaskInfo>();
var maxSTasksPerRequest = //From a configuration
var maxNumberOfTasks = //From a configuration
var count = // Count of submitted or executing tasks
var countByType = // Enumerable of count by type
for (int i = 0; i < maxSTasksPerRequest; i++)
{
STaskInfo info;
if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
return ret;
var countTT = // Count of submitted or executing tasks of
// the type of the current item
if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
{ _requestQ.Enqueue(info); }
else ret.Add(info);
}
return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
// Logic to read from a configuration repository the value by task type name
}
...
}
图 14 中的实施目标是获得允许此流程评估限制条件的数量。 图 15 显示了可以按照 _submissionTracker 执行的貌似合理的 LINQ 查询,以及一个列表(包含返回项目 (ret) 以获得这些值)。 注意这种方法可以成功,但会降低性能。 如是这样,您也可以实施一套线程安全的计数器,计数器递增或递减提交跟踪器实例中增加或删除的项目,并使用这些计数器,而不是直接查询并发字典。
图 15 限制值
var countByType = (from t in _submissionTracker.Values
group t by t.ClientRequest.STaskTypeName into g
select new
{
TypeName = g.Key,
Count = g.Count()
});
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
where tt.TypeName == info.ClientRequest.STaskTypeName
select tt.Count).SingleOrDefault()+
ret.Where((rt) => rt.ClientRequest.STaskTypeName ==
info.ClientRequest.STaskTypeName)
.Count();
向任务执行节点发送请求并处理结果
到目前为止,我已经讨论任务协调器如何管理请求。 让我们看看任务协调器如何向任务执行节点提交请求,现在考虑一下限制流程。 为了提供更好的上下文,让我们首先回顾一下任务执行节点公开的服务操作(通过负载平衡器):
[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskExecutionNode
{
[OperationContract]
void Start(List<STask> stask);
[OperationContract]
void Cancel(string Id);
}
正如名称所示,这些操作的目的是开启一个任务执行请求列表,并请求取消特定任务。 服务约定利用相同的回调约定,以通过约定的实施,更新任务协调器。
图 16 显示了更新的 SendTaskToTaskExecutionNode 方法实施,在此方法中,任务协调器将 STaskInfo 实例保存在 _submissionTracker,并调用任务执行节点中的 Start 服务操作。
图 16 SendTaskToTaskExecutionNode 方法和支持方法
internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
{
if (staskInfos.Count() == 0)
return;
var channel = new DuplexChannelFactory<ITaskExecutionNode>(
new InstanceContext(new CallbackHandler()),
new NetHttpBinding(), new EndpointAddress(“http://.../”))
.CreateChannel();
try
{
var requestId = Guid.NewGuid().ToString();
var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
.Where(s => s != null);
((IChannel)channel).Open();
channel.Start(reqs.ToList<STask>());
}
catch (CommunicationException ex)
{
foreach (var stask in staskInfos)
HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
}
}
private static STask AddRequestToTracker(string requestId,
STaskInfo info, ITaskExecutionNode channel)
{
info.ExecutionRequestId = requestId;
info.ExecutionRequestChannel = channel;
if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
return info.ClientRequest;
HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
new Exception(“Failed to add “));
return null;
}
注:SendTaskToTaskExecutionNode 方法创建一个回调实例,以处理任务执行节点中任务执行的结果:
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler : ITaskUpdateCallback
{
public void UpdateStatus(string id, STaskStatus status, string result)
{
CoordinatorContext.HandleClientUpdate (id, status, result);
}
}
CallbackHandler 通过调用 HandleClientUpdate 方法处理回调操作。 此方法从 submitterTracker 检索并删除相应的 STaskInfo 实例,并对客户端执行回调,以更新结果。 此外,如果这是组中的最后一个请求,则关闭任务协调器与任务执行节点之间的通道。 图 17 显示了 HandleClientUpdate 方法的实施。
图 17 HandleClientUpdate 方法和支持方法
internal async static void HandleClientUpdate(
string staskId, STaskStatus status, object result)
{
STaskInfo info;
if (!_submissionTracker.TryGetValue(staskId, out info))
throw new Exception(“Could not get task from the tracker”);
try
{
await Task.Run(() =>
info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
RemoveComplete(info.ClientRequest.Id);
}
catch(AggregateException ex)
{
// ...
}
}
private static void RemoveComplete(string staskId)
{
STaskInfo info;
if (!_submissionTracker.TryRemove(staskId, out info))
throw new Exception(“Failed to be removed from the tracking collection”);
if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
info.ExecutionRequestId).Count() == 0)
CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
}
private static void CloseTaskRequestChannel(IChannel channel)
{
if (channel != null && channel.State != CommunicationState.Faulted)
channel.Close();
}
任务实施器
在客户端代码中,typeName 是在添加请求时必需的一个参数。 这个值最终传送到任务执行节点。 typeName 的值是一个接口的实施的类型名称,此接口公开一个函数委托,此函数委托将一个当作并行任务运行的功能进行封装,并驻留在所有任务执行节点。 我将这个接口称为 IRunnableTask。 此接口的实施器应当从客户端接收一个取消令牌作为参数以及一系列参数。 此委托还会返回任务的结果。 这就是该接口:
public interface IRunnableTask
{
Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}
在任务执行节点中启动任务
总体而言,任务执行节点负责将任务执行请求转变成 TPL 可以执行的实际任务,即启动 TPL 任务。 图 18 显示了这种流程的实施,我将在下文讨论。
图 18 启动任务
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{
public void Start(List<STask> stasks)
{
var callback =
OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
foreach (var t in stasks)
TaskExecutionContext.Start(t,callback);
}
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
try
{
// Step 1.a
var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
// Step 1.b
var rtask = from t in rtasks
where t.Value.GetType().FullName == stask.STaskTypeName
select t.Value;
// Step 2
var cs = new CancellationTokenSource();
var ct = cs.Token;
TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
// Step 3
Task<Object>
.Run(rtask.First().Run(ct, stask.STaskParameters), ct)
.ContinueWith(tes => UpdateStatus(tes, stask, callback));
}
catch (Exception ex)
{
...
}
}
...
}
第 1 步(a 和 b): 在此阶段,任务执行节点需要创建一个 IRunnableTask 实例,此实例将返回一个委托,当作客户端请求的任务类型来运行。 为此,我利用 MEF 和 .NET Framework 4.5 中的一个新功能,以实现无属性的配置方法。 图 19中的代码创建一个容器实例,导出“extensions”目录中 IRunnableTask 的所有实施。欲知 MEF 及无属性配置方法的详情,请参阅 2012 年 6 月 MSDN 杂志文章《无属性 MEF 配置方法》 msdn.microsoft.com/magazine/jj133818。
图 19 创建容器
internal static class CompositionUtil
{
private readonly static Lazy<CompositionContainer> _container =
new Lazy<CompositionContainer>(() =>
{
var builder = new RegistrationBuilder();
builder.ForTypesDerivedFrom<IRunnableTask>()
.Export<IRunnableTask>()
.SetCreationPolicy(CreationPolicy.NonShared);
var cat = new DirectoryCatalog(“extensions”, builder);
return new CompositionContainer(cat, true, null);
}
,true);
internal static CompositionContainer ContainerInstance
{
get { return _container.Value; }
}
}
接下来,让我们回到图 18 中的代码。 此代码使用容器获得 IRunnableTask 类型的导出,然后选择类型名称匹配客户端请求的实例。 注:我作出重要假设,即只有一个任务实例对应客户端请求的类型。 因此,我使用 LINQ 查询返回的第一个实例。
步骤 2: 实际创建 TPL 任务之前,此代码创建取消令牌源和取消令牌。 我将会在 ConcurrentDictionary<TKey,TValue> 一个实例中记录取消源。 当客户端请求取消时,任务执行节点将使用这个取消源列表。 以下为此实例的定义:
public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string,
CancellationTokenSource> _cancellationSources =
new ConcurrentDictionary<string, CancellationTokenSource>();
...
}
步骤 3: 这时,我用所创建的取消令牌,运行任务。 此任务之后是一个持续任务。持续任务是必要的,因为一旦完成 TPL 任务(无论成功或出错),必须通过服务调用,用执行结果更新任务协调器。 如 图 20 所示,我将任务协调器的更新流程封装到委托中,此委托将 TPL 任务作为参数,任务执行请求及回调实例接收到任务协调器。
图 20 封装更新流程
private static Action<Task<Object>, STask,
ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
{
try
{
STaskStatus s;
Object r = null;
switch (t.Status)
{
case TaskStatus.Canceled: s = STaskStatus.Canceled;
break;
case TaskStatus.Faulted:
s = STaskStatus.Faulted;
r = t.Exception.Flatten();
break;
case TaskStatus.RanToCompletion:
s = STaskStatus.Completed;
r = t.Result;
break;
default:
s = STaskStatus.Faulted;
r = new Exception("Invalid Status");
break;
}
CancellationTokenSource cs;
TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
cb.UpdateStatus(st.Id, s, r);
}
catch (Exception ex)
{
// Error handling
}
};
请求和处理取消
TPL 提供了实施任务取消的机制。 为此,此委托封装作为 TPL 任务运行的实际流程,需要响应取消请求并终止执行。 欲知任务取消的详情,请参阅 MSDN 库文章《任务取消》bit.ly/NYVTO0。
IRunnableTask 接口中的其中一个参数是取消令牌。 任务执行节点将为每个任务创建一个令牌,由接口的实施器来决定何时检查取消请求并平稳终止流程。 图 21中的代码显示了一个简单任务,此任务计算一个范围内偶数的数量,同时检查是否请求了取消。
图 21 检查取消
public class MySimpleCTask : IRunnableTask
{
public Func<Object> Run(Nullable<CancellationToken> ct,
params string[] taskArgs)
{
var j = int.Parse(taskArgs[0]);
var z = 0;
return (() =>
{
for (int i = 0; i < j; i++)
{
if (i % 2 != 0)
{
z++;
ct.Value.ThrowIfCancellationRequested();
}
}
return z;
});
}
}
正如我讨论客户端时所示,您可以添加一个带有取消令牌的请求,然后客户端在内部执行必要的订阅。 所以当提出取消时,取消请求被发送到任务协调器。 收到取消请求后,任务协调器检查此请求是否提交到任务执行节点,然后发送取消请求。 然后任务执行节点寻找对应客户端 ID 所请求的任务的取消源。 向任务执行节点提交取消请求相对简单,您只需找到响应请求的通道,因为任务协调器最初通过此通道提交任务执行请求。 这些通道需要对回调保持开启,回调会更新执行请求的状态。
图 22 显示了在任务协调器中实施服务操作。
图 22 在任务协调器中实施服务操作
public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
{
return CoordinatorContext.CancelTask(Id);
}
...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
Id, out info) && info.ExecutionRequestChannel != null)
{
info.ExecutionRequestChannel.Cancel(Id);
return true;
}
return false;
}
...
}
最后,图 23 显示了在任务执行节点中实施服务操作。
图 23 在任务执行节点中实施服务操作
class CancellationHandler : ICancellationHandler
{
public void Cancel(STask stask)
{
TaskExecutionContext.CanceTask(stask.Id);
}
}
public static class TaskExecutionContext
{
...
internal static void CancelTask(string Id)
{
CancellationTokenSource tknSrc;
if (_cancellationSources.TryGetValue(Id, out tknSrc))
tknSrc.Cancel(); }
...
}
任务协调器的扩展能力及其他考虑因素
值得注意的是,此实施假设任务协调器在单一节点上运行,但是可以扩展任务协调器(这至少需要进行以下的变更):
- 需要引入一个负载平衡器,以评估任务协调器。
- 如上所述,限制方法的关键是准确统计正在运行的任务数量(总数和字节数)。 在一个以上节点作为任务协调器运行的情景中,这些计数器将需要集中维护(例如在数据库中),同时仍然能够以同步方式更新或读取(避免争用条件、死锁等等)。
最后,注意如同任何开发方法,需要从风险和数值方面权衡其他可能满足您需求并且现成可用的备选方法。 例如,您可能想考虑 Microsoft HPC 服务器之类的技术作为貌似合理的解决方案,以应付多种情景,您可以根据本文描述的方法进行解决。
优化资源
TPL 提供了必要的基础结构,以便最大程度优化多核计算机的 CPU 资源使用率,也可用于实施在多个计算机边界之间进行扩展的方法。 这可以有助于工作负载自动化和批处理情景,不仅在一台多核服务器中需要并行性,在多台服务器中也需要并行性。
为了实现这种水平扩展能力,需要考虑多种构架因素。 主要因素是: 需要在现有资源中平衡负载,同时能够在现有服务器场添加更多资源,以及根据需要执行的任务的语义来限制资源。 微软开发工具的技术提供必要的构建基块,以实施考虑了上述主要因素的构架。
Jesus Aguilar 是 Microsoft 的高级应用程序开发经理,专门为开发人员提供高级支持。
衷心感谢以下技术专家对本文的审阅: Ryan Berry、Steve Case、Rick Claude 及 Piyush Joshi