导出 (0) 打印
全部展开

在 Windows Azure 上最大限度地提高基于队列的消息传递解决方案的可伸缩性和成本效率的最佳实践

注:本页面内容可能不完全适用中国大陆地区运营的 Windows Azure服务。如要了解不同地区 Windows Azure 服务的差异, 请参考本网站.

作者:Valery Mizonov

审阅:Brad Calder、Sidney Higa、Christian Martinez、Steve Marx、Curt Peterson、Paolo Salvatori 和 Trace Young

本文针对在 Windows Azure 平台上构建可伸缩、高效和经济实用的基于队列的消息传递解决方案,提供了一些说明性指导和最佳做法。本文的目标读者包括解决方案架构师和开发人员,他们要设计和实现基于云的解决方案,这些解决方案需要利用 Windows Azure 平台 队列存储服务

摘要

传统的基于队列的消息传递解决方案利用的概念是一个称作消息队列的消息存储位置,它是与一个或多个参与者之间发送或接收(通常通过异步通信机制)的数据的存储库。

基于队列的数据交换是可靠且高度可伸缩的消息传递体系结构的基础,能够在分布式计算环境中支持广泛的功能强大的各种方案。无论是调度大量工作还是进行持久的消息传递,消息队列技术都可以参与其中并且提供一流的功能,从而满足各种规模下异步通信的不同要求。

本文旨在说明开发人员如何将特定设计模式与 Windows Azure 平台所提供的功能结合在一起使用,以便构建优化且经济实用的基于队列的消息传递解决方案。本文深入探讨了一些用于在 Windows Azure 解决方案中实现基于队列的交互的最常用方法,并且就改进性能、增加可伸缩性和降低运营成本提供了一些建议。

我们还在论述基本概念的同时根据需要辅之以一些相关的最佳做法、提示和建议。本文中描述的方案主要来自于真实客户项目的技术实现案例。

客户方案

为了让示例更为生动和具体,我们归纳总结了一个如下所示的真实客户方案。

一个 SaaS 解决方案提供商启动了一个新的计费系统,这个系统是作为 Windows Azure 应用程序实现的,用来满足不同规模的客户事务处理的业务需要。这个解决方案的主旨是能够将大量占用计算资源的工作负荷转移到云上,并且利用 Windows Azure 基础结构的灵活性来执行计算密集型工作。

该端到端体系结构的内部元素在全天中会定期将大量事务整合并调度到 Windows Azure 托管服务上。每次提交的事务量从几千到几十万个不等,每天的事务量可达数百万个。此外,假定该解决方案必须满足一个 SLA 要求,即必须保证事务处理延迟不超过一个最大限值。

该解决方案的体系结构是建立在分布式 map-reduce(映射-化简)设计模式基础上的,并且由基于多实例辅助角色的云层构成(使用 Windows Azure 队列存储来进行工作调度)。事务批次由 Process Initiator 辅助角色实例接收,分解(批次拆分)成若干更小的工作项,并且排入到 Windows Azure 队列集合中以便进行负荷分配。

工作负荷的处理由处理辅助角色的多个实例执行,并且从队列中提取工作项并经由计算过程传递它们。这些处理实例利用多线程队列侦听器来实现并行数据处理以便获得最佳性能。

已处理的工作项将被路由到一个专用队列中,在该队列中,Process Controller 辅助角色实例将会取消对这些工作项的排队,并且将这些工作项聚合并保存到一个数据存储中以便进行数据挖掘、报告和分析。

该解决方案的体系结构如下图所示:

Best-Practices-Messaging-Solutions-Azure1

上图描绘了用于向外扩展大型或复杂计算工作负荷的典型体系结构。对于需要通过队列彼此进行通信的许多其他 Windows Azure 应用程序和服务而言,此体系结构所采用的基于队列的消息交换模式是一种非常典型的模式。这使得可以采取规范的方法来检查在基于队列的消息交换中涉及的特定基础组件。

基于队列的消息传递基础知识

