2016 年 6 月

第 31 卷,第 6 期

此文章由机器翻译。

Reactive 框架 - 使用 Reactive 批量编写异步客户端服务器链接

通过 Peter Vogel | 2016 年 6 月

异步处理已变得更加常见应用程序开发中,如 Microsoft.NET Framework 已获取各种支持特定的异步设计模式的工具。通常创建设计良好的异步应用程序取决于认识到您的应用程序是实现,然后选取正确的.NET 组件集的设计模式。

在某些情况下,匹配需要进行集成多个.NET 组件。Stephen Cleary 的文章"针对异步 MVVM 应用程序的模式 ︰ 命令"(bit.ly/233Kocr),演示如何以完全支持模型-视图-视图模型 (MVVM) 模式以异步方式。在其他情况下支持需要在.NET Framework 中的只有一个组件。我已经讨论了实现提供者/使用者模式使用 BlockingCollection 中的我 VisualStudioMagazine.com 实用.NET 列,"创建简单、 可靠使用 BlockingCollection 异步应用程序"(bit.ly/1TuOpE6),并且,"创建复杂的异步应用程序使用 BlockingCollection"(bit.ly/1SpYyD4)。

另一个示例实现观察者设计模式以异步方式监视一个长时间运行的操作。在此方案中,返回一个 Task 对象的异步方法不起作用,因为客户端经常返回结果流。对于这些情况下,你可以利用.NET Framework 中的至少两个工具 ︰ ObservableCollection 和 Reactive Extensions (Rx)。对于简单的解决方案,ObservableCollection (以及 async 和 await 关键字) 就足够了。但是,对于多个"有趣",尤其是事件驱动的问题,Rx 您提供更好地控制进程。

定义图案

尽管观察者模式通常用在 UI 设计模式,包括模型-视图-控制器 (MVC)、 Model View Presenter (MVP) 和 MVVM — Ui 应视为只是一个场景下从较大的一组方案其中观察者模式适用。观察者模式 (从 Wikipedia 引用) 定义为 ︰ "对象,调用中的主题,[] 中保留其依赖项的列表,调用观察者,,并通知自动的所有状态的更改,它们通常通过调用其方法之一。"

实际上,观察者模式是关于从长时间运行的进程向客户端获取结果,只要这些结果不可用。如果没有某些版本的观察者模式,客户端必须等待,直到最后一个得到结果后,然后具有所有结果都发送到它们在单一的许多项中。在越来越多地异步情况下,所需结果变得可用时处理客户端与并行结果观察程序。为了强调在说什么多个 Ui 时利用观察者模式中,我将使用"客户端"和"服务器"而不是"观察者"和"主题,"这篇文章的其余部分中。

问题和机会

有至少三个问题和观察者模式的两个商机。第一个问题是经过侦听器的问题 ︰ 观察者模式的许多实现中需要服务器来保存对其客户端的所有引用。结果是,客户端可以将保留在内存中由服务器之前在服务器退出。这显然不在客户端连接和断开频繁的其中一个动态系统中的长时间运行的进程的理想解决方案。

结束侦听器问题,但是,将只是第二个,则较大问题的症状 ︰ 观察者模式的许多实现中需要在服务器和客户端紧密耦合,需要在服务器和客户端必须在所有时间存在。最起码,客户端应该能够确定服务器是否存在并选择不将附加;此外,服务器应该能够正常工作,即使没有任何客户端接受的结果。

第三个问题是与性能相关 ︰ 需要多长时间将其通知所有客户端的服务器? 在观察程序模式的性能直接受客户端接收通知的数量。因此,没有机会通过允许客户端会抢先筛选收到来自服务器的结果来提高观察者模式中的性能。这还解决了其中服务器会产生更多的结果 (或更广泛的结果) 的方案不是客户端很感兴趣 ︰ 客户端可以指示它是只是为了在特定情况下收到通知。第二个性能机会存在认识到,当服务器无结果或已完成生成的结果。客户端可以跳过处理服务器事件,直到客户端可以保证要处理的东西,并就他们知道它们已处理的最后一个结果,客户端可以释放这些资源所需的获取资源。

