Exportar (0) Imprimir
Expandir Tudo

Como: Usar CloudFx para enviar e receber mensagens de modo assíncrono com um tópico do Service Bus

Este tópico descreve o método mais simples de enviar e receber de forma assíncrona uma mensagem de um tópico de Service Bus no Windows Azure usando a biblioteca CloudFx (estrutura de aplicativo de nuvem e extensões). O aplicativo de exemplo usa um tópico de Service Bus de forma assíncrona sem se preocupar em tratar os métodos tradicionais de início e término e outros conceitos encontrados no modelo padrão de programação assíncrona do .NET Framework. (Para obter mais informações sobre esse modelo, consulte Chamando métodos síncronos de forma assíncrona.)

Para obter a abordagem síncrona mais simples para enviar e receber uma mensagem em CloudFx usando o Service Bus, consulte How to: Use CloudFx to Send and Receive Messages with a Service Bus Topic and Subscription.

ImportantImportante
Este tópico pressupõe que você tem o Visual Studio 2012 ou posterior e já instalou a extensão de gerenciador de pacotes NuGet do Visual Studio. Caso não tenha feito isso, baixe e instale a extensão NuGet.

Configurar o projeto para a Estrutura de Destino do .NET 4.0

  1. Abra o Visual Studio executando-o como administrador, que é necessário para usar as bibliotecas .NET do Service Bus.

  2. Crie um novo aplicativo de console.

  3. Clique com o botão direito do mouse no novo projeto na solução e escolha Propriedades. Na guia Propriedades, confirme que a Estrutura de Destino está definida como .NET 4.0 e não como outra Estrutura de Destino, como mostra a imagem a seguir.

    A caixa de diálogo de alteração de destino do Visual Studio.
    noteObservação
    Se você precisar alterar a estrutura de destino, dependendo da configuração do Visual Studio, talvez precise fechar o projeto e recarregá-lo.

  4. No Gerenciador de Soluções, baixe os pacotes NuGet e adicione-os no seu projeto clicando com o botão direito do mouse no projeto e escolhendo Gerenciar Pacotes NuGet…, conforme mostrado no elemento gráfico a seguir.

    A caixa de diálogo para adicionar NuGet.
  5. Na caixa de texto de pesquisa NuGet, digite “CloudFx” e pressione ENTER. Quando aparecer o pacote CloudFx, clique em Instalar, aceite os prompts apresentados e feche a caixa de diálogo ao concluir.

    Localizando CloudFx no NuGet.