一个使用消息队列在分布式组件之间交换数据的典型消息传递解决方案应该包括:“发布服务器”(将消息存入队列中)和一个或多个“订阅服务器”(用于接收这些消息)。在大多数情况下,订阅服务器(有时候称作“队列侦听器” )是作为单线程进程或多线程进程实现的,连续运行或者根据计划模式按需启动。

在更高的层级,可以使用两个主要的调度方法使队列侦听器能够接收在队列上存储的消息:

  • 轮询(基于请求的模型):侦听器通过定期检查队列中是否有新消息来监视队列。在队列为空时,侦听器会继续轮询队列,并且通过进入睡眠状态来定期退让。

  • 触发(基于推送的模型):侦听器订阅当消息到达队列时触发的事件(该事件由发布服务器本身触发,或由队列服务管理器触发)。然后,侦听器可以开始消息处理,从而不必轮询队列来确定是否有任何新工作可用。

还需要提及的是,这两种机制都可以进一步分为不同的形式。例如,轮询可以是阻止和非阻止的。阻止将在新消息在队列上出现(或遇到超时)前保留请求,而非阻止请求在队列上没有任何项时会立即完成。就触发模型而言,可为每个新消息都将通知推送到队列侦听器,也可以仅在第一个消息到达空队列时或者队列深度达到某个级别时将通知推送到队列侦听器。

note注意
Windows Azure 队列服务 API 支持的取消排队操作是非阻止的。这意味着,如果在队列上找不到任何消息,则 GetMessageGetMessages 之类的 API 方法将会立即返回。与之相反,Windows Azure Service Bus 队列提供阻止接收操作,这些操作将阻止调用线程,直到某一消息到达队列或者经过了指定的超时期限。

下面总结了当今在 Windows Azure 解决方案中用来实现队列侦听器的最常见方法:

  1. 侦听器作为一个应用程序组件实现,该组件作为辅助角色实例的一部分实例化并执行。

  2. 该队列侦听器组件的生命周期通常受到宿主角色实例的运行时间的约束。

  3. 主要处理逻辑由一个循环构成,在这个循环中,将取消消息的排队并且安排进行处理。

  4. 如果未收到任何消息,则侦听线程将会进入睡眠状态,这个时间段通常受到特定于应用程序的退让算法驱动。

  5. 执行接收循环并且对队列进行轮询,直到通知侦听器退出循环并终止。

下面的流程图描绘了在 Windows Azure 应用程序中使用轮询机制实现队列侦听器时常用的逻辑:

Best-Practices-Messaging-Solutions-Azure2
note注意
在本文中,没有使用更复杂的设计模式,例如要求使用中央队列管理器(代理)的那些设计模式。

在使用 Windows Azure 队列时,将典型的队列侦听器与轮询机制一起使用可能不是最佳选择,因为 Windows Azure 定价模型根据对队列执行的应用程序请求衡量存储事务,而与队列是否为空无关。在接下来的几个部分中,我们将针对 Windows Azure 平台上的基于队列的消息传递解决方案,介绍一些提高性能和降低成本的技术。

用于性能、可伸缩性和成本优化的最佳做法

在本部分中,我们必须了解如何改进设计的相关方面以便实现更高的性能、更好的可伸缩性且更为经济实用。

也许,考量某一实现模式是否“更高效”的最简单方式就是确认设计是否满足以下目标:

  • 通过消除不会产生任何有价值工作的大量存储事务来降低运营开支

  • 消除由轮询间隔在检查是否有新消息时造成的额外的延迟

  • 通过调整处理能力以便适应不断变化的工作量来动态缩放

实现模式还应该在满足这些目标的同时不会产生其所带来的影响超过相关利益所带来的好处的复杂局面。

用于优化存储事务成本的最佳做法

当评估在 Windows Azure 平台上部署的解决方案的总拥有成本 (TCO) 和投资回报 (ROI) 时,存储事务量是 TCO 计算公式中的主要变量之一。减少针对 Windows Azure 队列的事务数量将降低运营成本,因为运营成本与正在 Windows Azure 上运行的解决方案相关。

