Exportar (0) Imprimir
Expandir Tudo

Práticas recomendadas para maximizar a escalabilidade e a relação custo-benefício de soluções de mensagens baseadas em fila no Windows Azure

Atualizado: julho de 2014

Autor: Valery Mizonov

Revisores: Brad Calder, Sidney Higa, Christian Martinez, Steve Marx, Curt Peterson, Paolo Salvatori e Trace Young

Este artigo oferece diretrizes prescritivas e práticas recomendadas para a criação de soluções de mensagens baseadas em fila escalonáveis, econômicas e altamente eficientes na plataforma Windows Azure. O público-alvo deste artigo inclui arquitetos e desenvolvedores de soluções que criam e implementam soluções baseadas em nuvem que utilizam os serviços de armazenamento em fila da plataforma Windows Azure.

Uma solução tradicional de mensagens baseada em fila utiliza o conceito de um local de armazenamento de mensagens conhecido como fila de mensagens, que é um repositório para os dados que serão enviados a um ou mais participantes ou recebidos deles, normalmente por meio de um mecanismo de comunicação assíncrona.

A troca de dados baseada em fila representa a base de uma arquitetura de mensagens altamente escalonável e confiável, capaz de dar suporte a uma série de cenários avançados no Distributed Computing Environment. Não importa se são mensagens duráveis ou de expedição de trabalho de alto volume, pois uma tecnologia de serviço de enfileiramento de mensagens pode intervir e fornecer recursos de primeira classe para atender aos diferentes requisitos de comunicação assíncrona em escala.

A finalidade deste artigo é examinar como os desenvolvedores podem se beneficiar de padrões específicos de design em conjunto com os recursos fornecidos pela plataforma Windows Azure para criar soluções de mensagens baseadas em fila otimizadas e econômicas. O artigo analisa em detalhes as abordagens mais comuns de implementação de interações baseadas em fila em soluções do Windows Azure, e fornece recomendações para melhorar o desempenho, aumentar a escalabilidade e reduzir despesas operacionais.

A discussão subjacente é mesclada com práticas recomendadas, dicas e recomendações relevantes quando apropriado. O cenário descrito neste artigo realça uma implementação técnica baseada em um projeto de um cliente real.

Para a finalidade de um exemplo concreto, generalizaremos um cenário de cliente real como a seguir.

Um provedor de soluções SaaS inicia um novo sistema de cobrança implementado como um aplicativo do Windows Azure que atende às necessidades empresariais de processamento de transações de clientes em escala. A premissa básica da solução é centralizada na capacidade de descarregar a carga de trabalho de computação intensa para a nuvem e aproveitar a elasticidade da infraestrutura do Windows Azure para executar esse trabalho.

O elemento local da arquitetura ponta a ponta consolida e expede regularmente grandes volumes de transações para um serviço hospedado do Windows Azure ao longo do dia. Os volumes variam de alguns milhares a centenas de milhares de transações por envio, atingindo milhões de transações por dia. Além disso, suponha que a solução deva satisfazer um requisito controlado por SLA para uma latência de processamento máxima garantida.

A arquitetura da solução é baseada no padrão de design de redução de mapa distribuído, e é composta de uma camada em nuvem baseada em função de trabalho de várias instâncias que usa o armazenamento de filas do Windows Azure para expedição de trabalho. Os lotes de transações são recebidos pela instância de função de trabalho Iniciador de Processo, desmembrados (desfeitos os lotes) em itens de trabalho menores e enfileirados em uma coleção de filas do Windows Azure para fins de distribuição de carga.

O processamento de carga de trabalho é tratado por várias instâncias da função de trabalho de processamento buscando itens de trabalho das filas e passando-os por meio de procedimentos computacionais. As instâncias de processamento utilizam ouvintes de fila multi-threaded para implementar o processamento de dados paralelo a fim de obter o desempenho ideal.

Os itens de trabalho processados são encaminhados para uma fila dedicada, da qual são removidos pela instância de função de trabalho Controlador de Processo, agregados e mantidos em um repositório de dados para mineração de dados, relatório e análise.

A arquitetura da solução pode ser descrita como a seguir:

O diagrama acima descreve uma arquitetura comum para expandir cargas de trabalho de computação grandes ou complexas. O padrão de troca de mensagens baseado em fila adotado por essa arquitetura também é muito comum para muitos outros aplicativos e serviços do Windows Azure que precisam se comunicar entre si por meio de filas. Isso permite utilizar uma abordagem canônica para examinar os componentes específicos fundamentais envolvidos em uma troca de mensagens baseada em fila.

Uma solução comum de mensagens que troca dados entre os componentes distribuídos usando filas de mensagens inclui publicadores depositando mensagens nas filas, e um ou mais assinantes que devem receber essas mensagens. Na maioria dos casos, os assinantes, às vezes chamados de ouvintes de fila, são implementados como processos únicos ou multi-threaded, continuamente em execução ou iniciados sob demanda de acordo com um padrão de agendamento.