Adicionar o código

  1. Adicione os itens a seguir usando instruções do início do arquivo de programa.

    using System;
    using System.Reactive;
    using System.Reactive.Disposables;
    using Microsoft.Experience.CloudFx.Framework.Configuration;
    using Microsoft.Experience.CloudFx.Framework.Messaging;
    
    
  2. Logo após a declaração de namespace e antes da primeira classe, insira o código a seguir, que declara os tipos de mensagem NewAccountRequest e AccountClosureRequest que o programa de exemplo envia e recebe.

    noteObservação
    Os objetos desses tipos são serializáveis por padrão porque os valores de propriedade são tipos intrínsecos. Se você usar esse modelo para criar um aplicativo com tipos diferentes, lembre-se de que eles devem ser serializáveis.

    public class NewAccountRequest
    {
      public string CustomerName { get; set; }
      public int AccountNumber { get; set; }
    }
    
    public class AccountClosureRequest
    {
      public int AccountNumber { get; set; }
      public string Reason { get; set; }
    }
    
    
  3. Insira o código a seguir no método Main. O código de threading é incluído para dar uma ideia do thread onde ocorrerão várias chamadas.

    Console.WriteLine("Press ENTER to start...");
    Console.ReadLine();
    
    Console.WriteLine("Waiting on the results. When they arrive, press ENTER to exit.");
    Console.WriteLine("The console thread is {0}.", 
      System.Threading.Thread.CurrentThread.ManagedThreadId
    );
    
    
    
  4. Digite o código a seguir no método Main abaixo do código da etapa anterior. Este exemplo de código requer do seu emissor um namespace secreto e de serviço.

    noteObservação
    O código a seguir requer o Service Bus Emissor Padrão (o valor padrão é owner), Service Bus Chave Padrão, e seu namespace do serviço. Para obter informações sobre como criar um namespace de serviço, consulte Criar um namespace de serviço. Para obter informações sobre como obter a chave padrão para um namespace específico, consulte Obter as credenciais padrão para o namespace.

    // Note that the topic to which you are going to publish does 
    // not have to exist; if the topic does not exist, the topic is 
    // created automatically.
    var endpoint = new ServiceBusEndpointInfo
    {
      IssuerName = "owner", // This is the Default Issuer
      IssuerSecret = "<Service Bus Default Key>",
      ServiceNamespace = "<your namespace>",
      EndpointType = ServiceBusEndpointType.Topic,
      TopicPath = "CloudFxAsynchronousTopicSample"
    };
    
    
  5. Abaixo do código da etapa anterior, adicione o código a seguir para criar um novo ServiceBusPublishSubscribeChannel para o ponto de extremidade recém-criado. Esse objeto de canal publica mensagens em um tópico e também assina objetos do observador para escutar mensagens que chegam nesse mesmo tópico.

    // Requires "using Microsoft.Experience.CloudFx.Framework.Messaging"
    // The channel must be disposed to clean up Service Bus resources. 
    // Typically, this is done with a "using" block, but in this case, the 
    // sample program calls Dispose before closing.
    var pubSubChannel = new ServiceBusPublishSubscribeChannel(endpoint);
    
    
  6. Abaixo do código que cria o novo ServiceBusPublishSubscribeChannel, adicione o código a seguir para criar observadores que são usados para assinar mensagens desse tipo para uma assinatura específica. O ServiceBusPublishSubscribeChannel cria assinaturas necessárias automaticamente para dispensar a criação ou a associação de assinaturas com tópicos.

    Observer.Create<T> obtém uma Action do tipo T e retorna um IObserver do tipo indicado. Na biblioteca CloudFx, a implementação de ServiceBusPublishSubscribeChannel envia por push uma nova mensagem desse tipo para o observador especificado quando uma mensagem é recebida da assinatura do Service Bus.

    // Create an observer for the message types to be received. Each observer is tied to the 
    // injected message type. Observer forces the System.Reactive using statement to be added.
    var newAccObserver = Observer.Create<NewAccountRequest>(msg =>
    {
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.WriteLine("The New account observer received a NewAccountRequest on thread # {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
        );
      Console.WriteLine(msg.CustomerName);
      Console.WriteLine(msg.AccountNumber);
      Console.ResetColor();
    });
    
    var closeAccObserver = Observer.Create<AccountClosureRequest>(msg =>
    {
      Console.ForegroundColor = ConsoleColor.Red;
      Console.WriteLine("The Close account observer received a AccountClosureRequest on thread # {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
        );
      Console.WriteLine(msg.AccountNumber);
      Console.WriteLine(msg.Reason);
      Console.ResetColor();
    });
    
    
    
  7. Use o canal criado na etapa cinco para assinar os observadores para escutar esses tipos de mensagem específicos enviados ao tópico. Adicione o código a seguir abaixo do código na etapa anterior. Nesse caso, para objetos NewAccountMessage adicionamos uma contagem de ouvinte e especificamos uma abordagem de consumidor concorrente, que garante o tratamento de uma mensagem por apenas um ouvinte, embora tenhamos solicitado a disponibilização de 10 ouvintes simultâneos.

    // Give the channel the observers. The channel examines the observers, creates the appropriate
    // subscriptions to the topic, creates a filter for the message type passed as a type parameter, 
    // and creates and manages the listeners as instructed. When a message of that type is 
    // published, the appropriate observer is notified and passed the message by the CloudFx 
    // listener. Receives happen on other threads. 
    pubSubChannel.Subscribe(newAccObserver, listenerCount: 10, competing:true);
    pubSubChannel.Subscribe(closeAccObserver);
    
    
    noteObservação
    Registre os observadores para mensagens antes de enviá-las, como ocorre neste aplicativo de exemplo. A implementação de CloudFx lê mensagens assim que elas chegam e usa a funcionalidade System.Reative para enviar por push essas mensagens para os respectivos objetos que devem ser registrados previamente.

  8. Crie duas mensagens para enviar ao tópico. Cole o código a seguir abaixo do código precedente no qual você criou os observadores para receber mensagens desses tipos.

    // Create the new messages to send
    var newAcc = new NewAccountRequest()
    {
      CustomerName = "John Jones",
      AccountNumber = 111232432
    };
    
    var closure = new AccountClosureRequest()
    {
      AccountNumber = 111232432,
      Reason = "Unhappy customer"
    };
    
    
  9. Envie as mensagens e manter o console aberto até que os observadores relatem a chegada das duas mensagens. Adicione o código a seguir na parte inferior do código adicionado na etapa anterior.

    // This object will be used for tracking all pending async Publish operations. 
    // It will also be used for blocking the client until all async Publish operations complete.
    var asyncOps = new CompositeDisposable();
    
    // Create 100 messages to send. Note that although the console thread 
    // invokes each call, the call does not block, leaving the console thread
    // to continue processing without delay.
    for (int i = 0; i < 100; i++)
    {
      newAcc.AccountNumber = i;
      asyncOps.Add(pubSubChannel.PublishAsync(newAcc));
    
      Console.Write("\rPublishing the new account message on thread #{0}.", System.Threading.Thread.CurrentThread.ManagedThreadId);
    }
    
    // Publishes the close account messages asynchronously.
    
    for (int i = 0; i < 10; i++)
    {
      closure.AccountNumber = i;
      asyncOps.Add(pubSubChannel.PublishAsync(closure));
    }
    
    // This call will block the caller until all async Publish operations complete. 
    // It is useful for synchronization or simply to make sure that the client code 
    // continues only when all messages are published. Without it, the sample application
    // would need some other way of holding the console open until all messages have been published.
    asyncOps.Dispose();
    
    Console.WriteLine("Press ENTER to close the pubSubChannel...");
    Console.ReadLine();
    Console.WriteLine("Closing the pubSubChannel, it may take a few seconds, just wait a little bit...");
    
    // Dispose must be called to enable the channel to clean up resources
    // like the subscriptions and topics. 
    pubSubChannel.Dispose();
    
    Console.WriteLine("Press ENTER to exit the application.");
    Console.Read();
    
    