在基于队列的消息传递解决方案环境中,可以一起采用以下几种方法来减少存储事务量:

  1. 在将多个消息放入某一队列中时,将相关消息组合到单个更大的批次中,对消息进行压缩并将压缩后的映像存储于一个 blob 存储中,并且使用该队列保持对承载实际数据的 blob 的引用。

  2. 在从某一队列检索消息时,将多个消息一起组合到单个存储事务中。队列服务 API 中的 GetMessages 方法支持在单个事务中取消指定数目的消息的排队(见下面的说明)。

  3. 当检查在某一队列上是否存在工作项时,如果队列保持连续为空,则避免频繁的轮询间隔并且实现增加轮询请求之间的时间的退让延迟

  4. 减少队列侦听器的数目 – 在使用基于请求的模型时,如果队列为空,则每个角色实例仅使用 1 个队列侦听器。若要进一步将每个角色实例的队列侦听器数目减少到零,请使用通知机制在队列接收到工作项时实例化队列侦听器。

  5. 如果在大部分时间中队列都保持为空,则自动减少角色实例的数目并且继续监视相关系统度量,以便确定应用程序是否以及何时应增加实例数目以便处理增加的工作负荷。

上述大多数建议都可以转化为一个十分通用的实现,这个实现处理消息批次并且封装许多基础队列/blob 存储和线程管理操作。在本文的后面,我们将说明如何操作。

Important重要提示
在通过 GetMessages 方法检索消息时,队列服务 API 在单个取消排队操作中支持的最大批次大小限制为 32。

一般来说,Windows Azure 队列事务的成本将随着队列服务客户端数目的增加(例如,在增加角色实例的数目时或在增加取消排队线程的数目时)而线性增加。为了阐释未利用上述建议的解决方案设计的潜在成本影响,我们将提供一个示例,以具体的数字来佐证。

效率低下的设计的成本影响

如果解决方案架构师未实现相关优化,则在解决方案在 Windows Azure 平台上部署并运行后,上述计费系统体系结构将很可能会产生额外的运营支出。在本部分中将介绍为什么会产生可能的额外支出。

如在应用场景定义中所述,业务事务数据定期到达。但是,假定在 8 小时标准工作日中,该解决方案忙于处理工作负荷的时间仅是这 8 小时中的 25%。这将导致在长达 6 小时的工作时间(8 小时 * 75%)内,系统处于没有任何事务处理的“空闲”状态。此外,该解决方案在每天的 16 小时非工作时间中将不会接收任何数据。

在总共 22 小时的空闲期间中,该解决方案仍会尝试取消排队工作,因为它并不明确地知道新数据何时到达。在这个时间窗口中,假定采用默认的 1 秒轮询间隔,则每个单独的取消排队线程都将对一个输入队列执行最多 79,200 个事务(22 小时 * 60 分钟 * 60 个事务/分钟)。

如前所述,Windows Azure 平台中的定价模型基于单独的“存储事务”。存储事务就是用户应用程序为添加、读取、更新或删除存储数据而提出的请求。在撰写本白皮书之际,存储事务按 10,000 个事务 $0.01 的费率计费(未考虑任何促销或特殊定价安排)。

Important重要提示
在计算队列事务的数目时,请记住将单个消息放置于一个队列中将会作为 1 个事务计数,而使用消息通常是一个由两个步骤构成的过程,它涉及在检索后发出请求从队列中删除消息。因此,成功的取消排队操作将导致两个存储事务。请注意,即使在取消排队请求导致未检索任何数据时,它仍将计为一个收费事务。

在上述情形下由单个取消排队线程生成的存储事务会在您的每月帐单上加上一笔大约 $2.38(79,200 / 10,000 * $0.01 * 30 天)的金额。比较之下,200 个取消排队线程(或者,200 个辅助角色实例中 1 个取消排队线程)会将每月成本增加 $457.20。这是在该解决方案完全未执行任何计算(只是对队列进行检查以便确定是否有任何工作项可用)时产生的成本。上面这个示例比较抽象,因为没有人会这样实现其服务,这就是执行下面所介绍的优化之所以重要的原因。

消除额外延迟的最佳做法

为了优化基于队列的 Windows Azure 消息传递解决方案的性能,方法之一就是使用本部分中所述的随 Windows Azure Service Bus 一起提供的发布/订阅消息传递层。