Em um nível mais alto, há dois mecanismos primários de expedição usados para habilitar um ouvinte de fila a receber mensagens armazenadas em uma fila:

  • Sondagem (modelo baseado em pull): um ouvinte monitora uma fila verificando-a em intervalos regulares para ver se há novas mensagens. Quando a fila estiver vazia, o ouvinte continuará a sondá-la, parando periodicamente ao entrar em um estado de suspensão.

  • Disparo (modelo baseado em push): um ouvinte assina um evento que é disparado (pelo próprio publicador ou por um gerente de serviços de fila) sempre que uma mensagem chega a uma fila. O ouvinte, por sua vez, pode iniciar o processamento da mensagem sem precisar sondar a fila para determinar se algum novo trabalho está disponível.

Também é válido mencionar que há diferentes características em ambos os mecanismos. Por exemplo, a sondagem pode ser com bloqueio e sem bloqueio. O bloqueio mantém uma solicitação em espera até uma nova mensagem aparecer em uma fila (ou o tempo limite ser atingido), enquanto uma solicitação sem bloqueio é concluída imediatamente caso não haja nada em uma fila. Com um modelo de disparo, uma notificação pode ser enviada por push aos ouvintes de fila para cada nova mensagem somente quando a primeira mensagem chega a uma fila vazia ou quando a profundidade da fila atinge um determinado nível.

noteObservação
As operações de remoção da fila com suporte da API do Serviço de Fila do Windows Azure são sem bloqueio. Isso significa que métodos da API, como GetMessage ou GetMessages, retornarão imediatamente se nenhuma mensagem for encontrada em uma fila. Por outro lado, as filas do Windows Azure Service Bus oferecem operações de recebimento que bloqueiam o thread de chamada até uma mensagem chegar a uma fila ou um determinado tempo limite expirar.

A abordagem mais comum atualmente para implementar ouvintes de fila em soluções do Windows Azure pode ser resumida como a seguir:

  1. Um ouvinte é implementado como um componente de aplicativo que é instanciado e executado como parte de uma instância de função de trabalho.

  2. O ciclo de vida do componente de ouvinte de fila geralmente estaria associado ao tempo de execução da instância de função de hospedagem.

  3. A lógica de processamento principal é composta de um loop no qual as mensagens são removidas da fila e expedidas para processamento.

  4. Se nenhuma mensagem for recebida, o thread de escuta entrará em um estado de suspensão cuja duração geralmente é controlada por um algoritmo de retirada específico ao aplicativo.

  5. O loop de recebimento é executado e uma fila é sondada até que o ouvinte seja notificado para sair do loop e encerrar.

O fluxograma a seguir descreve a lógica normalmente usada ao implementar um ouvinte de fila com um mecanismo de sondagem em aplicativos do Windows Azure:

Best-Practices-Messaging-Solutions-Azure2
noteObservação
Para a finalidade deste artigo, não são usados padrões mais complexos de design, por exemplo, aqueles que exigem o uso de um gerenciador de fila central (agente).

O uso de um ouvinte de fila clássico com um mecanismo de sondagem pode não ser a escolha ideal ao usar filas do Windows Azure, pois o modelo de preços do Windows Azure mede as transações de armazenamento em termos de solicitações do aplicativo realizadas na fila, independentemente de a fila estar vazia ou não. A finalidade das seções a seguir é discutir algumas técnicas para maximizar o desempenho e minimizar os custos de soluções de mensagens baseadas em fila na plataforma Windows Azure.

Nesta seção, examinaremos como melhorar os aspectos relevantes de design para obter maior desempenho, melhor escalabilidade e redução de custos.

Talvez a maneira mais fácil de qualificar um padrão de implementação como “solução mais eficiente” seja por meio do design que atende aos seguintes objetivos:

  • Reduz as despesas operacionais removendo uma parte significativa das transações de armazenamento que não geram nenhum trabalho aproveitável.

  • Elimina a latência excessiva imposta por um intervalo de sondagem ao verificar se há novas mensagens em uma fila.

  • É ampliada e reduzida dinamicamente adaptando o poder de processamento aos volumes de trabalho voláteis.

O padrão de implementação também deve atender a esses objetivos sem introduzir um nível de complexidade que efetivamente supera os benefícios associados.

Ao avaliar o TCO (custo total de propriedade) e o ROI (retorno do investimento) de uma solução implantada na plataforma Windows Azure, o volume de transações de armazenamento é uma das principais variáveis na equação do TCO. Reduzir o número de transações em filas do Windows Azure reduz os custos operacionais, já que eles estão relacionados às soluções em execução no Windows Azure.

No contexto de uma solução de mensagens baseada em fila, o volume de transações de armazenamento pode ser reduzido com uma combinação dos seguintes métodos:

  1. Ao colocar mensagens em uma fila, agrupe as mensagens relacionadas em um único lote maior, compacte e armazene a imagem compactada em um armazenamento de blob e use a fila para manter uma referência ao blob que contém os dados reais.

  2. Ao recuperar mensagens de uma fila, agrupe várias mensagens em um lote em uma única transação de armazenamento. O método GetMessages na API do Serviço da Fila habilita a remoção de fila do número de mensagens especificado em uma única transação (consulte a observação abaixo).

  3. Ao verificar a presença de itens de trabalho em uma fila, evite intervalos de sondagem agressivos e implemente um atraso de retirada que aumente o tempo entre as solicitações de sondagem se uma fila permanecer continuamente vazia.

  4. Reduza o número de ouvintes de fila – ao usar um modelo baseado em pull, use somente um ouvinte de fila por instância de função quando uma fila estiver vazia. Para reduzir ainda mais o número de ouvintes de fila por instância de função a zero, use um mecanismo de notificação para instanciar ouvintes de fila quando a fila receber itens de trabalho.

  5. Se as filas permanecerem vazias na maior parte do tempo, reduza automaticamente o número de instâncias de função e continue a monitorar a métrica do sistema relevante para determinar se e quando o aplicativo deve aumentar o número de instâncias para lidar com a carga de trabalho crescente.

