导出 (0) 打印
全部展开

如何:使用 CloudFx 通过 Service Bus 主题异步收发消息

注:本页面内容可能不完全适用中国大陆地区运营的 Windows Azure服务。如要了解不同地区 Windows Azure 服务的差异, 请参考本网站.

本主题介绍使用 Cloud Application Framework & Extensions (CloudFx) 库异步向 Windows Azure 中的 Service Bus 主题发送和从其接收消息的最简单方法。示例应用程序异步地使用一个 Service Bus 主题,而不必担心处理标准 .NET Framework 异步编程模型中传统的开始和结束方法及其他概念。(有关该模型的详细信息,请参阅异步调用同步方法。)

有关使用 Service Bus 在 CloudFx 中发送和接收消息的最简单同步方法,请参阅How to: Use CloudFx to Send and Receive Messages with a Service Bus Topic and Subscription

Important重要提示
本主题假设您已安装 Visual Studio 2012 或更高版本,并且已安装 NuGet 包管理器 Visual Studio 扩展。如果尚未安装,请下载并安装 NuGet 扩展

针对 .NET 4.0 目标框架配置项目

  1. 以管理员身份打开 Visual Studio,必须这样做才能使用 Service Bus .NET 库。

  2. 新建一个控制台应用程序。

  3. 右键单击解决方案中的新项目,然后选择“属性”。在属性选项卡上,确认将“目标框架”设置为 .NET 4.0 而非任何其他目标框架,如下图所示。

    Visual Studio 目标更改对话框。
    note注意
    如果确需更改目标框架,则根据配置的不同,Visual Studio 可能需要关闭该项目再重新加载它。

  4. 在解决方案资源管理器中,通过右键单击该项目并选择“管理 NuGet 包...”,下载 NuGet 包并将这些包加入您的项目,如下图所示。

    NuGet 添加对话框。
  5. 在 NuGet 搜索文本框中,键入“CloudFx”,然后按 Enter。显示“CloudFx”包后,单击“安装”,接受随后显示的提示,完成后关闭该对话框。

    在 NuGet 中查找 CloudFx。