在这个方法中,开发人员将需要关注创建轮询和实时的基于推送的通知的组合,并且使侦听器能够订阅根据某些条件触发的通知事件(触发器),以便指示某一队列上有了新的工作负荷。这个方法通过用于调度通知的发布/订阅消息传递层增强了传统的队列轮询循环。

在一个复杂的分布式系统中,此方法将需要使用“消息总线”或“面向消息的中间件”,以便确保通知可以可靠地以松散耦合方式中继到一个或多个订阅服务器。为满足在 Windows Azure 上运行和内部运行的松散耦合的分布式应用程序服务之间的消息传递要求,Windows Azure Service Bus 是当然之选。它还非常适合于“消息总线”体系结构,此体系结构将在基于队列的通信中涉及的各进程之间实现交换通知。

在基于队列的消息交换中参与的进程可以采用以下模式:

Best-Practices-Messaging-Solutions-Azure3

具体而言,因为它与队列服务发布服务器和订阅服务器之间的交互相关,所以,适用于 Windows Azure 角色实例之间的通信的那些原则也同样满足针对基于推送的通知消息交换的大部分要求。我们已经在如何使用 Windows Azure Service Bus 简化和扩展角色间通信中介绍了这些基础知识。

Important重要提示
使用 Windows Azure Service Bus 受到定价模型的约束,该定价模型将考虑针对 Service Bus 消息传递实体(例如队列或主题)的消息传递操作量。

因此,执行成本收益分析以便评估将 Service Bus 引入某一给定体系结构的利弊十分重要。延续上面的思路,也有必要评估引入基于 Service Bus 的通知调度层实际上是否会导致成本降低,从而证明进行投资和执行额外的开发工作是值得的。

有关针对 Service Bus 的定价模型的详细信息,请参阅 Windows Azure 平台 FAQ 中的有关部分。

尽管对延迟的影响可以通过发布/订阅消息传递层十分轻松地予以解决,但通过使用下一部分中描述的动态(弹性)缩放可以进一步降低成本。

动态缩放的最佳做法

借助于 Windows Azure 平台,客户能够比以往更快、更轻松地进行缩放。能够适应不断变化的工作负荷和流量是云平台的主要价值主张之一。这意味着“可伸缩性”不再是成本高昂的 IT 代名词,它现在是一种现成的功能,可根据需要在良好设计的云解决方案中以编程方式实现。

“动态缩放”是一种技术能力,体现了某一给定解决方案通过在运行时增减工作容量和处理能力来适应波动的工作负荷的能力。Windows Azure 平台本身通过设置可按需购买计算小时的分布式计算基础结构来支持动态缩放。

区分 Windows Azure 平台上的以下两种动态缩放十分重要:

  • “角色实例缩放”表示添加和删除附加的 Web 或辅助角色实例以便处理该时间点的工作负荷。这常常包括更改服务配置中的实例计数。增加实例计数将会导致 Windows Azure 运行时启动新实例,而减少实例计数将会导致关闭正在运行的实例。

  • “进程(线程)缩放”表示根据当前工作负荷增减线程数目,保持某一给定角色实例中正在处理的线程数量足够。

对于基于队列的消息传递解决方案,动态缩放一般建议采用以下的组合:

  1. 监视关键绩效指标,包括 CPU 使用率、队列深度、响应时间和消息处理延迟。

  2. 动态增减角色实例的数目,以便处理工作负荷中的峰值(无论是可预测的还是不可预测的)。

  3. 以编程方式增减正在处理的线程数目,以便适应某一给定角色实例处理的变化的负荷情况。

  4. 对工作负荷进行精细划分并且执行并行处理(使用 .NET Framework 4 中的任务并行库)。

  5. 对于工作负荷变化非常大的解决方案,需要保持可变的容量,不仅能够应对突发的峰值情形,也不会产生设置额外实例的开销。

借助于服务管理 API,Windows Azure 托管服务能够通过在运行时更改部署配置来修改其正运行的角色实例的数目。

note注意
在一个典型订阅中,Windows Azure 小型计算实例的最大数目(或者就核心数目而言其他大小的计算实例的等效数目)默认被限制为 20。增加此配额的任何请求都应该向 Windows Azure 支持团队提出。有关详细信息,请参阅 Windows Azure 平台 FAQ