A maioria das recomendações acima pode ser traduzida em uma implementação razoavelmente genérica que manipula os lotes de mensagens e encapsula muitas das operações subjacentes de armazenamento de filas/blobs e gerenciamento de threads. Mais adiante neste artigo, veremos como fazer isso.

ImportantImportante
Ao recuperar mensagens pelo método GetMessages, o tamanho máximo de lote aceito pela API do Serviço de Fila em uma única operação de remoção da fila é limitado a 32.

Em linhas gerais, o custo de transações de fila do Windows Azure aumenta de forma linear conforme aumenta o número de clientes do serviço de fila, por exemplo, com o aumento do número de instâncias de função ou do número de threads de remoção da fila. Para ilustrar o impacto potencial no custo de um design de solução que não se beneficia das recomendações acima, fornecemos um exemplo baseado em números concretos.

Se o arquiteto da solução não implementar otimizações relevantes, a arquitetura do sistema de cobrança descrita acima provavelmente incorrerá em despesas operacionais excessivas, uma vez que a solução está implantada e em execução na plataforma Windows Azure. Os motivos da possível despesa excessiva são descritos nesta seção.

Conforme observado na definição do cenário, os dados de transações comerciais chegam em intervalos regulares. Entretanto, vamos supor que a solução esteja ocupada processando a carga de trabalho apenas 25% do tempo durante um dia útil padrão de 8 horas. Isso resulta em 6 horas (8 horas * 75%) do “tempo ocioso” quando não pode haver nenhuma transação chegando pelo sistema. Além disso, a solução em nenhuma hipótese receberá dados durante as 16 horas não úteis a cada dia.

Durante o período ocioso, que totaliza 22 horas, a solução ainda faz tentativas de remover trabalhos da fila, pois ela não tem um conhecimento explícito de quando chegarão novos dados. Durante essa janela de tempo, cada thread individual de remoção da fila executará até 79.200 transações (22 horas * 60 min * 60 transações/min) em uma fila de entrada, presumindo-se um intervalo de sondagem padrão de 1 segundo.

Conforme mencionado anteriormente, o modelo de preços na plataforma Windows Azure baseia-se em “transações de armazenamento” individuais. Uma transação de armazenamento é uma solicitação feita por um aplicativo de usuário para adicionar, ler, atualizar ou excluir dados de armazenamento. Quando este white paper era redigido, as transações de armazenamento eram cobradas a uma taxa de US$ 0,01 para 10.000 transações (não levando em consideração ofertas promocionais ou planos de preços especiais).

ImportantImportante
Ao calcular o número de transações de fila, tenha em mente que colocar uma única mensagem em uma fila seria contado como 1 transação, ao passo que consumir uma mensagem é geralmente um processo de 2 etapas que envolve a recuperação seguida por uma solicitação para remover a mensagem da fila. Consequentemente, uma operação bem-sucedida de remoção da fila envolverá 2 transações de armazenamento. Observe que, se uma solicitação de remoção da fila resultar em nenhuma transferência de dados, ela ainda será contada como uma transação faturável.

As transações de armazenamento geradas por um único thread de remoção da fila no cenário acima adicionarão aproximadamente US$ 2,38 (79.200 /10.000 * US$ 0,01 * 30 dias) a uma conta por mês. Em comparação, 200 threads de remoção da fila (ou, alternativamente, 1 thread de remoção da fila em 200 instâncias de função de trabalho) aumentarão o custo em US$ 457,20 por mês. Esse é o custo incorrido quando a solução não estava executando absolutamente nenhuma computação, apenas verificando as filas para ver se havia itens de trabalho disponíveis. O exemplo anterior é abstrato porque ninguém implementaria seu serviço dessa forma, e por isso é importante fazer as otimizações descritas a seguir.

Para otimizar o desempenho de soluções de mensagens do Windows Azure baseadas em fila, uma abordagem é usar a camada de mensagens de publicação/assinatura fornecida com o Windows Azure Service Bus, conforme é descrito nesta seção.

Nessa abordagem, os desenvolvedores precisarão se concentrar na criação de uma combinação de sondagem e notificações baseadas em push em tempo real, permitindo que os ouvintes assinem um evento de notificação (gatilho) que é gerado em determinadas condições para indicar que uma nova carga de trabalho é colocada em uma fila. Essa abordagem aprimora o loop de sondagem de fila tradicional com uma camada de mensagens de publicação/assinatura para expedir notificações.