从观察器发布/订阅

这些注意事项分解会导致从观察者模式的简单实现相关发布/订阅模型。发布/订阅功能允许服务器以松散耦合方式实现观察者模式和客户端执行,即使另一个是当前不可用。发布/订阅通常还会实现客户端过滤,通过允许客户端订阅到特定主题/通道时通知我"有关采购订单") 或带有不同类型的内容时通知我"有关任何紧急请求") 相关联的属性。

一个问题仍然存在,但是。观察者模式的所有实现都倾向于紧密几个客户端和服务器特定的消息格式。因为所有的客户端必须更新才能使用新的格式很难更改大多数发布/订阅实现中的消息的格式。

在许多方面,这是类似于数据库中的服务器端游标的说明。若要传输成本降至最低,数据库服务器不会返回结果,如检索到每个行。但是,对于大的行集,数据库也不会返回所有行末尾的单个批处理中。相反,数据库服务器通常从保存在服务器上并且经常为这些子集游标返回子集变为可用。使用的数据库,客户端和服务器不必是同时存在 ︰ 数据库服务器可以运行时存在,则没有客户端客户端可以检查要查看服务器是否可以访问并,如果没有,请决定什么 (如果有的话) 否则它可以执行。筛选进程 (SQL) 也是非常灵活。但是,如果数据库引擎发生更改它使用返回的行的格式,然后所有客户端必须最起码,重新编译。

处理对象的缓存中

作为我的案例研究以查看简单的观察器模式实现,我使用我的服务器作为搜索发票的内存中缓存的类。该服务器可能,在其处理结束时,返回所有发票的集合。但是,我更喜欢让客户端分别并以并行方式对服务器的搜索过程处理发票。这意味着我更倾向于版本的过程中,它将返回每个发票,因为它找到,则允许客户端处理与搜索下一步发票并行每张发票。

服务器的简单实现可能如下所示 ︰

private List<Invoice> foundInvoices = new List<Invoice>();
public List<Invoice> FindInvoices(decimal Amount)
{
  foundInvoices.Clear();
  Invoice inv;
    // ...search logic to add invoices to the collection
     foundInvoices.Add(inv);
    // ...repeat until all invoices found
    return foundInvoices;
}

Yield return 以返回每个发票,如找到,则为,而不是组合列表,可能会使用更完善的解决方案。无论如何,调用 FindInvoices 方法的客户端将需要执行一些关键活动之前和之后处理。例如,一旦找到第一个项,则客户端可能需要使 MatchingInvoices 列表可以存放在客户端的发票或获取/初始化处理发票所需的任何资源。添加其他发票时,客户端将需要处理的每个发票和,如果服务器发出的信号,检索最终的发票时,释放不再需要因为没有要处理的"没有更多"发票的所有资源。

期间数据库检索,例如,读取将阻止,直到返回的第一行。一旦返回的第一行中,客户端初始化处理行所需的任何资源。最后一行检索时,允许客户端释放这些资源,因为没有要处理的多个行,读取也返回 false。

使用 ObservableCollection 创建简单的解决方案

用于在.NET Framework 中实现观察者模式最明显的选择是 ObservableCollection。ObservableCollection 会通知客户端 (通过事件) 时对其进行更改。

重写我的示例服务器以使用 ObservableCollection 类需要只有两个更改。首先,保存的结果的集合需要定义为一个 ObservableCollection,使其成为公共。其次,它已不再需要的方法以返回结果 ︰ 服务器只需将发票添加到集合。

服务器的新实现可能如下所示 ︰

