导出 (0) 打印
全部展开

Service Bus 队列、主题和订阅

更新时间: 2014年10月

Microsoft Azure Service Bus 支持一组基于云且面向消息的中间件技术,包括可靠消息队列和持久发布/订阅消息。这些“中转”消息传送功能可以看作是异步或分离的消息传送功能,支持使用Service Bus消息结构的发布-订阅、临时分离和负载平衡方案。分离的通信具有很多优点,例如,客户端和服务器可根据需要进行连接并可通过异步方式执行操作。

在 Service Bus 中构成中转消息传送功能核心的消息传送实体为队列主题/订阅规则/操作事件中心

队列为一个或多个竞争的使用者提供“先进先出 (FIFO)”消息传送方式。也就是说,消息通常按其加入队列的临时顺序由接收方接收和处理,并且每条消息只能由一个消息使用者接收和处理。使用队列的主要优点是实现了应用程序组件的“临时分离”。换句话说,生成者(发送方)和使用者(接收方)不需要同时发送和接收消息,因为消息已持久存储在队列中。而且,生成者无需等待使用者的答复,即可继续处理和发送消息。

一个附带的优势是“负载调配”,它允许生成者和使用者以不同速率发送和接收消息。在许多应用程序中,系统负载会随着时间的推移而变化,但每个工作单元所需的处理时间通常是不变的。使用队列调配消息生成者和使用者意味着只需将使用方应用程序设置为能够处理平均负载,而非峰值负载。当传入负载变化时,队列的深度就会相应地增加和缩小。就处理应用程序负载所需的基础结构数量而言,这将直接节省费用。当负载增加时,可以添加更多工作进程从队列中读取。每条消息仅由其中的一个工作进程处理。此外,这种基于拉取的负载平衡还可以让你以最佳方式使用工作计算机,即使工作计算机的处理能力各不相同,因为工作计算机会以自己的最大速率拉取消息。此模式通常称为“使用者竞争”模式。

使用队列在消息生成者和使用者之间调配时,将提供组件之间的固有松耦合。由于生成者和使用者彼此不知道对方,因此可以将使用者升级,而不会对生成者有任何影响。

创建队列是一个多步骤过程。Service Bus消息实体(队列和主题)的管理操作是通过 Microsoft.ServiceBus.NamespaceManager 类执行的,该类在构造时,需要提供Service Bus命名空间的基址和用户凭据。NamespaceManager 提供了创建、枚举和删除消息实体的方法。通过颁发者名称和共享机密以及服务命名空间管理对象创建 Microsoft.ServiceBus.TokenProvider 对象后,你可以使用 Microsoft.ServiceBus.NamespaceManager.CreateQueue(Microsoft.ServiceBus.Messaging.QueueDescription) 方法创建队列。例如:

// Create management credentials
TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(IssuerName, IssuerKey);
// Create namespace client
namespaceManager namespaceClient = new namespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials);

然后,你可以将 Service Bus URI 作为参数创建队列对象和消息工厂。例如:

QueueDescription myQueue;
myQueue = namespaceClient.CreateQueue("TestQueue");
MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ServiceNamespace, string.Empty), credentials); 
QueueClient myQueueClient = factory.CreateQueueClient("TestQueue");

然后,你可以向队列发送消息。例如,如果你有一个称为 MessageList 的中转消息列表,则代码将类似于以下示例:

for (int count = 0; count < 6; count++)
{
    var issue = MessageList[count];
    issue.Label = issue.Properties["IssueTitle"].ToString();
    myQueueClient.Send(issue);
}

你可以从队列接收消息,如下所示:

while ((message = myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 5))) != null)
    {
        Console.WriteLine(string.Format("Message received: {0}, {1}, {2}", message.SequenceNumber, message.Label, message.MessageId));
        message.Complete();

        Console.WriteLine("Processing message (sleeping...)");
        Thread.Sleep(1000);
    }

ReceiveAndDelete 模式下,接收操作是一次性的;也就是说,当 Service Bus 收到请求时,会将消息标记为正在使用,然后将其返回给应用程序。ReceiveAndDelete 模式是最简单的模型,最适合应用程序容许在出现故障的情况下不处理消息的方案。理解此模式时,可考虑这种情况:使用者发出了接收请求,但在处理消息之前发生崩溃。由于 Service Bus 将消息标记为正在使用,因此应用程序在重新启动并重新开始使用消息时,会丢失在崩溃前已使用的消息。

PeekLock 模式下,接收操作将分为两个阶段,因此能够支持不容许丢失消息的应用程序。当 Service Bus 收到请求时,它会找到下一条要使用的消息,将其锁定以防止其他使用者接收它,然后将其返回给应用程序。应用程序完成消息处理(或将消息可靠地存储以便将来处理)后,会对收到的消息调用 Microsoft.ServiceBus.Messaging.BrokeredMessage.Complete 以完成接收过程的第二阶段。当 Service Bus 看到 Complete 时,会将该消息标记为正在使用。

如果应用程序由于某种原因无法处理消息,则可对收到的消息调用 Microsoft.ServiceBus.Messaging.BrokeredMessage.Abandon 方法(而不是 Complete)。这将使 Service Bus 可以解锁该消息,并使其重新变为可由该使用者或其他使用方应用程序接收。其次,存在与锁定关联的超时。如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),Service Bus 将解锁该消息,并使其重新变为可被接收。

请注意,如果应用程序在处理消息后崩溃,但此时尚未发出 Complete 请求,系统会在应用程序重新启动时将该消息重新传递给它。这通常称为至少一次处理,即每个消息至少会被处理一次。但在某些情况下可能会重新传递同一消息。如果方案不容许重复处理,则需要在应用程序中添加用于检测重复的逻辑,这可以基于消息的 MessageId 属性来实现,该属性在多次传递尝试过程中会保持不变。这称为恰好一次处理。