Em um sistema distribuído complexo, essa abordagem necessitaria do uso de um “barramento mensagens” ou “middleware orientado a mensagens” para assegurar que as notificações possam ser retransmitidas para um ou mais assinantes de forma confiável com um acoplamento fraco. O Windows Azure Service Bus é uma opção natural para requisitos de endereçamento de mensagens entre serviços de aplicativos distribuídos com um acoplamento fraco que são executados no Windows Azure e localmente. Ele também é perfeito para uma arquitetura de “barramento de mensagens” que permitirá a troca de notificações entre processos envolvidos na comunicação baseada em fila.

Os processos envolvidos em uma troca de mensagens baseada em fila poderiam empregar o seguinte padrão:

Best-Practices-Messaging-Solutions-Azure3

Especificamente, e como eles se referem à interação entre publicadores de serviços de fila e assinantes, os mesmos princípios que se aplicam à comunicação entre instâncias de função do Windows Azure atenderiam à maioria dos requisitos para a troca de mensagens de notificação baseada em push. Nós já abordamos esses conceitos básicos em Como simplificar e dimensionar a comunicação entre funções usando o Windows Azure Service Bus.

ImportantImportante
O uso do Windows Azure Service Bus está sujeito a um modelo de preços que leva em consideração o volume das operações de mensagens em uma entidade de mensagens do Service Bus, como uma fila ou um tópico.

Portanto, é importante executar uma análise de custo-benefício para avaliar os prós e os contras de introduzir o Service Bus em uma determinada arquitetura. Ao longo dessas linhas, vale a pena avaliar se a introdução ou não da camada de expedição de notificações com base no Service Bus resultaria de fato na redução do custo que pode justificar os investimentos e os esforços de desenvolvimento adicionais.

Para obter mais informações sobre o modelo de preços do Service Bus, consulte as seções relevantes em Perguntas frequentes sobre o Windows Azure.

Quando o impacto na latência é razoavelmente fácil de ser resolvido com uma camada de mensagens de publicação/assinatura, uma redução de custo adicional pode ser percebida com o uso do dimensionamento dinâmico (elástico), conforme é descrito na próxima seção.

A plataforma Windows Azure possibilita que os clientes aumentem ou reduzam a escala com mais rapidez e facilidade do que nunca. A capacidade de adaptação a cargas de trabalho voláteis e ao tráfego variável é uma das principais propostas de valor da plataforma em nuvem. Isso significa que a “escalabilidade” não é mais um termo dispendioso do vocabulário de TI; ela é agora um recurso pronto para uso que pode ser habilitado de forma programática sob demanda em uma solução em nuvem bem-arquitetada.

Dimensionamento dinâmico é o recurso técnico que uma determinada solução tem de se adaptar a cargas de trabalho flutuantes aumentando e reduzindo a capacidade de trabalho e o poder de processamento em tempo de execução. A plataforma Windows Azure oferece suporte nativo ao dimensionamento dinâmico por meio do provisionamento de uma infraestrutura de computação distribuída, na qual é possível comprar horas de computação conforme necessário.

É importante diferenciar entre os dois seguintes tipos de dimensionamento dinâmico na plataforma Windows Azure:

  • Dimensionamento de instância de função se refere a adicionar e remover instâncias de função de trabalho e web adicionais para lidar com a carga de trabalho pontual. Isso geralmente inclui alterar a contagem de instâncias na configuração do serviço. Aumentar a contagem de instâncias fará com que o tempo de execução do Windows Azure inicie novas instâncias, ao passo que diminuir a contagem de instâncias fará com que ele desligue instâncias em execução.

  • Dimensionamento de processo (thread) refere-se à manutenção da capacidade suficiente em termos de threads de processamento em uma determinada instância de função com o ajuste do número de threads para cima e para baixo, dependendo da carga de trabalho atual.

O dimensionamento dinâmico em uma solução de mensagens baseada em fila atrairia uma combinação das seguintes recomendações gerais:

  1. Monitorar indicadores chave de desempenho, incluindo utilização de CPU, profundidade de fila, tempos de resposta e latência do processamento de mensagens.

  2. Aumentar ou diminuir dinamicamente o número de instâncias de função para lidar com os picos na carga de trabalho, previsíveis ou imprevisíveis.

  3. Expandir e reduzir o número de threads de processamento de forma programática para adaptar-se a condições de carga variáveis manipuladas por uma determinada instância de função.

  4. Particionar e processar cargas de trabalho refinadas simultaneamente usando a Biblioteca paralela de tarefas no .NET Framework 4.

  5. Manter uma capacidade viável em soluções com carga de trabalho altamente volátil em antecipação a picos repentinos, para poder lidar com eles sem a sobrecarga de configurar instâncias adicionais.

A API do Gerenciamento de Serviços possibilita que um serviço hospedado no Windows Azure modifique o número de suas instâncias de função em execução alterando a configuração de implantação em tempo de execução.

noteObservação
O número máximo de instâncias de computação pequenas do Windows Azure (ou o número equivalente de instâncias de computação de outros tamanhos em termos de número de núcleos) em uma assinatura típica é limitado a 20, por padrão. Qualquer solicitação para aumentar essa cota deve ser gerada com a equipe de Suporte do Windows Azure. Para obter mais informações, consulte as Perguntas frequentes sobre a plataforma Windows Azure.