角色实例计数的动态缩放不见得始终是用于处理负荷峰值的最合适选择。例如,一个新的角色实例可能需要用几秒钟的时间来启动,并且关于启动时间目前没有相关的 SLA 度量。而一个解决方案可能只需要增加工作线程的数目即可处理增加的工作负荷。在处理工作负荷时,解决方案将会监视相关负荷度量,并且确定是否需要动态减少或增加工作进程的数目。

Important重要提示
目前,单个 Windows Azure 队列的可伸缩性目标“限制”在 500 个事务/秒。如果某个应用程序尝试超过此目标,例如,通过从运行数百个取消排队线程的多个角色实例来执行队列操作,则可能导致收到来自存储服务的 HTTP 503“服务器忙”响应。在这种情形发生时,应用程序应使用指数退让延迟算法实现某一重试机制。但是,如果此类 HTTP 503 错误经常发生,则建议使用多个队列并且实现基于分区的策略来跨多个队列进行缩放。

在大多数情况下,对工作进程进行自动缩放由单独的角色实例负责完成。与之相反,角色实例缩放常常涉及负责监视性能度量和执行相应缩放操作的解决方案体系结构的核心元素。下图描绘了一个称作“动态缩放代理”的服务组件,它对负荷度量进行收集和分析,以便确定是否需要设置新实例或停用空闲实例。

Best-Practices-Messaging-Solutions-Azure4

需要特别注意的是,该缩放代理服务可作为在 Windows Azure 上运行的辅助角色部署,也可作为内部服务部署。该服务将能够访问 Windows Azure 队列,而与部署拓扑无关。

为了实现动态缩放功能,请考虑使用 Microsoft Enterprise Library 自动缩放应用程序块,它能够在 Windows Azure 上运行的解决方案中实现自动缩放行为。该自动缩放应用程序块提供在 Windows Azure 应用程序中定义和监视自动缩放所需的所有功能。

现在,我们已介绍了延迟影响、存储事务成本和动态缩放要求,此时是将我们的这些建议整合到一个技术实现中的时候了。

技术实现

在前面的部分中,我们介绍了基于 Windows Azure 队列存储队列的良好设计的消息传递体系结构所要具备的关键特性。我们已介绍了有助于减少处理延迟、优化存储事务成本和改进对波动的工作负荷的响应性的三个主要关注面。

本部分旨在提供一个起始点,以便从编程角度协助 Windows Azure 开发人员实现在本白皮书中提及的一些模式。

note注意
本部分将着重介绍如何构建同时支持基于请求的模型和基于推送的模型的可自动缩放的队列侦听器。有关在角色实例级别用于动态缩放的高级技术,请参考 Enterprise Library 自动缩放应用程序块

此外,由于篇幅所限,我们只是重点介绍一些核心功能元素,并且在下面的代码示例中省略了许多支持结构代码,避免代码过于复杂。还要特别指出的是,下文中介绍的技术实现并非针对某一给定问题空间的唯一解决方法,只是为了向开发人员提供一个起始点,开发人员可以借此衍生他们自己的更适合的解决方法。

从此处开始,本白皮书将着重介绍实现上述模式所需的源代码。

构建一般的队列侦听器

首先,我们定义一个约定,它将由某一辅助角色承载并且侦听 Windows Azure 队列的队列侦听器组件实现。

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

QueueEmpty 事件供宿主使用。它为宿主提供用于在队列为空时控制队列侦听器行为的机制。按如下所示定义相应的事件委托:

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

如果可以使用泛型操作侦听器,而不是使用“裸机”SDK 类(例如 CloudQueueMessage),则处理队列项将更为容易。因此,我们定义了一个新接口,该接口将由能够支持对队列进行泛型访问的队列侦听器实现:

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

请注意,我们还实现了识别泛型的侦听器,以便通过利用在 .NET Framework 4 中提供的 IObservable<T> 接口实现 Observer 设计模式,将队列项推送到一个或多个订阅服务器。