public List<Invoice> FindInvoices(decimal Amount)
{
  public ObservableCollection<Invoice> foundInvoices =
    new ObservableCollection<Invoice>();
  public void FindInvoices(decimal Amount)
  {
    foundInvoices.Clear();
    Invoice inv;
    // ...search logic to set inv
     foundInvoices.Add(inv);
    // ...repeat until all invoices are added to the collection   
  }

使用此版本的服务器的客户端只需为 InvoiceManagement foundInvoices 集合 CollectionChanged 事件绑定事件处理程序。下面的代码中我使用了实现 IDisposable 接口以支持断开与该事件的类 ︰

public class SearchInvoices: IDisposable
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public void SearchInvoices()
  {
    invMgmt.foundInvoices.CollectionChanged += InvoicesFound;
  }
  public void Dispose()
  {
    invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged;
  }

在客户端,CollectionChanged 事件 NotifyCollectionChangedEventArgs 对象作为其第二个参数传递。同时,指定对象的 Action 属性有何变化对集合执行 (执行操作 ︰ 集合已被清除,新项目已添加到集合、 移动/替换/删除了现有的项) 以及发生更改的项 (任何添加的项的集合,集合之前要添加已移动/删除/替换项的位置的新项目中存在的项的集合) 有关的信息。

简单的客户端将以异步方式处理每个发票,因为它将添加到服务器中的集合中的代码将如下所示的代码中 图 1

图 1 以异步方式处理发票使用 ObservableCollection

private async void InvoicesFound(object sender,
  NotifyCollectionChangedEventArgs e)
{
  switch (e.Action)
  {
    case NotifyCollectionChangedAction.Reset:
      {
        // ...initial item processing
        return;
      }
    case NotifyCollectionChangedAction.Add:
      {
        foreach (Invoice inv in e.NewItems)
        {
          await HandleInvoiceAsync(inv);
        }
        return;
      }
  }
}

While 简单,此代码可能不适用于您的需求,尤其是如果您正在处理的长时间运行的进程或在动态环境中工作。例如,从异步设计的角度来看,该代码无法捕获 HandleInvoiceAsync,以便客户端可以管理异步任务返回的任务对象。您还需要确保即使 FindInvoices 在后台线程上运行,在 UI 线程上引发 CollectionChanged 事件。

由于其中的 Clear 方法之前所调用的服务器类中 (搜索第一张发票) 的 Action 属性重置值可以用作第一项是要从中检索的信号。但是,当然,没有发票可能会找到在搜索中,因此,使用重置操作可能会导致永远不会实际使用的客户端分配资源。若要实际处理"第一项"处理,您需要将标志添加到要执行仅当第一个找到该项目的添加操作处理。

此外,该服务器具有有限的数量的选项分别用于指示最后一张发票位于,以便客户端可以停止等待"下一步一。 服务器无法,据估计,清除的最后一项,在找到后的集合,而这只是会强制重置操作处理为处理更复杂 (具有我已处理发票? 如果是,然后我已经处理最后一张发票;如果不是,然后我一样对此过程第一张发票)。

虽然对于简单的问题,ObservableCollection 都会正常了,任何相当复杂的实现基于 ObservableCollection (和任何值效率的应用程序) 将需要一些复杂的代码,尤其是在客户端。

Rx 解决方案

如果您希望异步处理然后 Rx (可通过 NuGet 获得) 可以提供更好的解决方案,用于通过从发布/订阅模型借款方法人实现观察者模式。此解决方案还提供了基于 LINQ 的筛选模型,更好地发出信号的第一个/最后一个项条件和更好的错误处理。

Rx 还可以处理而言,可能有一个 ObservableCollection 更有趣的观察器实现。在我的案例研究,返回发票的初始列表后,我的服务器可能会继续在原始搜索完成 (并符合搜索条件,当然) 添加到缓存的新发票的检查。当发票会议的条件未显示,客户端将想要被通知有关的事件,以便可向列表添加新的发票。Rx 支持这些类型的基于事件的观察者模式优于 ObservableCollection 的扩展。

两个键中有接口 Rx 支持观察者模式。第一种是 IObservable < T > 实现的服务器并指定一个单一的方法 ︰ 订阅。服务器实现 Subscribe 方法将其传递对对象的引用从客户端。若要处理失效的侦听器问题,Subscribe 方法返回到客户端实现 IDisposable 接口的对象的引用。客户端可以使用该对象与服务器断开。当客户端未断开连接后时,服务器需要从任何其内部列表中删除客户端。

第二个是 < T > IObserver 接口,该客户端必须实现接口。该接口要求客户端实现并公开到服务器的三种方法 ︰ OnNext、 OnCompleted 和 OnError。此处的关键方法是 OnNext,服务器用来将一条消息传递到客户端 (在我的案例研究该消息为新发票对象将返回每个所显示的一样)。服务器可以使用客户端的 OnCompleted 方法以发出信号没有更多数据。第三种方法,OnError,提供了一种方法来通知客户端有异常发生的服务器。

请别客气 IObserver 接口自己实现,当然 (它的.NET Framework 的一部分)。以及 ObservableCollection,这可能是您只需如果你要创建一个同步解决方案 (我编写了之处在于,过,"编写更简洁的代码与 Reactive Extensions"列 [bit.ly/10nfQtm])。

但是,Rx 包括几个包来提供这些接口,包括 JavaScript 和 RESTful 服务实现的异步实现。Rx 主题类提供简化了实现观察者模式的异步发布/订阅版本的 IObservable 的实现。

创建异步解决方案

创建用于使用使用者对象的服务器需要对原始的同步服务器端代码很少的更改。我的旧 ObservableCollection 替换使用者传递的对象将每个发票所示向任何侦听客户端。我声明为公共的使用者对象,以便客户端可以访问它 ︰

public class InvoiceManagement
{
  public IObservable<Invoice> foundInvoice =
    new Subject<Invoice>();

在正文中的方法,而不是发票添加到集合中,我使用使用者的 OnNext 方法来将每个发票传递到客户端,如找到,则为 ︰

public void FindInvoices(decimal Amount)
{
  inv = GetInvoicesForAmount(Amount) // Poll for invoices
  foundInvoice.OnNext(inv);
  // ...repeat...
}

在我的客户端,我首先声明服务器类的实例。然后,在标记为异步方法中,我调用者的 Subscribe 方法,以指示我想要开始接收消息 ︰

public class InvoiceManagementTests
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public async void ProcessInvoices()
  {
    invMgmt.foundInvoice.Subscribe<Invoice>();

若要筛选结果与我希望的发票,我可以对使用者对象应用 LINQ 语句。本示例筛选为延期交货的发票 (若要使用 Rx LINQ 扩展,您将需要添加一条 using 语句 System.Reactive.Linq 命名空间):

invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();

一旦我一直在侦听使用者,我可以指定什么的处理我想我接收发票时执行的操作。例如,我可以使用 FirstAsync 处理只是该服务返回的第一张发票。在此示例中,我使用 await 语句与对 FirstAsync 的调用,以便我可以将控制返回到我的应用程序处理发票时的主要部分。此代码将一直等待要检索的第一张发票,然后移到任何代码,我使用初始化发票处理过程,最后,处理发票上 ︰

Invoice inv;
inv = await invMgmt.foundInvoice.FirstAsync();
// ...setup code invoices...
HandleInvoiceAsync(inv);

有一点需要特别注意: 如果该服务器尚未未生成任何结果,将阻止 FirstAsync。如果你想要避免阻塞,可以使用 FirstOrDefaultAsync,如果服务器未生成任何结果将返回 null。如果不有任何结果,客户端可以决定要的内容,如果任何内容,以执行操作。

更常见的情况是客户端需要用于处理 (之后,将返回筛选) 的所有发票和执行异步操作。在这种情况下,而不是使用订阅和 OnNext 的组合,您可以只使用 ForEachAsync 方法。您可以传递一个方法或 lambda 表达式,用于处理传入的结果。如果传递方法 (它不能是异步的),为我在这里,执行触发 ForEachAsync 的发票将传递方法 ︰

invMgmt.foundInvoice.ForEachAsync(HandleInvoice);

ForEachAsync 方法也可以传递要让它正在断开连接的客户端信号的取消标记。较好的做法是将标记传递时调用任何 Rx * 异步方法来支持允许客户端终止处理,而无需等待要处理的所有对象。

ForEachAsync 不会处理任何已处理的第一个 (或 FirstOrDefaultAsync) 方法,以便您可以使用 FirstOrDefaultAsync ForEachAsync 检查以确定服务器是否处理后续对象之前要处理的任何内容的结果。但是,使用者的 IsEmpty 方法将更加简单地执行相同的检查。如果客户端具有分配所需的处理结果的任何资源,则 IsEmpty 允许客户端检查以查看是否有任何分配这些资源 (一种替代方法是以分配这些资源在处理循环中的第一个项上) 之前的准备工作。检查以查看是否存在资源分配 (并开始处理) 之前的任何结果,同时还支持取消会使代码如下所示的客户端使用 IsEmpty 图 2

图 2 代码以支持取消和延迟处理直至结果都准备就绪可以

CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken cancel;
cancel = cancelSource.Token;
if (!await invMgmt.foundInvoice.IsEmpty())
{
  // ...setup code for processing invoices...
  try
  {
    invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel);
  }
  catch (Exception ex)
  {
    if (ex.GetType() != typeof(CancellationToken))
    {
      // ...report message
    }
   }
   // ...clean up code when all invoices are processed or client disconnects
}

总结

如果您所需 ObservableCollection 可能会执行所有需要处理结果流,然后是观察者模式的简单实现。更好地控制和基于事件的应用程序,使用者类和扩展附带 Rx 将让应用程序在异步模式下工作原理是支持发布/订阅模型的强大实现 (和我还没有看过运算符 Rx 附带的丰富的库)。如果您正在使用 Rx,就值得下载 Rx 设计指南 (bit.ly/1VOPxGS),其中讨论了使用并生成可观察流中的最佳做法。

Rx 还提供了一些支持,通过使用 ISubject < TSource、 TResult > 接口传递客户端和服务器之间的消息类型转换。ISubject < TSource、 TResult > 接口指定两种数据类型 ︰"in"的数据类型和"out"数据类型。在实现此接口的使用者类可以执行将从服务器 ("输入"数据类型) 返回到客户端 ("out"数据类型) 所需结果的结果转换所需的任何操作。此外,在参数是协变 (它将接受指定的数据类型或任何数据类型继承自) 和输出参数是逆变参数 (它将接受指定的数据类型或从其派生的任何内容),为您提供更大的灵活性。

我们生活在越来越多地异步的世界中,并在该情况下,观察者模式将变得越来越重要 — — 它是一个有用的工具的任何位置的服务器进程返回了多个单个结果的进程之间的接口。幸运的是,有几个选项用于在.NET Framework 中,包括 ObservableCollection 和 Rx 实现观察者模式。


Peter Vogel是一种系统架构师和 PH & V Information Services 中的主体。PH 和 V 提供从通过对象建模和数据库设计的用户体验设计咨询完全堆栈。

感谢以下的微软技术专家对本文的审阅: Stephen Cleary、 James McCaffrey 和 Dave Sexton
Stephen Cleary 曾经使用过多线程和异步编程 16 年的和具有异步支持在 Microsoft.NET Framework 以来使用第一个社区技术预览。他是"并发在 C# 》 (O'Reilly Media,2014年) 的作者。他的主页,包括他的博客位于 stephencleary.com