O dimensionamento dinâmico da contagem de instâncias de função nem sempre pode ser a opção mais apropriada para lidar com picos de carga. Por exemplo, uma nova instância de função pode levar alguns segundos para ser ativada, e não há atualmente métricas de SLA fornecidas em relação à duração desse processo. Em vez disso, pode ser necessária uma solução para simplesmente aumentar o número de threads de trabalho para lidar com o aumento da carga de trabalho volátil. Enquanto a carga de trabalho é processada, a solução monitora a métrica relevante de carga e determina se precisa reduzir ou aumentar dinamicamente o número de processos de trabalho.

ImportantImportante
No momento, a meta de escalabilidade para uma única fila do Windows Azure é "restrita" a 500 transações/s. Se um aplicativo tentar exceder essa meta, por exemplo, executando operações de fila de várias instâncias de função que executam centenas de threads de remoção da fila, o resultado poderá ser a resposta HTTP 503 "Servidor ocupado" do serviço de armazenamento. Quando isso ocorre, o aplicativo deve implementar um mecanismo de repetição usando algoritmo de atraso de retirada exponencial. Entretanto, se os erros HTTP 503 estiverem ocorrendo regularmente, é recomendável usar várias filas e implementar uma estratégia baseada em particionamento para fazer o dimensionamento em várias filas.

Na maioria dos casos, o dimensionamento automático dos processos de trabalho é responsabilidade de uma instância de função individual. Por outro lado, o dimensionamento de instância de função geralmente envolve um elemento central da arquitetura de solução que é responsável por monitorar a métrica de desempenho e executar as ações de dimensionamento adequadas. O diagrama a seguir mostra um componente de serviço chamado Agente de Dimensionamento Dinâmico, que coleta e analisa as métricas de carga para determinar se precisa provisionar novas instâncias ou desativar instâncias ociosas.

Best-Practices-Messaging-Solutions-Azure4

Vale observar que o serviço do agente de dimensionamento pode ser implantado como uma função de trabalho em execução no Windows Azure ou como um serviço local. Independentemente da topologia de implantação, o serviço poderá acessar as filas do Windows Azure.

Para implementar um recurso de dimensionamento dinâmico, considere o uso do Microsoft Enterprise Library Autoscaling Application Block, que habilita o comportamento de dimensionamento automático nas soluções executadas no Windows Azure. O Autoscaling Application Block fornece toda a funcionalidade necessária para definir e monitorar o dimensionamento automático em um aplicativo do Windows Azure.

Agora que nós abordamos o impacto da latência, os custos de transações de armazenamento e os requisitos de dimensionamento dinâmico, é um bom momento para consolidar nossas recomendações em uma implementação técnica.

Nas seções anteriores, examinamos as principais características atribuídas a uma arquitetura de mensagens bem-projetada com base nas filas do armazenamento de filas do Windows Azure. Nós analisamos as três principais áreas de foco que ajudam a reduzir a latência de processamento, otimizar os custos de transações de armazenamento e melhorar a capacidade de resposta a cargas de trabalho flutuantes.

Esta seção visa fornecer um ponto de partida para ajudar desenvolvedores do Windows Azure a implementar alguns dos padrões referenciados neste white paper de uma perspectiva de programação.

noteObservação
Esta seção se concentrará na criação de um ouvinte de fila autoescalonável que dê suporte a modelos baseados em pull e push. Para obter técnicas avançadas de dimensionamento dinâmico no nível de instância de função, consulte Enterprise Library Autoscaling Application Block.

Além disso, para sermos concisos, nosso foco será apenas em alguns elementos funcionais centrais, e evitaremos a complexidade indesejada omitindo grande parte do código da infraestrutura de suporte dos exemplos de código abaixo. Para fins de esclarecimento, também é válido destacar que a implementação técnica discutida a seguir não é a única solução para um determinado problema. A finalidade é que ela seja um ponto de partida no qual os desenvolvedores possam se basear e produzir suas próprias soluções mais apuradas.

A partir daqui, o foco deste white paper será o código-fonte necessário para implementar os padrões discutidos acima.

Primeiro, definimos um contrato que será implementado por um componente de ouvinte de fila que é hospedado por uma função de trabalho e escuta em uma fila do Windows Azure.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

O evento QueueEmpty se destina a ser usado por um host. Ele fornece o mecanismo para que o host controle o comportamento do ouvinte de fila quando a fila estiver vazia. O respectivo delegado de evento é definido da seguinte forma:

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

Será mais fácil manipular itens da fila se um ouvinte puder operar com genéricos, em vez de usar classes do SDK “bare metal”, como CloudQueueMessage. Portanto, definimos uma nova interface que será implementada por um ouvinte de fila capaz de dar suporte ao acesso baseado em genéricos às filas:

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

Observe que também habilitamos o ouvinte com reconhecimento de genéricos para enviar itens da fila por push para um ou mais assinantes por meio da implementação do Padrão de design Observer utilizando a interface IObservable<T>, disponível no .NET Framework 4.