有关如何创建消息、如何向队列发送消息以及如何从队列接收消息的详细信息和工作示例,请参阅 Service Bus 中转消息传递 .NET 教程

与每条消息由单个使用者处理的队列不同,主题和订阅在“发布/订阅”模式下提供一对多的通信形式。用于扩展到大量接收方,每条已发布消息均可供已注册该主题的每个订阅使用。根据可以对每个订阅设置的筛选规则将消息发送到一个主题并传递到一个或多个相关订阅。订阅可以使用其他筛选器来限制要接收的消息。将消息发送到主题的方式与发送到队列的方式相同,但消息不是直接从主题接收,而是从订阅接收。主题订阅类似于一个虚拟队列,可接收发送到主题的消息的副本。从订阅接收消息的方式与从队列接收消息的方式相同。

通过比较,可以将队列的消息发送功能直接映射到主题,而将队列的消息接收功能映射到订阅。另外,这也意味着订阅支持本部分前面介绍的与队列相关的模式:“使用者竞争”、“临时分离”、“负载调配”和“负载平衡”。

创建主题类似于创建队列,如前一部分的示例所示。创建服务 URI,然后使用 NamespaceManager 类创建命名空间客户端。然后,你可以使用 Microsoft.ServiceBus.NamespaceManager.CreateTopic(System.String) 方法创建主题。例如:

TopicDescription dataCollectionTopic = namespaceClient.CreateTopic("DataCollectionTopic");

下一步,可根据需要添加订阅:

SubscriptionDescription myAgentSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Inventory");
SubscriptionDescription myAuditSubscription = namespaceClient.CreateSubscription(myTopic.Path, "Dashboard");

然后,可创建主题客户端。例如:

MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider);
TopicClient myTopicClient = factory.CreateTopicClient(myTopic.Path)

使用消息发送方,可以向主题发送消息以及从主题接收消息,如前一部分所示。例如:

foreach (BrokeredMessage message in messageList)
{
    myTopicClient.Send(message);
    Console.WriteLine(
    string.Format("Message sent: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

与队列类似,使用 SubscriptionClient 对象(而非 QueueClient 对象)从订阅接收消息。将主题名称、订阅名称和接收模式(可选)作为参数传递,创建订阅客户端。例如,对于 Inventory 订阅:

// Create the subscription client
MessagingFactory factory = MessagingFactory.Create(serviceUri, tokenProvider); 

SubscriptionClient agentSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Inventory", ReceiveMode.PeekLock);
SubscriptionClient auditSubscriptionClient = factory.CreateSubscriptionClient("IssueTrackingTopic", "Dashboard", ReceiveMode.ReceiveAndDelete); 

while ((message = agentSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Inventory...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
    message.Complete();
}          

// Create a receiver using ReceiveAndDelete mode
while ((message = auditSubscriptionClient.Receive(TimeSpan.FromSeconds(5))) != null)
{
    Console.WriteLine("\nReceiving message from Dashboard...");
    Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
}

Important重要提示
如主题如何:将服务发布到 Service Bus 注册表所述,你可以使用 Microsoft.ServiceBus.ServiceRegistrySettings 来指示是否希望你的服务可被 Service Bus 发现。如果你的服务是专用的,则只有知道该特定 URI 的个人才能连接。如果你的服务是公用的,则任何人都可以在Service Bus层次结构中导航并找到你的侦听器。然而,队列、主题和订阅无法通过服务注册表来公开。

在许多方案中,具有特定特征的消息必须以特定方式进行处理。若要启用此功能,你可以将订阅配置为查找具有所需属性的消息,然后对这些属性进行特定的修改。虽然 Service Bus 订阅可看到发送到主题的所有消息,但你只能将这些消息的一个子集复制到虚拟订阅队列。此操作使用订阅筛选器来完成。此类修改称为筛选器操作。创建订阅时,你可以提供一个对消息属性进行操作的筛选器表达式,这些消息属性包括系统属性(例如,Label)和应用程序属性(例如,前面示例中的 StoreName)。在此示例中,SQL 筛选器表达式是可选的;如果没有 SQL 筛选器表达式,则将对该订阅的所有消息执行订阅中定义的所有筛选器操作。

若要使用前面的示例筛选仅由 Store1 提供的消息,则可创建 Dashboard 订阅,如下所示:

namespaceManager.CreateSubscription("IssueTrackingTopic", "Dashboard", new SqlFilter("StoreName = 'Store1'"));

设置该订阅筛选器之后,只有 StoreName 属性已设置为 Store1 的消息才会复制到 Dashboard 订阅的虚拟队列。

有关可能的筛选器值的详细信息,请参阅 Microsoft.ServiceBus.Messaging.SqlFilterMicrosoft.ServiceBus.Messaging.SqlRuleAction 类的文档。另请参阅中转消息传送:高级筛选器示例。

事件中心是一种事件引入器服务,用于向 Azure 提供大规模的事件与遥测数据入口,并且具有较低的延迟和较高的可靠性。在应用程序检测、用户体验或工作流处理以及物联网 (IoT) 方案中,将此服务与其他下游服务结合使用可以带来极好的效果。

事件中心是一个消息流式处理结构,尽管看上去与队列和主题类似,但它们具有明显不同的特征。例如,事件中心不提供消息 TTL、死信、事务或确认,因为这都属于传统的中转消息传送功能,而不是流式处理功能。事件中心提供与流相关的其他功能,例如分区、保留顺序和流重放。

另请参阅

显示:
© 2014 Microsoft