添加代码

  1. 将以下 using 语句添加到程序文件的开头。

    using System;
    using System.Reactive;
    using System.Reactive.Disposables;
    using Microsoft.Experience.CloudFx.Framework.Configuration;
    using Microsoft.Experience.CloudFx.Framework.Messaging;
    
    
  2. 紧接命名空间声明后,在第一个类之前插入以下代码,其中声明示例程序发送和接收的 NewAccountRequestAccountClosureRequest 消息类型。

    note注意
    默认情况下,这些类型的对象可序列化,因为其属性值为内部类型。如果使用此模型创建具有不同类型的应用程序,则请牢记,必须将这些类型序列化。

    public class NewAccountRequest
    {
      public string CustomerName { get; set; }
      public int AccountNumber { get; set; }
    }
    
    public class AccountClosureRequest
    {
      public int AccountNumber { get; set; }
      public string Reason { get; set; }
    }
    
    
  3. Main 方法中输入以下代码。将线程代码包括在内是为了指明各种调用发生在哪个线程上。

    Console.WriteLine("Press ENTER to start...");
    Console.ReadLine();
    
    Console.WriteLine("Waiting on the results. When they arrive, press ENTER to exit.");
    Console.WriteLine("The console thread is {0}.", 
      System.Threading.Thread.CurrentThread.ManagedThreadId
    );
    
    
    
  4. 在上一步添加到 Main 方法的代码的下方,输入以下代码。这段代码示例需要颁发者密钥和服务命名空间。

    note注意
    以下代码需要 Service Bus 默认颁发者(默认值为 owner)、Service Bus 默认密钥和服务命名空间。有关如何创建服务命名空间的信息,请参阅创建服务命名空间。有关如何获取特定命名空间的默认密钥的信息,请参阅获取命名空间的默认密钥

    // Note that the topic to which you are going to publish does 
    // not have to exist; if the topic does not exist, the topic is 
    // created automatically.
    var endpoint = new ServiceBusEndpointInfo
    {
      IssuerName = "owner", // This is the Default Issuer
      IssuerSecret = "<Service Bus Default Key>",
      ServiceNamespace = "<your namespace>",
      EndpointType = ServiceBusEndpointType.Topic,
      TopicPath = "CloudFxAsynchronousTopicSample"
    };
    
    
  5. 在上一步添加的代码的下方,添加以下代码,为刚创建的终结点新建一个 ServiceBusPublishSubscribeChannel。此通道对象既向主题发布消息,又订阅观察者对象以侦听到达该同一主题的消息。

    // Requires "using Microsoft.Experience.CloudFx.Framework.Messaging"
    // The channel must be disposed to clean up Service Bus resources. 
    // Typically, this is done with a "using" block, but in this case, the 
    // sample program calls Dispose before closing.
    var pubSubChannel = new ServiceBusPublishSubscribeChannel(endpoint);
    
    
  6. 在新建该 ServiceBusPublishSubscribeChannel 的代码的下方,添加以下代码以创建用于订阅特定订阅的该类型消息的观察者。ServiceBusPublishSubscribeChannel 会自动创建必要的订阅,以使您不必做创建订阅或将其与主题关联的工作。

    Observer.Create<T> 采用 T 类型的 Action,并返回给定类型的 IObserver。在 CloudFx 库中,ServiceBusPublishSubscribeChannel 的实现会在收到来自 Service Bus 订阅的消息后向指定的观察者推送一条该类型的新消息。

    // Create an observer for the message types to be received. Each observer is tied to the 
    // injected message type. Observer forces the System.Reactive using statement to be added.
    var newAccObserver = Observer.Create<NewAccountRequest>(msg =>
    {
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.WriteLine("The New account observer received a NewAccountRequest on thread # {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
        );
      Console.WriteLine(msg.CustomerName);
      Console.WriteLine(msg.AccountNumber);
      Console.ResetColor();
    });
    
    var closeAccObserver = Observer.Create<AccountClosureRequest>(msg =>
    {
      Console.ForegroundColor = ConsoleColor.Red;
      Console.WriteLine("The Close account observer received a AccountClosureRequest on thread # {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
        );
      Console.WriteLine(msg.AccountNumber);
      Console.WriteLine(msg.Reason);
      Console.ResetColor();
    });
    
    
    
  7. 使用第 5 步中创建的通道订阅您的观察者以侦听发送到主题的那些特定消息类型。紧接上一步代码的下方,添加以下代码。在本例中,对于 NewAccountMessage 对象,我们添加一个侦听器计数,并指定一种竞争使用者方法,该方法确保一个侦听器仅处理一条消息,但我们已请求提供 10 个并发侦听器。

    // Give the channel the observers. The channel examines the observers, creates the appropriate
    // subscriptions to the topic, creates a filter for the message type passed as a type parameter, 
    // and creates and manages the listeners as instructed. When a message of that type is 
    // published, the appropriate observer is notified and passed the message by the CloudFx 
    // listener. Receives happen on other threads. 
    pubSubChannel.Subscribe(newAccObserver, listenerCount: 10, competing:true);
    pubSubChannel.Subscribe(closeAccObserver);
    
    
    note注意
    在发送任何消息前注册其观察者,如同此示例应用程序所做的一样。CloudFx 实现在消息到达时读取这些消息,然后使用 System.Reactive 功能将这些消息推送到相应的已提前注册的观察者对象。

  8. 创建两条发送到主题的消息。在前面创建了观察者的代码的下方,粘贴以下代码以接收这些类型的消息。

    // Create the new messages to send
    var newAcc = new NewAccountRequest()
    {
      CustomerName = "John Jones",
      AccountNumber = 111232432
    };
    
    var closure = new AccountClosureRequest()
    {
      AccountNumber = 111232432,
      Reason = "Unhappy customer"
    };
    
    
  9. 发送消息并保持控制台打开,直到观察者报告这两条消息到达为止。将以下代码添加到上一步中添加的代码的底部。

    // This object will be used for tracking all pending async Publish operations. 
    // It will also be used for blocking the client until all async Publish operations complete.
    var asyncOps = new CompositeDisposable();
    
    // Create 100 messages to send. Note that although the console thread 
    // invokes each call, the call does not block, leaving the console thread
    // to continue processing without delay.
    for (int i = 0; i < 100; i++)
    {
      newAcc.AccountNumber = i;
      asyncOps.Add(pubSubChannel.PublishAsync(newAcc));
    
      Console.Write("\rPublishing the new account message on thread #{0}.", System.Threading.Thread.CurrentThread.ManagedThreadId);
    }
    
    // Publishes the close account messages asynchronously.
    
    for (int i = 0; i < 10; i++)
    {
      closure.AccountNumber = i;
      asyncOps.Add(pubSubChannel.PublishAsync(closure));
    }
    
    // This call will block the caller until all async Publish operations complete. 
    // It is useful for synchronization or simply to make sure that the client code 
    // continues only when all messages are published. Without it, the sample application
    // would need some other way of holding the console open until all messages have been published.
    asyncOps.Dispose();
    
    Console.WriteLine("Press ENTER to close the pubSubChannel...");
    Console.ReadLine();
    Console.WriteLine("Closing the pubSubChannel, it may take a few seconds, just wait a little bit...");
    
    // Dispose must be called to enable the channel to clean up resources
    // like the subscriptions and topics. 
    pubSubChannel.Dispose();
    
    Console.WriteLine("Press ENTER to exit the application.");
    Console.Read();
    
    

执行代码

  1. 按 F5。程序开始处的输出应类似于以下内容。

    节目的开始。
  2. 程序执行过程中的输出应类似于以下内容。

    节目的结尾。