Nós pretendemos manter uma única instância de um componente implementando a interface ICloudQueueListenerExtension<T>. Entretanto, precisamos ter a possibilidade de executar vários threads de remoção da fila (processos de trabalho ou tarefas, para simplificar). Por isso, adicionamos suporte à lógica de remoção da fila multi-threaded no componente de ouvinte de fila. É aí que aproveitamos as vantagens da Biblioteca paralela de tarefas (TPL). O método StartListener será responsável por ativar o número especificado de threads de remoção da fila, como a seguir:


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

O método DequeueTaskMain implementa o corpo funcional de um thread de remoção da fila. Suas principais operações são as seguintes:

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

Vale a pena considerarmos alguns pontos em relação à implementação do método DequeueTaskMain.

Primeiro, aproveitamos os benefícios do LINQ paralelo (PLINQ) ao expedir mensagens para processamento. A principal vantagem do PLINQ aqui é acelerar a manipulação de mensagens executando o delegado de consulta em threads de trabalho separados em vários processadores em paralelo, sempre que possível.

noteObservação
Como a paralelização de consultas é gerenciada internamente pelo PLINQ, não há nenhuma garantia de que o PLINQ utilizará mais de um único núcleo para a paralelização do trabalho. O PLINQ poderá executar uma consulta em sequência, caso determine que a sobrecarga de paralelização tornará a consulta lenta. Para se beneficiar do PLINQ, o trabalho total na consulta precisa ser suficientemente grande para se beneficiar da sobrecarga de agendamento do trabalho no pool de threads.

Em segundo lugar, não estamos buscando uma única mensagem por vez. Em vez disso, pedimos que a API do Serviço de Fila recupere um número específico de mensagens de uma fila. Isso é orientado pelo parâmetro DequeueBatchSize que é passado ao método Get<T>. Quando entramos na camada de abstração de armazenamento implementada como parte da solução global, esse parâmetro é entregue ao método da API de Serviço de Fila. Além disso, executamos uma verificação de segurança para garantir que o tamanho do lote não exceda o tamanho máximo permitido pelas APIs. Isso é implementado como a seguir:

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

Finalmente, não executaremos a tarefa de remoção da fila indefinidamente. Nós provisionamos um ponto de verificação explícito implementado como um evento QueueEmpty, que é gerado sempre que uma fila se torna vazia. Nesse ponto, consultamos um manipulador de eventos QueueEmpty para determinar se ele nos permite ou não concluir a tarefa de remoção da fila em execução. Uma implementação bem-projetada do manipulador de eventos QueueEmpty permite dar suporte ao recurso de “redução de escala automática”, como explicado na seção a seguir.

A finalidade do manipulador de eventos QueueEmpty é dupla. Primeiro, ele é responsável por fornecer comentários à tarefa de remoção da fila de origem orientando-a a entrar em um estado de suspensão durante um intervalo de tempo determinado (conforme definido no parâmetro de saída delay no delegado de evento). Em segundo lugar, ele indica à tarefa de remoção da fila se ela deve ou não desligar-se normalmente (conforme indicado pelo parâmetro de retorno Boolean).

A implementação a seguir do manipulador de eventos QueueEmpty resolve os dois desafios realçados anteriormente neste white paper. Ela calcula um intervalo de retirada exponencial aleatória e informa à tarefa de remoção da fila para aumentar exponencialmente o atraso entre solicitações de sondagem da fila. Observe que o atraso de retirada não excederá 1 segundo, conforme configurado em nossa solução, pois não é realmente necessário ter um longo atraso entre a sondagem automaticamente quando o dimensionamento automático está implementado de forma adequada. Além disso, ela consulta o estado do ouvinte de fila para determinar o número de tarefas de remoção da fila ativas. Se esse número exceder 1, o manipulador de eventos recomendará que a tarefa de remoção da fila de origem conclua seu loop de sondagem se o intervalo de retirada também tiver atingido o máximo especificado. Caso contrário, a tarefa de remoção da fila não será encerrada, deixando exatamente 1 thread de sondagem de cada vez sendo executado por uma única instância do ouvinte de fila. Essa abordagem ajuda a reduzir o número de transações de armazenamento e, portanto, diminuir os custos de transações, como explicado anteriormente.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

Em um nível mais alto, o recurso de "redução de escala de tarefas de remoção da fila” descrito acima pode ser explicado como a seguir:

  1. Sempre que houver qualquer item na fila, as tarefas de remoção da fila garantirão que a carga de trabalho será processada o mais rápido possível. Não haverá nenhum atraso entre solicitações para remover mensagens de uma fila.

  2. Assim que a fila de origem estiver vazia, cada tarefa de remoção da fila gerará um evento QueueEmpty.

  3. O manipulador de eventos QueueEmpty calculará um atraso de retirada exponencial aleatória e instruirá a tarefa de remoção da fila a suspender sua atividade durante um determinado intervalo.

  4. As tarefas de remoção da fila continuarão a sondar a fila de origem em intervalos computados até a duração da ociosidade exceder seu máximo permitido.

  5. Ao atingir o intervalo máximo de ociosidade, e desde que a fila de origem ainda esteja vazia, todas as tarefas ativas de remoção da fila serão desligadas normalmente de forma automática – isso tudo não ocorrerá de uma só vez, já que as tarefas de remoção da fila se retiram em pontos diferentes do algoritmo de retirada.

  6. Em algum momento, haverá apenas um trabalho ativo de remoção da fila aguardando trabalho. Com isso, nenhuma transação de sondagem ociosa ocorrerá em uma fila, exceto para essa única tarefa.

