深入了解 Windows Azure

Windows Azure 服务总线:使用会话的消息传送模式

Bruno Terkaly
Ricardo Villalobos

下载代码示例

Bruno Terkaly Ricardo Villalobos在我们以前的文章之一,我们讨论了以解耦的解决方案,促进不易积垢软件体系结构的乌云中使用消息处理模式的重要性。(见"比较 Windows Azure 的队列和服务总线队列"在 msdn.microsoft.com/magazine/jj159884.)队列是其中一种消息传递模式,和 Windows Azure 平台提供了两个主要选项,要实现这种方法:队列存储服务和服务总线队列,这两种涵盖多个消费者竞相接收和处理队列中的邮件的每个方案。这是典型的模型在云中,那里可以动态添加或移除接收机支持可变的工作负载基于队列,提供后端负载平衡/故障转移机制的大小 (见图 1)。

Queuing Messaging Pattern: Each Message Is Consumed by a Single Receiver
图 1 队列的消息处理模式:每个消息被消耗的单个接收器

即使该队列的消息传递模式是简单解耦的解决方案,有的情况在每个接收方需要的消息,其自身副本,与丢弃一些基于特定规则的邮件的选项。这种类型的方案中的一个好例子所示图 2,这说明了一个共同的挑战零售企业面貌,将信息发送到多个分支,如最新的产品目录或更新的价格列表时。

Publisher/Subscriber Messaging Pattern: Each Message Can Be Consumed More Than Once
图 2 发布服务器/订阅消息传递模式:每个消息可以不止一次消耗

对于这些情况下,发布服务器/订阅服务器模式是更适合,接收器只是对表示一个或多个邮件类别,感兴趣的连接到独立的订阅包含消息流的副本。Windows Azure 服务总线实现的发布服务器/订阅消息传递模式通过主题和订阅,大大增强了控制消息的分布方式的能力,基于独立规则和筛选器。在本文中,我们将解释如何应用这些 Windows Azure 服务总线功能,使用简单的真实生活场景,假设以下要求:

  1. 产品应收到订单,基于目录页中。
  2. 某些商店不要携带特定目录的类别,并在这些类别中的产品应当为每个存储筛选出。
  3. 新目录信息不应该应用于存储系统中,直到到达的所有邮件。

这篇文章的所有代码示例都创建的 Visual Studio 2012,使用 C# 作为编程语言。您还需要 Windows Azure SDK 版本 1.8 为.NET 开发人员和访问 Windows Azure 的订阅。

为项目设置了消息的蓝图

之前编写任何代码,您需要定义将成为邮件工作流的一部分的不同实体 (主题和订阅)。这可以通过访问在 Windows Azure 门户 manage.windowsazure.com。使用您的凭据登录,请按照下列步骤:

  1. 单击左下角的管理门户上的创建新的图标。
  2. 单击上的 APP 服务图标,然后在服务总线主题上和最后上创建自定义 (见图 3)。
  3. 在第一次对话框的屏幕上,输入主题名称,并选择适当地区和 Windows Azure 订阅 id。如果这是您第一次选定区域中的命名空间,该向导会建议命名空间的队列:[您的实体名称]-ns。您可以更改此值。
  4. 单击下一个标记 (右箭头) 插入其余的属性。您可以保留默认值。单击复选标记来创建主题。
  5. 单击左侧的导航栏以获取命名空间的列表中的服务总线图标。请注意您可能看不到立即列出的命名空间。它需要几秒钟才能创建命名空间和更新门户界面。
  6. 选择的主题,您刚刚创建的列表,并单击访问键,可以在屏幕的底部找到。记录供以后使用的完整的连接字符串。
  7. 在 Windows Azure 门户屏幕顶部,单击订阅,然后就创建新订阅。在弹出对话框中,输入一个名称 (在我们的示例中,我们使用"Store1Sub"),单击上的箭头继续。
  8. 在下一个屏幕中,保留默认值,但请确保检查启用会话选项。单击复选标记来创建订阅。会议将由订阅服务器用于检索中按顺序的消息。
  9. 三家店的每个重复步骤 7 和 8。

Creating a New Service Bus Topic Using the Windows Azure Portal
图 3 创建一个新的服务总线主题,使用 Windows Azure 门户

一旦创建了主题和订阅,也可以在 Visual Studio 中直接访问它们。要这样做,请打开服务器资源管理器 (视图 |服务器资源管理器中),展开 Windows Azure 服务总线节点 (见图 4)。在 Windows Azure 服务总线节点上右键单击,然后选择添加新的连接。输入 Namespace 名称、 颁发者名称 (通常"所有者") 和发行人访问键时,Windows Azure 的命名空间在门户网站中创建记录。