我们想要保留实现 ICloudQueueListenerExtension<T> 接口的组件的单个实例。但是,我们需要能够运行多个取消排队线程(工作进程;或出于简化目的,也可以是任务)。因此,我们在队列侦听器组件中添加了对多线程取消排队逻辑的支持。在这里,我们利用了任务并行库 (TPL)。StartListener 方法将负责启动指定数目的取消排队线程,如下所示:


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

DequeueTaskMain 方法实现某一取消排队线程的功能主体。其主要操作如下所示:

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

在实现 DequeueTaskMain 方法时有几点需要特别注意。

首先,在将消息调度为进行处理时我们利用并行 LINQ (PLINQ)。在这里使用 PLINQ 的主要好处是通过尽可能并行对多个处理器上单独的工作线程执行查询委托,加快消息处理速度。

note注意
因为查询并行度由 PLINQ 在内部进行管理,所以,不能保证 PLINQ 将使用多个内核来用于工作并行度。如果 PLINQ 确定并行的系统开销将会减慢查询速度,则可以按顺序运行查询。为了从 PLINQ 中受益,查询中的总工作量必须足够大,这样才能从计划线程池上的工作的系统开销中得到好处。

接下来,我们将不是一次提取一个消息。而是请求队列服务 API 从队列中检索特定数目的消息。这受到传递到 Get<T> 方法的 DequeueBatchSize 参数的驱动。在我们进入作为整体解决方案的一部分实现的存储抽象层时,此参数将传递给队列服务 API 方法。此外,我们运行一个安全检查以便确保批次大小未超过 API 支持的最大大小。下面的代码将实现上述操作:

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

最后,我们将不会无限期地运行取消排队任务。我们设置了一个作为 QueueEmpty 事件实现的显式检查点,只要队列变为空,就会引发该事件。此时,我们参考 QueueEmpty 事件处理程序,以便确定是否允许我们完成正在运行的取消排队任务。通过在实现 QueueEmpty 事件处理程序时很好地进行设计,可支持下一个部分中介绍的“自动缩减”功能。

自动缩减取消排队任务

QueueEmpty 事件处理程序有两个用途。首先,它负责向源取消排队任务提供反馈,指示它对于某一给定时间间隔(在事件委托的 delay 输出参数中定义)进入睡眠状态。其次,它指示取消排队任务是否必须适当地关闭自己(由 Boolean 返回参数指定)。

QueueEmpty 事件处理程序的以下实现方式解决了在本白皮书中前面着重介绍的两个挑战。它计算随机指数退让间隔并且指示取消排队任务以指数级增加队列轮询请求之间的延迟。请注意,该退让延迟将不会超过在我们的解决方案中配置的 1 秒钟,因为在很好地实现了自动缩放后,在轮询之间实在无需长延迟。此外,它查询队列侦听器状态以便确定处于活动状态的取消排队任务的数目。如果此数目超过 1,则事件处理程序将会建议源取消排队任务完成其轮询循环,只要退让间隔也到达了其指定的最大值。否则,取消排队任务将不会被终止,并且对于队列侦听器的单个实例,保持一次恰好运行一个轮询线程。如前面所述,此方法有助于减少存储事务的数目并因此降低事务成本

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

也可以从更高的层级按如下所示说明上述“取消排队任务缩减”功能:

  1. 在队列中存在任何内容时,取消排队任务将确保尽可能快地处理工作负荷。在从队列请求取消排队消息之间将没有任何延迟。

  2. 只要源队列变为空,每个取消排队任务就将引发 QueueEmpty 事件。

  3. QueueEmpty 事件处理程序将计算随机指数退让延迟,并且指示取消排队任务在给定时间间隔中挂起其活动。

  4. 取消排队任务将继续按照计算出的时间间隔轮询源队列,直到空闲持续时间超出其允许的最大值。

  5. 在到达最大空闲时间间隔后,只要源队列仍为空,则所有处于活动状态的取消排队任务都将适当地开始关闭自己 – 不会所有任务同时都关闭自己,因为在退让算法中,取消排队任务在不同时间点退让。

  6. 在某个时间点上,将只有一个处于活动状态的取消排队任务等待工作。因此,将不会对队列发生空闲轮询事务,除非由单个任务进行。