Para explicar mais o processo de coleta de características de carga pontual, vale a pena mencionar os artefatos de código-fonte relevantes. Primeiro, há uma estrutura contendo a métrica relevante que mede o resultado da carga que está sendo aplicada à solução. Para fins de simplicidade, incluímos um pequeno subconjunto das métricas que serão mais usadas no código de exemplo.

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

Em segundo lugar, há um método implementado por um ouvinte da fila que retorna suas métricas de carga conforme é representado no exemplo a seguir:

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

Na seção anterior, apresentamos a capacidade de reduzir o número de tarefas ativas de remoção da fila a uma única instância para minimizar o impacto de transações ociosas nos custos operacionais de armazenamento. Nesta seção, mostraremos um exemplo oposto, com o qual implementamos o recurso de “aumento de escala automático” para restaurar a capacidade de processamento quando for necessário.

Primeiro, definimos um delegado de evento que ajudará a controlar transições de estado de uma fila vazia para uma fila não vazia, para fins de disparo de ações relevantes:

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

Em seguida, estendemos a definição original da interface ICloudQueueServiceWorkerRoleExtension para incluir um novo evento que será gerado sempre que um ouvinte de fila detectar novos itens de trabalho, basicamente quando a profundidade da fila mudar de zero para qualquer valor positivo:

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

Além disso, determinamos o local certo no código do ouvinte de fila no qual esse evento será gerado. Acionaremos o evento QueueWorkDetected de dentro do loop de remoção da fila implementado no método DequeueTaskMain, que precisa ser estendido como a seguir:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

Na última etapa, fornecemos um manipulador para o evento QueueWorkDetected. A implementação desse manipulador de eventos será fornecida por um componente que instancia e hospeda o ouvinte de fila. Em nosso caso, é uma função de trabalho. O código responsável pela criação de instância e a implementação do manipulador de eventos é composto do seguinte:

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

Considerando o exemplo anterior, vale a pena examinarmos o método GetOptimalDequeueTaskCount mais detalhadamente. Esse método é responsável por calcular o número de tarefas de remoção da fila que seriam consideradas ideais para controlar a carga de trabalho em uma fila. Quando chamado, ele deve determinar (por meio de alguns mecanismos adequados de tomada de decisão) a quantidade de “potência” de que o ouvinte de fila precisa para processar o volume de trabalho que está aguardando ou previsto para chegar a uma determinada fila.

Por exemplo, o desenvolvedor poderia adotar uma abordagem simplista e inserir um conjunto de regras estáticas diretamente no método GetOptimalDequeueTaskCount. Usando as características conhecidas de taxa de transferência e escalabilidade da infraestrutura da fila, latência média de processamento, tamanho da carga e outras entradas relevantes, o conjunto de regras poderia usar uma exibição otimista e tomar a decisão sobre uma contagem ideal de tarefas de remoção da fila.

No exemplo abaixo, uma técnica demasiadamente simplificada de forma intencional está sendo usada para determinar o número de tarefas de remoção da fila:

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

Vale a pena reiterar que o código do exemplo anterior não pretende ser uma abordagem única para qualquer situação. Uma solução mais ideal seria invocar uma regra externamente configurável e gerenciável que realizasse as computações necessárias.

Neste ponto, temos um protótipo de trabalho de um ouvinte de fila capaz de aumentar ou reduzir sua escala automaticamente de acordo com a carga de trabalho flutuante. Talvez, como um toque final, ele precise ser enriquecido com a capacidade de adaptar-se à carga variável durante o processamento. Esse recurso pode ser adicionado aplicando-se o mesmo padrão que foi seguido ao adicionar suporte para o evento QueueWorkDetected.

Agora, vamos alternar o foco para outra otimização importante que ajudará a reduzir a latência nos ouvintes de fila.

Nesta seção, aperfeiçoaremos a implementação acima de um ouvinte de fila com um mecanismo de notificação baseado em push criado com base no recurso de multicast unidirecional do Service Bus. O mecanismo de notificação é responsável por disparar um evento que instrui o ouvinte de fila a iniciar a execução do trabalho de remoção da fila. Essa abordagem ajuda a evitar a sondagem da fila para verificar se há novas mensagens e, portanto, eliminar a latência associada.

Primeiro, definimos um evento de gatilho que será recebido por nosso ouvinte de fila no caso de uma nova carga de trabalho ser depositada em uma fila:

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

Em seguida, habilitamos as implementações de ouvinte de fila para atuar como assinantes para receber um evento de gatilho. A primeira etapa é definir um ouvinte de fila como um observador do evento CloudQueueWorkDetectedTriggerEvent:

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

A segunda etapa é implementar o método OnNext definido na interface IObserver<T>. Esse método é chamado pelo provedor para notificar o observador sobre um novo evento:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

Como pode ser visto no exemplo anterior, invocamos propositadamente o mesmo delegado de evento usado nas etapas anteriores. O manipulador de eventos QueueWorkDetected já fornece a lógica de aplicativo necessária para criar uma instância do número ideal de tarefas de remoção da fila. Por isso, o mesmo manipulador de eventos é reutilizado na manipulação da notificação CloudQueueWorkDetectedTriggerEvent.