Creating a Service Bus Topic and Subscriptions Using the Visual Studio Tools
图 4 创建一个服务巴士主题和使用 Visual Studio 工具订阅

请记住,它是可能要以编程方式创建和管理这些实体使用 Microsoft.ServiceBus.Messaging 命名空间,包括 TopicClient 和 SubscriptionClient,用于在本文稍后部分中的类。

一旦创建了该消息的工作流的基本结构,我们将模拟使用 Visual Studio 中创建的两个控制台应用程序,如中所示的交通图 5。第一个控制台应用程序,MSDNSender,将发送产品目录。第二,MSDN­接收器,将收到的信息,在每个商店。我们要好好分析一下以下各节中的代码。在 Pub/Sub 模式中,MSDNSender 是发布服务器和 MSDNReceiver 是订阅服务器。

Visual Studio Solution to Simulate the Products Catalog Scenario
图 5 Visual Studio 解决方案来模拟产品目录方案

从总部发送产品目录

正如您看到的图 2,总部 (发布服务器) 将消息发送到某个主题。这种逻辑是由在主文件中,program.cs,然后从,MSDNSender 项目的一部分的代码表示的。Program.cs,然后从封装的逻辑和代码将主题作为单个邮件发送的产品列表。让我们看看不同的部分,从开始的 Main 方法中。请注意,我们首先创建为本专题的客户端,如下所示:

// Create a topicClient using the
// Service Bus credentials
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName);

一旦创建了 topicClient,发布服务器可以发送邮件使用它。 产品要发送列表存储在 XML 文件中调用 ProductsCatalog.xml,其中包含将转化为一个对象数组的 10 产品实体的列表。 然后将获取产品映射到存储在 Product.cs 文件中的目录和产品类:

// Deserialize XML file with Products, and store them in an object array
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();

目录数组中的每个产品显示中显示的结构图 6

图 6 类表示形式在目录中的产品

public class Product
  {
    [System.Xml.Serialization.XmlElement("ProductId")]
    public string ProductId { get; set; }
    [System.Xml.Serialization.XmlElement("ProductName")]
    public string ProductName { get; set; }
    [System.Xml.Serialization.XmlElement("Category")]
    public string Category { get; set; }
    [System.Xml.Serialization.XmlElement("CatalogPage")]
    public int CatalogPage { get; set; }
    [System.Xml.Serialization.XmlElement("MSRP")]
    public double MSRP { get; set; }
    [System.Xml.Serialization.XmlElement("Store")]
    public string Store { get; set; }
  }

在数组循环中,调用 CreateMessage 方法提取物的产品对象的不同属性,并将它们分配给要发送的消息。 两个属性需要额外注意:

if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;

会话是极其重要的因为它们允许接收器来确定是否已到达的所有邮件,属于一个特定的逻辑组。 在这种情况下,通过设置会话 Id 的消息属性,我们指定 catalogName 值相同的所有消息都到达后接收器不应该使用直到目录信息。 另外,数组中的最后一个产品,我们增加一个新的属性:IsLastMessageInSession,这将使接收机来确定是否已到达会话中的最后一条消息和目录可以进行完全处理。 图 7 显示运行的 MSDNSender。

Execution of the MSDNSender Project
图 7 执行 MSDNSender 项目

接收在商店使用订阅产品目录

现在,目录和产品已寄出的主题,并复制到不同的订阅,让我们把注意力放到 MSDNReceiver 项目中,在接收和处理邮件。 请注意在 program.cs,然后从 Main 方法中,代码创建一个客户端订阅基于信息提供由用户通过 Console.Read­线命令。 预期用户输入其存储编号,这反映了他们希望接收的消息。 简而言之,每个分店只关注到该存储区适用的消息:

Console.WriteLine("Enter Store Number");
  string storeNumber = Console.ReadLine();
  Console.WriteLine("Selecting Subscription for Store...");
  // Create a Subscription Client to the Topic
  SubscriptionClient subscriptionClient =
    SubscriptionClient.CreateFromConnectionString(
    serviceBusConnectionString, topicName,
    "Store" + storeNumber.Trim() + "Sub",
    ReceiveMode.PeekLock);

因为我们从基于会话 (如前一节中解释) 的订阅接收消息,我们需要请求下一次使用下面的代码行:

MessageSession sessionReceiver =
  subscriptionClient.AcceptMessageSession(TimeSpan.FromSeconds(5));

基本上,这意味着客户端将检查是否有任何要处理的消息在订阅中 — — 其会话 Id 属性不为空的那些 — — 如果在五秒的时间内不遇到任何这类消息,则该请求将时间和­出了,终止接收方应用程序。 另一方面,如果找到了一个会话,则将调用 ReceivingSessionMessages 方法。 我们跳进这段代码之前,让我们讨论一下会话状态,允许开发人员将收到属于同一事务的消息时可以使用的信息存储的概念。 在这种情况下,我们使用会话状态,"记住"收到了,最后一个目录页,以及消息 — — 产品 — — 的顺序到达。