Executar o código

  1. Pressione F5. A saída no início do programa deve ter a aparência a seguir.

    O início do programa.
  2. A saída durante o programa deve ter a aparência a seguir.

    O final do programa.

Exemplo

Este é o arquivo de código inteiro usado para criar este exemplo.

using System;
using System.Reactive;
using System.Reactive.Disposables;
using Microsoft.Experience.CloudFx.Framework.Configuration;
using Microsoft.Experience.CloudFx.Framework.Messaging;

namespace SBAsyncTopic
{
  class Program
  {
    static void Main(string[] args)
    {
      Console.WriteLine("Press ENTER to start...");
      Console.ReadLine();

      Console.WriteLine("Waiting on the results. When they arrive, press ENTER to exit.");
      Console.WriteLine("The console thread is {0}.",
        System.Threading.Thread.CurrentThread.ManagedThreadId
      );


      // Note that the topic to which you are going to publish does 
      // not have to exist; if the topic does not exist, the topic is 
      // created automatically.
      var endpoint = new ServiceBusEndpointInfo
      {
        IssuerName = "owner", 
        IssuerSecret = "<Service Bus Default Key>",
        ServiceNamespace = "<your namespace>",
        EndpointType = ServiceBusEndpointType.Topic,
        TopicPath = "CloudFxAsynchronousTopicSample"
      };

      // Requires "using Microsoft.Experience.CloudFx.Framework.Messaging"
      // The channel must be disposed to clean up Service Bus resources. 
      // Typically, this is done with a "using" block, but in this case, the 
      // sample program calls Dispose before closing.
      var pubSubChannel = new ServiceBusPublishSubscribeChannel(endpoint);

      // Create an observer for the message types to be received. Each observer is tied to the 
      // injected message type. Observer forces the System.Reactive using statement to be added.
      var newAccObserver = Observer.Create<NewAccountRequest>(msg =>
      {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine("The New account observer received a NewAccountRequest on thread # {0}.",
          System.Threading.Thread.CurrentThread.ManagedThreadId
          );
        Console.WriteLine(msg.CustomerName);
        Console.WriteLine(msg.AccountNumber);
        Console.ResetColor();
      });

      var closeAccObserver = Observer.Create<AccountClosureRequest>(msg =>
      {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("The Close account observer received a AccountClosureRequest on thread # {0}.",
          System.Threading.Thread.CurrentThread.ManagedThreadId
          );
        Console.WriteLine(msg.AccountNumber);
        Console.WriteLine(msg.Reason);
        Console.ResetColor();
      });

      // Give the channel the observers. The channel examines the observers, creates the appropriate
      // subscriptions to the topic, creates a filter for the message type passed as a type parameter, 
      // and creates and manages the listeners as instructed. When a message of that type is 
      // published, the appropriate observer is notified and passed the message by the CloudFx 
      // listener. Receives happen on other threads. 
      pubSubChannel.Subscribe(newAccObserver, listenerCount: 10, competing: true);
      pubSubChannel.Subscribe(closeAccObserver);

      // Create the new messages to send
      var newAcc = new NewAccountRequest()
      {
        CustomerName = "John Jones",
        AccountNumber = 111232432
      };

      var closure = new AccountClosureRequest()
      {
        AccountNumber = 111232432,
        Reason = "Unhappy customer"
      };

      // This object will be used for tracking all pending async Publish operations. 
      // It will also be used for blocking the client until all async Publish operations complete.
      var asyncOps = new CompositeDisposable();

      // Create 100 messages to send. Note that although the console thread 
      // invokes each call, the call does not block, leaving the console thread
      // to continue processing without delay.
      for (int i = 0; i < 100; i++)
      {
        newAcc.AccountNumber = i;
        asyncOps.Add(pubSubChannel.PublishAsync(newAcc));

        Console.Write("\rPublishing the new account message on thread #{0}.", System.Threading.Thread.CurrentThread.ManagedThreadId);
      }

      // Publishes the close account messages asynchronously.

      for (int i = 0; i < 10; i++)
      {
        closure.AccountNumber = i;
        asyncOps.Add(pubSubChannel.PublishAsync(closure));
      }

      // This call will block the caller until all async Publish operations complete. 
      // It is useful for synchronization or simply to make sure that the client code 
      // continues only when all messages are published. Without it, the sample application
// would need some other way of holding the console open until all messages have been published.

      asyncOps.Dispose();

      Console.WriteLine("Press ENTER to close the pubSubChannel...");
      Console.ReadLine();
      Console.WriteLine("Closing the pubSubChannel, it may take a few seconds, just wait a little bit...");

      // Dispose must be called to enable the channel to clean up resources
      // like the subscriptions and topics. 
      pubSubChannel.Dispose();

      Console.WriteLine("Press ENTER to exit the application.");
      Console.Read();
    }
  }

  public class NewAccountRequest
  {
    public string CustomerName { get; set; }
    public int AccountNumber { get; set; }
  }

  public class AccountClosureRequest
  {
    public int AccountNumber { get; set; }
    public string Reason { get; set; }
  }

}



Data da compilação:

2013-11-22

Contribuições da comunidade

Mostrar:
© 2014 Microsoft