Conforme observado nas seções anteriores, não precisamos manter uma tarefa de remoção da fila continuamente em execução quando uma notificação baseada em push é utilizada. Portanto, podemos reduzir ainda mais o número de tarefas de fila por instância de ouvinte de fila a zero e usar um mecanismo de notificação para criar uma instância de tarefas de remoção da fila quando a fila receber itens de trabalho. Para ter certeza de que não estamos executando nenhuma tarefa ociosa de remoção da fila, é necessário fazer a seguinte modificação simples no manipulador de eventos QueueEmpty:

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

Em resumo, não estamos mais detectando se há ou não uma única tarefa ativa de remoção da fila. O resultado do manipulador de eventos QueueEmpty revisado leva em conta somente o fato de exceder o intervalo máximo de ociosidade ao término do qual todas as tarefas ativas de remoção da fila serão desligadas.

Para receber as notificações CloudQueueWorkDetectedTriggerEvent, utilizamos o modelo de publicação/assinatura que é implementado como sistema de mensagens com acoplamento fraco entre instâncias de função do Windows Azure. Basicamente, nós pegamos a mesma camada de comunicação entre funções e manipulamos os eventos de entrada como a seguir:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

O multicast de um evento de gatilho definido na classe CloudQueueWorkDetectedTriggerEvent é de responsabilidade final de um publicador, isto é, o componente que deposita itens de trabalho em uma fila. Esse evento pode ser disparado antes de o primeiro item de trabalho ser enfileirado ou após o último item ser colocado em uma fila. No exemplo a seguir, publicamos um evento de gatilho depois de colocarmos os itens de trabalho na fila de entrada:

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

Agora que criamos um ouvinte de fila que é capaz de dar suporte a multithreading, dimensionamento automático e notificações baseadas em push, é hora de consolidar todas as recomendações que pertencem ao design de soluções de mensagens baseadas em fila na plataforma Windows Azure.

Para maximizar a eficiência e o custo-benefício de soluções de mensagens baseadas em fila em execução na plataforma Windows Azure, os arquitetos de soluções e desenvolvedores devem considerar as recomendações a seguir.

Como um arquiteto de soluções, você deve:

  • Provisionar uma arquitetura de mensagens baseada em fila que use o serviço de armazenamento em fila do Windows Azure para comunicação assíncrona de grande escala entre camadas e serviços em soluções híbridas baseadas em nuvem.

  • Recomendar a arquitetura de enfileiramento particionado de fila para dimensionar mais de 500 transações/s.

  • Entender os fundamentos do modelo de preços do Windows Azure e otimizar a solução para baixar custos de transações usando uma série de práticas recomendadas e padrões de design.

  • Considerar os requisitos de dimensionamento dinâmico provisionando uma arquitetura que seja adaptável a cargas de trabalho voláteis e flutuantes.

  • Usar as técnicas e abordagens de dimensionamento automático para expandir e reduzir de forma elástica a capacidade de computação para otimizar ainda mais a despesa operacional.

  • Avaliar a relação custo-benefício de reduzir a latência usando a dependência no Windows Azure Service Bus para a expedição de notificação baseada em push em tempo real.

Como desenvolvedor, você deve:

  • Criar uma solução de mensagens que utilize o envio em lote ao armazenar e recuperar dados de filas do Windows Azure.

  • Implementar um serviço de ouvinte de fila eficiente garantindo que as filas serão sondadas por um máximo de um thread de remoção da fila quando vazia.

  • Reduzir dinamicamente o número de instâncias de função de trabalho quando as filas permanecerem vazias por um longo período de tempo.

  • Implementar um algoritmo de retirada exponencial aleatória específico ao aplicativo para reduzir o efeito da sondagem de fila ociosa em custos de transações de armazenamento.

  • Adotar as técnicas certas que evitam que as metas de escalabilidade para uma única fila sejam excedidas na implementação de publicadores e consumidores de filas de várias instâncias altamente multi-threaded.

  • Utilizar uma política robusta de repetição capaz de manipular uma variedade de condições temporárias ao publicar e consumir dados de filas do Windows Azure.

  • Usar o recurso de eventos unidirecional fornecido pelo Windows Azure Service Bus para dar suporte a notificações baseadas em push para reduzir a latência e melhorar o desempenho da solução de mensagens baseada em fila.

  • Explorar os novos recursos do .NET Framework 4, como TPL, PLINQ e o padrão Observer para maximizar o grau de paralelismo, melhorar a simultaneidade e simplificar o design de serviços multi-threaded.

O exemplo de código associado está disponível para download na galeria de códigos do MSDN. O código de exemplo também inclui todos os componentes de infraestrutura necessários, como a camada de abstração com reconhecimento de genéricos para o serviço de filas do Windows Azure, que não foram fornecidos nos trechos de código acima. Observe que todos os arquivos de código-fonte são regidos pela licença pública da Microsoft, como explicado nos avisos legais correspondentes.

Para obter mais informações sobre o tópico discutido neste white paper, consulte o seguinte:

Mostrar:
© 2014 Microsoft