基于此,这里是在代码中的工作流:

  1. ReceiveSession 在接收当前消息­邮件方法 (见图 8),依赖的 ProcessMessage 方法 (图 9) 来处理它。
  2. 里面的 ProcessMessage 方法,如果消息是无序的它会自动延迟,并且其 ID 存储在会话状态中。 否则为它已标记为"完成",并从订阅中移除。 此外下, 一个预期序列 — — 目录页 — — 存储在会话。
  3. 目前收到的消息已被处理后,在 ReceiveSessionMessages 后面的代码在会话,延迟的邮件 Id 检查,并尝试处理他们再次基于最新的目录页。
  4. 一旦会话已经收到的所有邮件,被关闭接收器。

图 8 代码中的 ReceivedSessionMessages 方法

static void ReceiveSessionMessages(MessageSession receiver)
  {
    // Read messages from subscription until subscription is empty
    Console.WriteLine("Reading messages from subscription {0}", 
      receiver.Path);
    Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
    Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
    SequenceState sessionState = GetState(receiver);
    BrokeredMessage receivedMessage;
    while ((receivedMessage = receiver.Receive()) != null)
    {
      string sessionId = receiver.SessionId;
      ProcessMessage(receivedMessage, ref sessionState, receiver);
      while (sessionState.GetNextOutOfSequenceMessage() != -1)
      {
        // Call back deferred messages
        Console.WriteLine("Calling back for deferred message: Category {0},
          Message sequence {1}", receiver.SessionId,
            sessionState.GetNextSequenceId());
        receivedMessage = receiver.Receive(
          sessionState.GetNextOutOfSequenceMessage());
        ProcessMessage(receivedMessage, ref sessionState, receiver);
      }
      if (receivedMessage.Properties.ContainsKey(
        "IsLastMessageInSession"))
        break;
    }
    SetState(receiver, null);
    receiver.Close();
  }

图 9 中的代码的 ProcessMessage 方法

static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
  {
    if (session != null)
    {
      int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
      if (sessionState.GetNextSequenceId() == messageId)
      {
        OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
        sessionState.SetNextSequenceId(messageId + 1);
        message.Complete();
        SetState(session, sessionState);
      }
      else
      {
        Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
          session.SessionId, messageId);
        sessionState.AddOutOfSequenceMessage(messageId, 
          message.SequenceNumber);
        message.Defer();
        SetState(session, sessionState);
      }
    }
    Thread.Sleep(receiverDelay);
  }

请记住对于本项目,延迟的邮件 Id 存储在会话状态中,和可能会丢失。 在生产环境中,我们建议为此目的使用某种类型的持久性存储区 (Windows Azure 表是一个选项)。 请注意,如果邮件中包含的属性 IsLastMessage­SessionInSession (在发送过程中设置),终止会话循环。 在控制台输出中可以看到的 MSDNReceiver 项目图 10

Execution of the MSDNReceiver project
图 10 执行 MSDNReceiver 项目

Windows Azure 服务总线订阅给你创建筛选出邮件之前他们正在消耗的具体规则的能力。 在这种情况下,它会相对容易地创建一个规则,隔离产品按类别或商店编号 (其中我们忽略此项目中)。 直接在 Windows Azure 门户或通过 Visual Studio 工具,可以以编程方式创建规则。

总结

Windows Azure 服务总线提供发布/订阅模式惊人地强大而灵活的实现。 可以通过使用主题和订阅处理许多不同的方案。 支持多个发件人向多个接收者,加上有能力逻辑分组和排序的消息,广播消息的能力为现代开发人员开辟了一个世界的可能性。 此外,可以利用一个持久性会话跟踪状态使得简单直观,逻辑分组的消息并控制它们的顺序。 在分布式的环境在哪里规范的世界,了解如何使用消息处理模式和他们周围的工具是今天的软件架构师在云计算工作的关键。

Bruno Terkaly是 Microsoft 的开发推广人员。他的知识深度来源于多年来相关领域以及使用大量平台、语言、框架、SDK、库和 API 编写代码的经验。他不辞辛苦,就有关构建基于云的应用程序(特别是使用 Windows Azure 平台)编写代码、发布博客并给予现场演示。

Ricardo Villalobos 是一名资深的软件设计师,具有 15 年为供应链管理行业设计和创建应用程序的经验。他从达拉斯大学工商管理持有不同的技术认证,以及硕士学位,为微软工作作为 Windows Azure CSV 孵化组中云建筑师。

衷心感谢以下技术专家对本文的审阅:阿彼锡 · 拉尔