示例

下面给出用于创建此示例的完整代码文件。

using System;
using System.Reactive;
using System.Reactive.Disposables;
using Microsoft.Experience.CloudFx.Framework.Configuration;
using Microsoft.Experience.CloudFx.Framework.Messaging;

namespace SBAsyncTopic
{
  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("Press ENTER to start...");
      Console.ReadLine();

      Console.WriteLine("Waiting on the results. When they arrive, press ENTER to exit.");
      Console.WriteLine("The console thread is {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
      );


      // Note that the topic to which you are going to publish does 
      // not have to exist; if the topic does not exist, the topic is 
      // created automatically.
      var endpoint = new ServiceBusEndpointInfo
      {
        IssuerName = "owner", 
        IssuerSecret = "<Service Bus Default Key>",
        ServiceNamespace = "<your namespace>",
        EndpointType = ServiceBusEndpointType.Topic,
        TopicPath = "CloudFxAsynchronousTopicSample"
      };

      // Requires "using Microsoft.Experience.CloudFx.Framework.Messaging"
      // The channel must be disposed to clean up Service Bus resources. 
      // Typically, this is done with a "using" block, but in this case, the 
      // sample program calls Dispose before closing.
      var pubSubChannel = new ServiceBusPublishSubscribeChannel(endpoint);

      // Create an observer for the message types to be received. Each observer is tied to the 
      // injected message type. Observer forces the System.Reactive using statement to be added.
      var newAccObserver = Observer.Create<NewAccountRequest>(msg =>
      {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine("The New account observer received a NewAccountRequest on thread # {0}.",
          System.Threading.Thread.CurrentThread.ManagedThreadId
          );
        Console.WriteLine(msg.CustomerName);
        Console.WriteLine(msg.AccountNumber);
        Console.ResetColor();
      });

      var closeAccObserver = Observer.Create<AccountClosureRequest>(msg =>
      {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("The Close account observer received a AccountClosureRequest on thread # {0}.",
          System.Threading.Thread.CurrentThread.ManagedThreadId
          );
        Console.WriteLine(msg.AccountNumber);
        Console.WriteLine(msg.Reason);
        Console.ResetColor();
      });

      // Give the channel the observers. The channel examines the observers, creates the appropriate
      // subscriptions to the topic, creates a filter for the message type passed as a type parameter, 
      // and creates and manages the listeners as instructed. When a message of that type is 
      // published, the appropriate observer is notified and passed the message by the CloudFx 
      // listener. Receives happen on other threads. 
      pubSubChannel.Subscribe(newAccObserver, listenerCount: 10, competing: true);
      pubSubChannel.Subscribe(closeAccObserver);

      // Create the new messages to send
      var newAcc = new NewAccountRequest()
      {
        CustomerName = "John Jones",
        AccountNumber = 111232432
      };

      var closure = new AccountClosureRequest()
      {
        AccountNumber = 111232432,
        Reason = "Unhappy customer"
      };

      // This object will be used for tracking all pending async Publish operations. 
      // It will also be used for blocking the client until all async Publish operations complete.
      var asyncOps = new CompositeDisposable();

      // Create 100 messages to send. Note that although the console thread 
      // invokes each call, the call does not block, leaving the console thread
      // to continue processing without delay.
      for (int i = 0; i < 100; i++)
      {
        newAcc.AccountNumber = i;
        asyncOps.Add(pubSubChannel.PublishAsync(newAcc));

        Console.Write("\rPublishing the new account message on thread #{0}.", System.Threading.Thread.CurrentThread.ManagedThreadId);
      }

      // Publishes the close account messages asynchronously.

      for (int i = 0; i < 10; i++)
      {
        closure.AccountNumber = i;
        asyncOps.Add(pubSubChannel.PublishAsync(closure));
      }

      // This call will block the caller until all async Publish operations complete. 
      // It is useful for synchronization or simply to make sure that the client code 
      // continues only when all messages are published. Without it, the sample application
// would need some other way of holding the console open until all messages have been published.

      asyncOps.Dispose();

      Console.WriteLine("Press ENTER to close the pubSubChannel...");
      Console.ReadLine();
      Console.WriteLine("Closing the pubSubChannel, it may take a few seconds, just wait a little bit...");

      // Dispose must be called to enable the channel to clean up resources
      // like the subscriptions and topics. 
      pubSubChannel.Dispose();

      Console.WriteLine("Press ENTER to exit the application.");
      Console.Read();
    }
  }

  public class NewAccountRequest
  {
    public string CustomerName { get; set; }
    public int AccountNumber { get; set; }
  }

  public class AccountClosureRequest
  {
    public int AccountNumber { get; set; }
    public string Reason { get; set; }
  }

}



生成日期:

2013-11-22

社区附加资源

显示:
© 2014 Microsoft