为了阐明收集时间点负荷特性的过程,有必要提及相关的源代码项目。首先,有一个承载相关度量的结构,这些度量测量应用于解决方案的负荷的结果。出于简化目的,我们包括了一小部分将在示例代码中进一步使用的度量。

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

其次,有一个由队列侦听器实现的方法,该方法返回其负荷度量,如下面的示例中所示:

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

自动增加取消排队任务

在前面的部分中,我们介绍了将处于活动状态的取消排队任务的数目减少到单个实例,以便将空闲事务对存储操作成本的影响降至最低的功能。在本部分中,我们将要演练一个相反的示例,在示例中,我们实现“自动增加”功能以便在需要时恢复处理能力。

首先,我们定义一个事件委托,它将出于触发相关操作的目的,帮助跟踪从空队列到非空队列的状态转换:

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

然后,我们对 ICloudQueueServiceWorkerRoleExtension 接口的原始定义进行扩展,以便包括一个新事件,每次队列侦听器检测到新的工作项时,实质上是在队列深度从零更改为任何正值时,都将引发这个新事件:

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

此外,我们确定将引发此类事件的队列侦听器代码中的正确位置。我们将要从在 DequeueTaskMain 方法中实现的取消排队循环内触发 QueueWorkDetected 事件,需要按如下所示对 DequeueTaskMain 方法进行扩展:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

在上一步中,我们为 QueueWorkDetected 事件提供了一个处理程序。此事件处理程序的实现将由实例化和承载队列侦听器的组件提供。在我们的示例中,它是辅助角色。负责实例化和实现事件处理程序的代码由以下内容构成:

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

就上面的示例而言,GetOptimalDequeueTaskCount 方法值得更深入地进行探讨。此方法负责计算被认为最适合处理队列中的工作负荷的取消排队任务的数目。在调用时,此方法应该确定(通过任何适当的决策机制)队列侦听器为了处理正等待的或应该到达给定队列的工作量需要多大的“能力”。

例如,开发人员可以执行一个简单的方法并且将一组静态规则直接嵌入 GetOptimalDequeueTaskCount 方法中。使用队列基础结构的已知吞吐量和可伸缩性特性、平均处理延迟、负载大小和其他相关输入,规则集可以采用开放式视图并且决定最佳的取消排队任务计数。

在下面的示例中,有意使用了一个非常简化的技术来确定取消排队任务的数目:

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

要特别申明的是,上面的代码示例不是要成为“放之四海而皆准的”方法。更理想的解决方案是调用一个可从外部配置和管理的规则,该规则将执行所需的计算。

此时,我们具有队列侦听器的一个工作原型,它能够根据波动的工作负荷自动增减自身。作为最后的工作,可能需要通过在处理时调整自己以适应变化的负荷的能力来增强它。可以通过应用与在添加对 QueueWorkDetected 事件的支持时遵循的相同模式来添加此功能。

现在,我们将重心转到另一个重要的优化,它将帮助减少队列侦听器中的延迟。

为零延迟取消排队实现发布/订阅层

在本部分中,我们将要使用在 Service Bus 单向多播功能基础上建立的基于推送的通知机制来增强上述队列侦听器的实现。该通知机制负责触发一个事件,该事件指示队列侦听器开始执行取消排队工作。此方法有助于避免轮询队列以便检查是否有新消息并因此消除了关联的延迟

首先,我们定义一个触发器事件,在新工作负荷存入队列时我们的队列侦听器将接收这个触发器事件:

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

接下来,我们将使队列侦听器实现能够充当接收触发器事件的订阅服务器。第一步是将队列侦听器定义为 CloudQueueWorkDetectedTriggerEvent 事件的观察者:

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

第二步是实现在 IObserver<T> 接口中定义的 OnNext 方法。提供程序将调用该方法以便向观察者通知有关新事件的情况:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

如上面的示例中所示,我们有意调用了与在前面的步骤中使用的相同事件委托。QueueWorkDetected 事件处理程序已为实例化取消排队任务的最佳数目提供了必需的应用程序逻辑。因此,在处理 CloudQueueWorkDetectedTriggerEvent 通知时重复使用相同的事件处理程序。

如前面的部分中所述,我们在采用基于推送的通知时不必维护连续运行的取消排队任务。因此,我们可以将每个队列侦听器实例的队列任务数目减少到零,并且使用通知机制在队列接收到工作项时实例化取消排队任务。为了确保我们未运行任何空闲的取消排队任务,需要在 QueueEmpty 事件处理程序中进行以下简单修改:

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

总之,我们不再检测是否剩余单个处于活动状态的取消排队任务。修正的 QueueEmpty 事件处理程序的结果只考虑超出最大空闲间隔(此时,所有处于活动状态的取消排队任务都将关闭)的情况。

为了接收 CloudQueueWorkDetectedTriggerEvent 通知,我们利用了作为在 Windows Azure 角色实例之间松散耦合的消息传递实现的发布/订阅模型。实质上,我们挂钩了相同的角色间通信层并且按如下所示处理传入事件:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

对在 CloudQueueWorkDetectedTriggerEvent 类中定义的触发器事件进行多播最终由发布服务器(也就是说,在队列上存储工作项的组件)负责进行。可以在将第一个工作项排入队列之前或者在将最后一个工作项置于队列中之后触发此事件。在下面的示例中,我们在完成将工作项放置于输入队列中后发布一个触发器事件:

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

现在,我们已构建了一个队列侦听器,它能够支持多线程、自动缩放和基于推送的通知,此时该是在 Windows Azure 平台上整合属于基于队列的消息传递解决方案设计的所有建议的时候了。

结论

为了最大限度地提高在 Windows Azure 平台上运行的基于队列的消息传递解决方案的效率和成本效益,解决方案架构师和开发人员应该考虑以下建议。

作为解决方案架构师,您应该:

  • 设置一个基于队列的消息传递体系结构,该体系结构使用 Windows Azure 队列存储服务用于基于云的解决方案或混合解决方案中各层和服务之间高度可伸缩的异步通信

  • 建议分区的队列体系结构扩展超过 500 个事务/秒。

  • 理解 Windows Azure 定价模型的基础知识并且通过一系列最佳做法和设计模式优化解决方案以便降低事务成本

  • 通过设置适应变化和波动的工作负荷的体系结构来考虑动态的缩放要求。

  • 采用适当的自动缩放技术和方法灵活地增减计算能力,以便进一步优化运营支出。

  • 评估通过依赖 Windows Azure Service Bus 进行实时的基于推送的通知调度减少延迟所带来的成本效益比。

作为开发人员,您应该:

  • 设计一个消息传递解决方案,该解决方案在存储和检索来自 Windows Azure 队列的数据时使用批处理

  • 实现一个高效的队列侦听器服务,确保将在为空时按一个取消排队线程的最大数目轮询队列。

  • 当队列在较长的一个时间段内保持为空时动态减少辅助角色实例的数目

  • 实现特定于应用程序的随机指数退让算法,以便降低空闲队列轮询对存储事务成本的影响。

  • 在实现高度多线程的多实例队列发布服务器和使用者时,采用阻止超出单个队列的可伸缩性目标的正确技术。

  • 采用能够在发布和使用来自 Windows Azure 队列的数据时处理多种瞬时条件的强健的重试策略

  • 使用 Windows Azure Service Bus 提供的单向事件处理功能来支持基于推送的通知,以便缩短延迟并提高基于队列的消息传递解决方案的性能。

  • 利用 .NET Framework 4 的新功能(例如 TPL、PLINQ 和 Observer 模式)来最大限度地增加并行度、提高并发性并且简化多线程服务的设计。

随附的示例代码可从 MSDN 代码库下载。该示例代码还包括所有要求的基础结构组件,例如针对 Windows Azure 队列服务的识别泛型的抽象层,它在上面的代码段中未提供。请注意,所有源代码文件均受到相应法律声明中所述的 Microsoft 公共许可证的管辖。

其他资源/参考

有关本白皮书中所论述主题的详细信息,请参考以下内容:


生成日期:

2013-10-23

社区附加资源

显示:
© 2014 Microsoft