Exportar (0) Imprimir
Expandir todo
Este tema aún no ha recibido ninguna valoración - Valorar este tema

Prácticas recomendadas para maximizar la escalabilidad y la rentabilidad de las soluciones de mensajería basadas en cola en Windows Azure

Autor: Valery Mizonov

Revisores: Brad Calder, Sidney Higa, Christian Martinez, Steve Marx, Curt Peterson, Paolo Salvatori y Trace Young

Este artículo proporciona una orientación descriptiva y prácticas recomendadas para crear soluciones escalables de mensajería basadas en cola muy eficaces y rentables en la plataforma de Windows Azure. Entre los destinatarios de este artículo se incluyen los arquitectos y los desarrolladores de soluciones que diseñan e implementan soluciones basadas en la nube que desean aprovechar los servicios de almacenamiento de cola de la plataforma de Windows Azure.

Resumen

Una solución de mensajería basada en cola tradicional usa el concepto de ubicación de almacenamiento de mensajes conocida como cola de mensajes, que es un repositorio de los datos que se enviarán o se recibirán de uno o varios participantes, normalmente mediante un mecanismo de comunicación asincrónica.

El intercambio de datos basado en cola representa el fundamento de una arquitectura de mensajería muy escalable y confiable capaz de admitir diversos y eficaces escenarios en el entorno informático distribuido. Ya se trate de una mensajería perdurable o de distribución de trabajos de gran volumen, una tecnología de cola de mensajes puede proporcionar funciones de primera clase para abordar los diferentes requisitos de la comunicación asincrónica según convenga.

El propósito de este artículo es examinar el modo en que los desarrolladores pueden beneficiarse de patrones concretos de diseño junto con las funciones proporcionadas por la plataforma de Windows Azure para generar soluciones de mensajería basadas en cola optimizadas y rentables. En el artículo se profundiza en las soluciones que más se usan para implementar las interacciones basadas en cola en las soluciones de Windows Azure y se ofrecen recomendaciones para mejorar el rendimiento, aumentar la escalabilidad y reducir los gastos de explotación.

La explicación subyacente se combina con prácticas recomendadas, sugerencias y recomendaciones relacionadas, si procede. El escenario descrito en este artículo resalta una implementación técnica que se basa en un proyecto real del cliente.

Escenario de cliente

Como ejemplo concreto, generalizaremos un escenario real del cliente de la forma siguiente.

Un proveedor de soluciones SaaS lanza un nuevo sistema de facturación implementado como una aplicación de Windows Azure que atiende las necesidades empresariales del procesamiento de transacciones según convenga. La premisa clave de la solución se centra en la capacidad de descargar la enorme carga de trabajo de proceso a la nube y aprovechar la elasticidad de la infraestructura de Windows Azure para realizar dicho trabajo.

El elemento local de la arquitectura de extremo a extremo consolida los grandes volúmenes de transacciones y los envía a un servicio hospedado de Windows Azure periódicamente a lo largo del día. Los volúmenes varían de miles a centenares de miles de transacciones por cada envío, llegando a alcanzar millones de transacciones al día. Además, suponga que la solución debe satisfacer el requisito marcado por el SLA de tener una latencia máxima garantizada de procesamiento.

La arquitectura de la solución se basó en el modelo de diseño con reducción de asignaciones distribuido y consta de una capa de nube basada en roles de trabajo de varias instancias que usa el almacenamiento de cola de Windows Azure para el envío de trabajos. Los lotes de transacciones son recibidos por la instancia del rol de trabajo Iniciador de proceso, se descomponen (por lotes) en elementos de trabajo menores y se ponen en la cola en una colección de colas de Windows Azure con el fin de distribuir la carga.

El procesamiento de la carga de trabajo es administrado por varias instancias del rol de trabajo de procesamiento que capturan los elementos de trabajo de las colas y los pasan a través de procedimientos de cálculo. Las instancias de procesamiento emplean agentes de escucha de cola multiproceso para implementar el procesamiento de datos paralelo y lograr un rendimiento óptimo.

Los elementos de trabajo procesados se enrutan a una cola dedicada de la que los quita la instancia del rol de trabajo Controlador de proceso, se agregan a un almacén de datos y se mantienen allí para realizar la minería de datos, los informes y los análisis.

La arquitectura de la solución se puede describir como sigue:

Procedimientos-recomendados-Soluciones-de-mensajería-Azure1

El diagrama anterior describe una arquitectura típica para escalar cargas de trabajo de proceso grandes o complejas. El modelo de intercambio de mensajes basado en cola que esta arquitectura adopta también es muy típico de muchas otras aplicaciones y servicios de Windows Azure que necesitan comunicarse entre sí a través de colas. Esto permite adoptar un enfoque canónico para examinar componentes fundamentales concretos implicados en un intercambio de mensajes basado en cola.

Aspectos básicos de la mensajería basada en cola

Una solución típica de mensajería que intercambia datos entre sus componentes distribuidos usando colas de mensajes incluye a los publicadores que depositan los mensajes en las colas y a uno o varios suscriptores destinados a recibir los mensajes. En la mayoría de los casos, los suscriptores, en ocasiones conocidos como agentes de escucha de cola, se implementan como procesos multiproceso o de un solo proceso y se ejecutan continuamente o se inician a petición como en un modelo de programación.

En un nivel superior, se usan dos mecanismos principales de envío para permitir que un agente de escucha de cola reciba los mensajes almacenados en una cola:

  • Sondeo (modelo basado en la extracción): un agente de escucha supervisa una cola comprobando en intervalos regulares si hay mensajes nuevos. Cuando la cola está vacía, el agente de escucha continúa sondeando la cola, retrocediendo periódicamente mientras entra en un estado en suspensión.

  • Activación (modelo basado en la inserción): un agente de escucha se suscribe a un evento desencadenado (ya sea por el propio publicador o por el administrador del servicio de cola) siempre que un mensaje llega a una cola. El agente de escucha a su vez puede iniciar el procesamiento de los mensajes, por lo que no es necesario sondear la cola para determinar si hay disponible un nuevo trabajo o no.

También conviene mencionar que hay diferentes variaciones de los dos mecanismos. Por ejemplo, el sondeo puede ser con bloqueo y sin bloqueo. Si hay bloqueo, se mantiene una solicitud en espera hasta que un mensaje nuevo aparece en una cola (o se agota un tiempo de espera) mientras que una solicitud sin bloqueo se completa inmediatamente si no hay nada en una cola. Con un modelo de activación, se puede insertar una notificación en los agentes de escucha de la cola con cada nuevo mensaje, solo cuando el primer mensaje llega a una cola vacía o cuando la profundidad de la cola alcanza un cierto nivel.

noteNota
Las operaciones de eliminación de cola que admite la API del servicio Cola de Windows Azure son sin bloqueo. Esto significa que los métodos de la API como GetMessage o GetMessages volverán inmediatamente si no se encuentra ningún mensaje en una cola. Por el contrario, las colas de Service Bus de Windows Azure ofrecen operaciones de recepción con bloqueo que bloquean al subproceso de llamada hasta que un mensaje llega a una cola o hasta que ha transcurrido un tiempo de espera especificado.

El enfoque más común para implementar agentes de escucha de cola en las soluciones de Windows Azure actualmente puede resumirse de la forma siguiente:

  1. Un agente de escucha se implementa como un componente de aplicación del que se crea una instancia y se ejecuta como parte de una instancia de rol de trabajo.

  2. El ciclo de vida del componente agente de escucha de la cola con frecuencia se enlazaría al runtime de la instancia del rol que lo hospeda.

  3. La lógica de proceso principal consta de un bucle en el que los mensajes se quitan de la cola y se envían para ser procesados.

  4. Si no se recibe ningún mensaje, el subproceso que escucha entra en un estado de suspensión cuya duración suele determinarse en un algoritmo de retroceso específico de la aplicación.

  5. El bucle de recepción se ejecuta y una cola se sondea hasta que se notifica al agente de escucha para que salga del bucle y termine.

El siguiente diagrama de flujo describe la lógica más usada al implementar un agente de escucha de la cola con un mecanismo de sondeo en las aplicaciones de Windows Azure:

Procedimientos-recomendados-Soluciones-de-mensajería-Azure2
noteNota
Dada la finalidad de este artículo, no se utilizan patrones más complejos de diseño, por ejemplo los que requieren el uso de un administrador de cola central (agente).

El uso de un agente de escucha de cola clásico con un mecanismo de sondeo puede no ser la opción óptima cuando se utilizan colas de Windows Azure porque el modelo de precios de Windows Azure mide las transacciones de almacenamiento en función de las solicitudes de aplicación realizadas en la cola, independientemente de si esta está vacía o no. El propósito de las secciones siguientes es explicar algunas técnicas para maximizar el rendimiento y minimizar el costo de las soluciones de mensajería basadas en cola en la plataforma de Windows Azure.

Prácticas recomendadas para la optimización del rendimiento, la escalabilidad y el costo

En esta sección debemos examinar cómo mejorar los aspectos relevantes de diseño para lograr un mayor rendimiento, mejor escalabilidad y mayor rentabilidad.

Quizás, la manera más fácil de calificar un modelo de implementación como “solución más eficiente" sería a través de un diseño que cumpliera los siguientes objetivos:

  • Reducir los gastos operativos quitando una parte significativa de las transacciones de almacenamiento que no aporten ningún trabajo útil.

  • Eliminar la latencia excesiva impuesta por un intervalo de sondeo al comprobar una cola por si hay nuevos mensajes.

  • Ampliar y reducir de modo dinámico adaptando la capacidad de procesamiento a los volúmenes de trabajo volátiles.

El modelo de implementación también debe satisfacer estos objetivos sin introducir un nivel de complejidad que sobrepase en efecto los beneficios asociados.

Prácticas recomendadas para optimizar los costos de las transacciones de almacenamiento

Al evaluar el costo total de propiedad (TCO) y la rentabilidad de la inversión (ROI) de una solución implementada en la plataforma de Windows Azure, el volumen de las transacciones de almacenamiento es una de las variables principales de la ecuación de TCO. Al reducir el número de transacciones en las colas de Windows Azure, se reducen los gastos de explotación puesto que están relacionados con las soluciones actuales de Windows Azure.

En el contexto de una solución de mensajería basada en cola, el volumen de las transacciones de almacenamiento se puede reducir mediante una combinación de los métodos siguientes:

  1. Al colocar los mensajes en una cola, agrupe los mensajes relacionados en un único lote mayor, comprima y almacene la imagen comprimida en un almacén de blobs y use la cola para mantener una referencia al blob que contiene los datos reales.

  2. Al recuperar los mensajes de una cola, agrupe varios mensajes en una única transacción de almacenamiento. El método GetMessages de la API del servicio Cola permite sacar de la cola el número de mensajes especificado en una sola transacción (vea la nota a continuación).

  3. Al comprobar la presencia de elementos de trabajo en una cola, evite intervalos de sondeo demasiado largos e implemente una espera de retroceso que aumente el tiempo transcurrido entre las solicitudes de sondeo si una cola permanece continuamente vacía.

  4. Reduzca el número de agentes de escucha de cola. Al usar un modelo basado en la extracción, use solo un agente de escucha de cola por cada instancia de rol cuando una cola esté vacía. Para reducir más, a cero, el número de agentes de escucha de cola por cada instancia de rol, utilice un mecanismo de notificación para crear una instancia de los agentes de escucha de cola cuando la cola reciba elementos de trabajo.

  5. Si las colas permanecen vacías la mayor parte del tiempo, reduzca automáticamente el número de instancias de rol y continúe supervisando las métricas del sistema pertinentes para determinar si la aplicación debe aumentar proporcionalmente el número de instancias para controlar una carga de trabajo creciente y cuándo.

La mayoría de las recomendaciones anteriores se pueden traducir en una implementación bastante genérica que administre los lotes de mensajes y encapsule muchas de las operaciones subyacentes de almacenamiento en cola y blobs y de administración de subprocesos. Más adelante en este artículo, examinaremos cómo hacer esto.

ImportantImportante
Al recuperar los mensajes a través del método GetMessages, el tamaño de lote máximo admitido por la API del servicio Cola en una sola operación de eliminación de cola se limita a 32.

En general, el costo de las transacciones de cola de Windows Azure aumenta linealmente a medida que el número de clientes del servicio Cola sube, por ejemplo, al incrementar el número de instancias de rol o de subprocesos de eliminación de cola. Para mostrar el posible impacto en el costo del diseño de una solución que no aprovecha las recomendaciones anteriores, suministraremos un ejemplo respaldado por cifras concretas.

Impacto en el costo de un diseño ineficaz

Si el arquitecto de soluciones no implementa las optimizaciones pertinentes, la arquitectura del sistema de facturación descrita antes probablemente supondrá unos gastos de explotación excesivos una vez que la solución esté implementada y en ejecución en la plataforma de Windows Azure. Los motivos de ese posible gasto excesivo se describen en esta sección.

Como se señalaba en la definición del escenario, los datos de las transacciones empresariales llegan a intervalos regulares. Sin embargo, vamos a suponer que la solución está ocupada procesando la carga de trabajo solo el 25 % del tiempo durante un día laboral de ocho horas estándar. Esto da lugar a 6 horas (8 horas * el 75 %) de “tiempo de inactividad” cuando puede no haber ninguna transacción entrando en el sistema. Además, la solución no recibirá ningún dato durante las 16 horas no laborales de cada día.

Durante el período inactivo que asciende a 22 horas, la solución sigue intentando sacar de la cola trabajos porque no conoce explícitamente cuándo llegan nuevos datos. Durante este período de tiempo, cada subproceso individual para sacar de la cola realizará hasta 79.200 transacciones (22 horas * 60 minutos * 60 transacciones/minuto) en una cola de entrada, suponiendo que el intervalo de sondeo predeterminado es de 1 segundo.

Como se mencionó anteriormente, el modelo de precios en la plataforma de Windows Azure se basa en “transacciones individuales de almacenamiento”. Una transacción de almacenamiento es una solicitud realizada por una aplicación de usuario para agregar, leer, actualizar o eliminar datos de almacenamiento. Al redactar estas notas del producto, las transacciones de almacenamiento se facturan con una tarifa de 0,01 $ por cada 10.000 transacciones (independientemente de las ofertas promocionales o los acuerdos de precios especiales).

ImportantImportante
Al calcular el número de transacciones de la cola, tenga presente que colocar un solo mensaje en una cola se contaría como una transacción, mientras que usar un mensaje suele ser un proceso de dos pasos que implica la recuperación seguida de una solicitud para quitar el mensaje de la cola. Por tanto, una operación correcta para quitar de la cola supondrá dos transacciones de almacenamiento. Tenga en cuenta que aunque una solicitud de eliminación de cola no hace que se recuperen datos, sigue contando como una transacción facturable.

Las transacciones de almacenamiento generadas por un solo subproceso de eliminación de cola en el escenario anterior sumarán aproximadamente 2,38 $(79.200 / 10.000 * 0,01 $ * 30 días) a una cuenta mensual. En comparación, 200 subprocesos para eliminar de la cola (o bien un subproceso de eliminación de cola en 200 instancias de rol de trabajo) incrementarán el costo en 457,20 $ mensuales. Ese es el costo en que se incurría cuando la solución no realizaba cálculos, solo comprobaba las colas para ver si había elementos de trabajo disponibles. El ejemplo anterior es abstracto ya que nadie implementaría su servicio de esta manera, lo que constituye la razón por la que es importante realizar la optimización descrita a continuación.

Prácticas recomendadas para eliminar una latencia excesiva

Para optimizar el rendimiento de las soluciones de mensajería de Windows Azure basadas en cola, un método es utilizar el nivel de mensajería de publicación/suscripción que se proporciona con Service Bus de Windows Azure, como se describe en esta sección.

Con este método, los desarrolladores tendrán que centrarse en crear una combinación de notificaciones basadas en inserción en tiempo real y en extracción, lo que permite que los agentes de escucha se suscriban a un evento de notificación (desencadenador) que se activa con determinadas condiciones para indicar que una nueva carga de trabajo se ha puesto en una cola. Este enfoque mejora el bucle tradicional de sondeo de la cola con un nivel de mensajería de publicación/suscripción para enviar notificaciones.

En un sistema distribuido complejo, este enfoque requeriría el uso de un “bus de mensajes” o de un “software intermedio orientado a mensajes” para asegurarse de que las notificaciones se pueden retransmitir de forma confiable a uno o varios suscriptores con acoplamiento flexible. Service Bus de Windows Azure es una opción natural para satisfacer los requisitos de mensajería entre los servicios de aplicaciones distribuidas con acoplamiento flexible que se ejecutan en Windows Azure y en local. También es una solución perfecta para una arquitectura de ”bus de mensajes” que permita intercambiar notificaciones entre los procesos involucrados en la comunicación basada en cola.

Los procesos implicados en un intercambio de mensajes basado en cola podría emplear el modelo siguiente:

Procedimientos-recomendados-Soluciones-de-mensajería-Azure3

En concreto, dado que se trata de interactuar entre los editores del servicio Cola y los suscriptores, los mismos principios que se aplican a la comunicación entre las instancias de rol de Windows Azure cumplirían la mayoría de los requisitos del intercambio de mensajes de notificación basado en cola. Hemos tratado ya estos aspectos básicos en Simplificar y escalar la comunicación entre roles con Service Bus de Windows Azure.

ImportantImportante
El uso de Service Bus de Windows Azure está sujeto a un modelo de precios que tenga en cuenta el volumen de las operaciones de mensajería en una entidad de mensajería de Service Bus como una cola o un tema.

Por lo tanto, es importante realizar un análisis de los costos y los beneficios para evaluar las ventajas y los inconvenientes de introducir Service Bus en una arquitectura determinada. Junto con eso, merece la pena evaluar si la introducción del nivel de envío de notificaciones basado en Service Bus conduciría, de hecho, a una reducción del costo que pueda justificar las inversiones y otros esfuerzos de desarrollo adicionales.

Para obtener más información sobre el modelo de precios de Service Bus, vea las secciones relacionadas de las Preguntas más frecuentes de la plataforma de Windows Azure.

Aunque el impacto en la latencia es bastante fácil de solucionar con un nivel de mensajería de publicación/suscripción, se obtendría una reducción en el costo mayor mediante el uso de una escala dinámica (elástica), como se describe en la sección siguiente.

Prácticas recomendadas para la escala dinámica

La plataforma de Windows Azure permite que los clientes amplíen o disminuyan el sistema de forma más rápida y fácil que antes. La capacidad de adaptarse a cargas de trabajo volátiles y a un tráfico variable es una de las propuestas de valor principales de la plataforma de la nube. Esto significa que la “escalabilidad” ya no es un término del vocabulario de las TI relacionado con algo caro, sino que ahora es una característica lista para usar que se puede habilitar mediante programación a petición en una solución de nube bien diseñada.

La escala dinámica es la capacidad técnica de una solución dada de adaptarse a las cargas de trabajo que fluctúan aumentando y disminuyendo la capacidad de trabajo y la eficacia de procesamiento en tiempo de ejecución. La plataforma de Windows Azure permite por sí misma una escala dinámica a través del aprovisionamiento de una infraestructura informática distribuida en la que las horas de proceso se pueden comprar según las necesidades.

Es importante distinguir entre los dos tipos siguientes de escala dinámica en la plataforma de Windows Azure:

  • La ampliación de instancias de rol hace referencia a la incorporación y a la eliminación de instancias de rol de trabajo o web adicionales para administrar la carga de trabajo puntual. Esto suele conllevar cambiar el recuento de instancias en la configuración del servicio. Al aumentar el contador de instancias, el tiempo de ejecución de Windows Azure inicia nuevas instancias mientras que, al reducir el contador de instancias, se cerrarán las instancias actuales.

  • La escala de proceso (subproceso) hace referencia a mantener la suficiente capacidad en lo relativo a los subprocesos de procesamiento en una instancia de rol determinada ajustando el número de subprocesos arriba o abajo en función de la carga de trabajo actual.

La escala dinámica en una solución de mensajería basada en cola conllevaría una combinación de las recomendaciones generales siguientes:

  1. Supervisar los indicadores clave de rendimiento como son el uso de la CPU, la profundidad de la cola, los tiempos de respuesta y la latencia del procesamiento de mensajes.

  2. Aumentar o disminuir dinámicamente el número de instancias de rol para hacer frente a los picos de la carga de trabajo, ya sean previsibles o no.

  3. Expandir y recortar mediante programación el número de subprocesos de procesamiento para adaptarse a las condiciones de carga variable controladas por un rol de instancia determinado.

  4. Dividir y procesar las cargas de trabajo específicas del proceso simultáneamente mediante la Biblioteca en paralelo de tareas de .NET Framework 4.

  5. Mantener una capacidad viable en las soluciones con cargas de trabajo muy volátiles como anticipación a los picos repentinos a fin de poder controlarlos sin la sobrecarga de configurar instancias adicionales.

La API Administración de servicio permite que un servicio hospedado de Windows Azure modifique el número de sus instancias de rol en ejecución cambiando la configuración de implementación en tiempo de ejecución.

noteNota
El número máximo de instancias de proceso pequeñas de Windows Azure (o el número equivalente de instancias de cálculo de otro tamaño en cuanto al número de núcleos) en una suscripción típica se limita a 20, de forma predeterminada. Cualquier solicitud para aumentar esta cuota se debe pasar al equipo de Soporte técnico de Windows Azure. Para obtener más información, vea las Preguntas más frecuentes de la plataforma de Windows Azure.

La escala dinámica del recuento de instancias de rol puede no ser siempre la opción más adecuada para administrar los picos de carga. Por ejemplo, una nueva instancia de rol puede tardar segundos en activarse y que no exista ninguna métrica del SLA con respecto a la duración de la activación. En su lugar, puede que una solución simplemente tenga que aumentar el número de subprocesos de trabajo para tratar el aumento de carga de trabajo volátil. Mientras se procesa la carga de trabajo, la solución supervisará las métricas de carga pertinentes y determinará si se debe reducir o aumentar dinámicamente el número de procesos de trabajo.

ImportantImportante
Actualmente, el destino de escalabilidad de una sola cola de Windows Azure se “limita” a 500 transacciones/s. Si una aplicación intenta superar este destino, por ejemplo, a través de operaciones de cola desde la instancia de rol múltiple que ejecuta cientos de subprocesos de eliminación de cola, puede dar lugar a la respuesta "Servidor ocupado" HTTP 503 del servicio de almacenamiento. Cuando esto ocurre, la aplicación debe implementar un mecanismo de reintento usando un algoritmo de retraso de retroceso exponencial. Sin embargo, si los errores HTTP 503 aparecen con regularidad, se recomienda utilizar varias colas e implementar una estrategia basada en particiones para realizar la ampliación a través de varias colas.

En la mayoría de los casos, la escala automática de los procesos de trabajo es responsabilidad de una instancia de rol individual. Por el contrario, la escala de instancias de rol suele implicar a un elemento central de la arquitectura de la solución que es responsable de supervisar las métricas de rendimiento y de realizar las acciones adecuadas para la ampliación. El diagrama siguiente describe un componente de servicio denominado agente de escala dinámica que recopila y analiza la métrica de carga para determinar si necesita aprovisionar nuevas instancias o dar de baja las inactivas.

Procedimientos-recomendados-Soluciones-de-mensajería-Azure4

Hay que destacar que el servicio del agente de ampliación se puede implementar como un rol de trabajo que se ejecute en Windows Azure o como un servicio local. Con independencia de la topología de implementación, el servicio podrá obtener acceso a las colas de Windows Azure.

Para implementar una capacidad de escala dinámica, considere el uso del Bloque de aplicaciones de escala automática de la biblioteca de empresas de Microsoft que habilita el comportamiento de la escala automática en las soluciones que se ejecutan en Windows Azure. El Bloque de aplicaciones de escala automática proporciona toda la funcionalidad necesaria para definir y supervisar la escala dinámica en una aplicación de Windows Azure.

Ahora que hemos tratado el impacto de la latencia, los costos de las transacciones de almacenamiento y los requisitos de la escala dinámica, es un buen momento de consolidar nuestras recomendaciones en una implementación técnica.

Implementación técnica

En las secciones anteriores, hemos examinado las características clave atribuidas a una arquitectura bien diseñada de mensajería basada en las colas de almacenamiento de Windows Azure. Hemos examinado tres áreas centrales principales que ayudan a reducir la latencia de procesamiento, a optimizar los costos de las transacciones de almacenamiento y a mejorar la capacidad de respuesta a las cargas de trabajo que fluctúan.

Esta sección tiene como objetivo proporcionar un punto inicial que ayude a los desarrolladores de Windows Azure en la implementación de algunos de los patrones a los que se hace referencia en estas notas del producto desde la perspectiva de la programación.

noteNota
Esta sección se centrará en la creación de un agente de escucha de cola autoescalable que admita tanto los modelos basados en extracción como los basados en inserción. Para conocer técnicas avanzadas de escala dinámica en las instancias de rol, consulte el Bloque de aplicaciones de escala automática de la biblioteca de empresas.

Además, por brevedad, nos centraremos solo en algunos elementos funcionales básicos y evitaremos la complejidad indeseada omitiendo una gran parte del código de la infraestructura en los ejemplos de código siguientes. También es necesario aclarar que la implementación técnica explicada a continuación no es la única solución a un problema concreto. Se ha diseñado para conseguir un punto de inicio a partir del cual los desarrolladores puedan derivar sus propias soluciones más inteligentes.

A partir de aquí, estas notas del producto se centrarán en el código fuente necesario para implementar los modelos descritos anteriormente.

Crear el agente de escucha de cola genérico

Primero, definimos un contrato que se implementará mediante un componente de agente de escucha de cola que un rol de trabajo hospeda y que se escucha en una cola de Windows Azure.

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension
{
    /// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
    void StartListener(int threadCount);

    /// Returns the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo QueryState();

    /// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
    int DequeueBatchSize { get; set; }

    /// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
    TimeSpan DequeueInterval { get; set; }

    /// Defines a callback delegate which will be invoked whenever the queue is empty.
    event WorkCompletedDelegate QueueEmpty;
}

El evento QueueEmpty está destinado a ser usado por un host. Proporciona un mecanismo para que el host controle el comportamiento del agente de escucha de cola cuando la cola está vacía. El delegado de evento respectivo se define como sigue:

/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);

Administrar los elementos de la cola es más fácil si un agente de escucha puede operar con productos genéricos en lugar de utilizar las clases de SDK que se ejecutan directamente sobre el hardware como CloudQueueMessage. Por consiguiente, definimos una nueva interfaz que será implementada mediante un agente de escucha de cola capaz de admitir el acceso basado en elementos genéricos a las colas:

/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{
}

Tenga en cuenta que también habilitamos el agente de escucha con elementos genéricos para insertar elementos de la cola en uno o varios suscriptores a través de la implementación del Modelo de diseño de observador aprovechando la interfaz IObservable<T> disponible en .NET Framework 4.

Pretendemos mantener una única instancia de un componente que implementa la interfaz ICloudQueueListenerExtension<T>. Sin embargo, necesitamos poder ejecutar varios subprocesos de eliminación de cola (procesos de trabajo o tareas para simplificar). Por consiguiente, agregamos compatibilidad con una lógica de eliminación de cola multiproceso en el componente de agente de escucha de la cola. Aquí es donde aprovechamos la Biblioteca en paralelo de tareas (TPL). El método StartListener se encarga de activar el número especificado de subprocesos de eliminación de cola como se indica a continuación:


/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount)
{
    Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");

    // The collection of dequeue tasks needs to be reset on each call to this method.
    if (this.dequeueTasks.IsAddingCompleted)
    {
        this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
    }

    for (int i = 0; i < threadCount; i++)
    {
        CancellationToken cancellationToken = this.cancellationSignal.Token;
        CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);

        // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
        this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
    }

    // Mark this collection as not accepting any more additions.
    this.dequeueTasks.CompleteAdding();
}

El método DequeueTaskMain implementa el cuerpo funcional de un subproceso de eliminación de cola. Sus operaciones principales son las siguientes:

/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state)
{
    CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

    int idleStateCount = 0;
    TimeSpan sleepInterval = DequeueInterval;

    try
    {
        // Run a dequeue task until asked to terminate or until a break condition is encountered.
        while (workerState.CanRun)
        {
            try
            {
                var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                int messageCount = 0;

                // Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
                queueMessages.ForAll((message) =>
                {
                    // Reset the count of idle iterations.
                    idleStateCount = 0;

                    // Notify all subscribers that a new message requires processing.
                    workerState.OnNext(message);

                    // Once successful, remove the processed message from the queue.
                    workerState.QueueStorage.Delete<T>(message);

                    // Increment the number of processed messages.
                    messageCount++;
                });

                // Check whether or not we have done any work during this iteration.
                if (0 == messageCount)
                {
                    // Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
                    idleStateCount++;

                    // Call the user-defined delegate informing that no more work is available.
                    if (QueueEmpty != null)
                    {
                        // Check if the user-defined delegate has requested a halt to any further work processing.
                        if (QueueEmpty(this, idleStateCount, out sleepInterval))
                        {
                            // Terminate the dequeue loop if user-defined delegate advised us to do so.
                            break;
                        }
                    }

                    // Enter the idle state for the defined interval.
                    Thread.Sleep(sleepInterval);
                }
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException)
                {
                    throw;
                }
                else
                {
                    // Offload the responsibility for handling or reporting the error to the external object.
                    workerState.OnError(ex);

                    // Sleep for the specified interval to avoid a flood of errors.
                    Thread.Sleep(sleepInterval);
                }
            }
        }
    }
    finally
    {
        workerState.OnCompleted();
    }
}

Es conveniente destacar un par de cuestiones con respecto a la implementación del método DequeueTaskMain.

En primer lugar, usamos LINQ paralelo (PLINQ) al enviar los mensajes para procesarlos. La ventaja principal de PLINQ aquí es acelerar el control de los mensajes ejecutando el delegado de consultas en subprocesos de trabajo independientes en varios procesadores en paralelo siempre que sea posible.

noteNota
Puesto que la paralelización de consultas se administra internamente con PLINQ, no hay ninguna garantía de que PLINQ vaya a usar más de un núcleo para la paralelización de trabajo. PLINQ puede ejecutar una consulta secuencialmente si determina que la sobrecarga de la paralelización retrasará la consulta. Para beneficiarse de PLINQ, el trabajo completo en la consulta debe ser lo suficientemente grande como para beneficiarse de la sobrecarga de programar el trabajo en el grupo de subprocesos.

En segundo lugar, no vamos a capturar un solo mensaje a la vez. En cambio, pedimos a la API del servicio Cola que recupere un número concreto de mensajes de una cola. Esto se controla mediante el parámetro DequeueBatchSize que se pasa al método Get<T>. Cuando incorporamos la capa de abstracción de almacenamiento implementada como parte de la solución completa, este parámetro se entrega al método de la API del servicio Cola. Además, ejecutamos una comprobación de seguridad para asegurar que el tamaño de lote no supera el tamaño máximo admitido por las API. Se implementa de la forma siguiente:

/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage
{
    /// The maximum batch size supported by Queue Service API in a single Get operation.
    private const int MaxDequeueMessageCount = 32;

    /// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
    public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout)
    {
        Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
        Guard.ArgumentNotZeroOrNegativeValue(count, "count");

        try
        {
            var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));

            IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
            {
                return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
            });

            // ... There is more code after this point ...

Por último, no vamos a ejecutar la tarea de eliminación de cola indefinidamente. Proporcionamos un punto de comprobación explícito implementado como un evento QueueEmpty que se desencadena siempre que una cola se vacía. En ese momento, consultamos un controlador de eventos QueueEmpty para determinar si permite o no que finalicemos la tarea de eliminación de cola en ejecución. Una implementación bien diseñada del controlador de eventos QueueEmpty permite la capacidad de “reducción automática”, como se explica en la sección siguiente.

Reducción automática de tareas de eliminación de cola

El propósito del controlador de eventos QueueEmpty es doble. Primero, es responsable de proporcionar información a la tarea de eliminación de cola de origen indicándole que entre en un estado en suspensión durante un intervalo de tiempo determinado (tal y como se define en el parámetro de salida delay en el delegado de evento). En segundo lugar, indica a la tarea de eliminación de cola si debe cerrarse o no correctamente (como prescribe el parámetro devuelto booleano).

La implementación siguiente del controlador de eventos QueueEmpty resuelve los dos retos indicados antes en estas notas del producto. Calcula un intervalo de retroceso exponencial y aleatorio e indica a la tarea de eliminación de cola que aumente exponencialmente el retraso entre las solicitudes de sondeo de la cola. Tenga en cuenta que el retraso de retroceso no será superior a un segundo como está configurado en nuestra solución, ya que no es realmente necesario tener un valor retraso prolongado entre el sondeo cuando la escala automática se implementa correctamente. Además, consulta el estado del agente de escucha de cola para determinar el número de tareas de eliminación de cola activas. Si este número es superior a 1, el controlador de eventos aconseja a la tarea de eliminación de cola original que complete su bucle de sondeo siempre que el intervalo de retroceso también haya alcanzado el máximo especificado. De lo contrario, la tarea de eliminación de cola no se terminará, dejando exactamente un subproceso de sondeo ejecutándose a la vez por cada instancia del agente de escucha de cola. Este enfoque ayuda a reducir el número de transacciones de almacenamiento y, por lo tanto, disminuye los costos de las transacciones, como se ha explicado antes.

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
    ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

    // Find out which extension is responsible for retrieving the worker role configuration settings.
    IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>();

    // Get the current state of the queue listener to determine point-in-time load characteristics.
    CloudQueueListenerInfo queueServiceState = queueService.QueryState();

    // Set up the initial parameters, read configuration settings.
    int deltaBackoffMs = 100;
    int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds);
    int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds);

    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    delay = TimeSpan.FromMilliseconds((double)interval);

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself
    // unless this is a last deqeueue task. If so, we are not going to keep it running and continue polling the queue.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1;
}

En un nivel superior, la capacidad de “reducción de tarea de eliminación de cola” descrita anteriormente puede explicarse como sigue:

  1. Siempre que no haya nada en la cola, las tareas de eliminación de cola se asegurarán de que la carga de trabajo se procesará lo antes posible. No habrá retraso entre las solicitudes de los mensajes de eliminación de cola de una cola.

  2. En cuanto la cola de origen esté vacía, cada tarea de eliminación de cola generará un evento QueueEmpty.

  3. El controlador de eventos QueueEmpty calculará un valor de retroceso de espera exponencial aleatorio e indicará a la tarea de eliminación de cola que suspenda su actividad durante un intervalo determinado.

  4. Las tareas de eliminación de cola continuarán sondeando la cola de origen en intervalos calculados hasta que la duración de la inactividad supere el máximo permitido.

  5. Tras alcanzar el intervalo inactivo máximo y siempre que la cola de origen siga vacía, todas las tareas de eliminación de cola activas comenzarán a cerrarse correctamente. Esto no ocurrirá todo a la vez, ya que las tareas de eliminación de cola retroceden en puntos diferentes del algoritmo de retroceso.

  6. En algún momento, solo habrá una tarea de eliminación de cola activa en espera de un trabajo. Como resultado, no se producirá ninguna transacción de sondeo inactiva en una cola excepto para esa única tarea.

Para preparar el proceso de recopilación de las características de la carga en un instante, es conveniente mencionar los artefactos de código fuente pertinentes. Primero, hay una estructura que contiene la métrica importante que mide el resultado de la carga que se aplica a la solución. Para mantener la simplicidad, incluimos un pequeño subconjunto de métricas que se utilizará en mayor medida en el código de muestra.

/// Implements a structure containing point-in-time load characteristics for a given queue listener.
public struct CloudQueueListenerInfo
{
    /// Returns the approximate number of items in the Windows Azure queue.
    public int CurrentQueueDepth { get; internal set; }

    /// Returns the number of dequeue tasks that are actively performing work or waiting for work.
    public int ActiveDequeueTasks { get; internal set; }

    /// Returns the maximum number of dequeue tasks that were active at a time.
    public int TotalDequeueTasks { get; internal set; }
}

En segundo lugar, hay un método que implementa un agente de escucha de la cola que devuelve sus métricas de carga como se describe en el siguiente ejemplo:

/// Returns the current state of the queue listener to determine point-in-time load characteristics.
public CloudQueueListenerInfo QueryState()
{
    return new CloudQueueListenerInfo()
    {
        CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName),
        ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(),
        TotalDequeueTasks = this.dequeueTasks.Count
    };
}

Escala automática de las tareas de eliminación de cola

En la sección anterior, introdujimos la posibilidad de reducir el número de tareas de eliminación de cola activas a una sola instancia para minimizar el impacto de las transacciones inactivas en los costos operativos del almacenamiento. En esta sección, vamos a examinar un ejemplo diferente en el que implementemos la capacidad de “escala automática” para recuperar la capacidad de procesamiento cuando sea necesario.

Primero, definimos un delegado de evento que ayudará a seguir las transiciones de estado desde una cola vacía a una que no esté vacía para activar las acciones relacionadas:

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrived to a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

A continuación extendemos la definición original de la interfaz ICloudQueueServiceWorkerRoleExtension para incluir un nuevo evento que se activará cada vez que un agente de escucha de cola detecte nuevos elementos de trabajo, básicamente cuando la profundidad de la cola cambie de cero a cualquier valor positivo:

public interface ICloudQueueServiceWorkerRoleExtension
{
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

Además, determinamos el lugar correcto en el código del agente de escucha de cola donde un evento se activará. Vamos a desencadenar el evento QueueWorkDetected desde dentro del bucle de eliminación de cola implementado en el método DequeueTaskMain que tiene que extenderse como se indica a continuación:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An instance of the delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state)
    {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try
        {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun)
            {
                try
                {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
                    int messageCount = 0;

                    // Check whether or not work items arrived to a queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0)
                    {
                        if (QueueWorkDetected != null)
                        {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...

En el último paso, suministramos un controlador para el evento QueueWorkDetected. La implementación de este controlador de eventos se proporcionará en un componente que crea una instancia y hospeda el agente de escucha de la cola. En nuestro caso, es un rol de trabajo. El código responsable de la creación de instancias y la implementación del controlador de eventos consta de lo siguiente:

public class WorkItemProcessorWorkerRole : RoleEntryPoint
{
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart()
    {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever a new work has arrived to a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender)
    {
        // The sender is an instance of the ICloudQueueServiceWorkerRoleExtension, we can safely perform type casting.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of queue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the dequeue task count is less than computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount)
        {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }       // ... There is more code after this point ...

A la luz del ejemplo anterior, conviene echar un vistazo más profundo al método GetOptimalDequeueTaskCount. Este método es responsable de calcular el número de tareas de eliminación de cola que se considerarán óptimas para administrar la carga de trabajo en una cola. Cuando se invoca, este método debe determinar (mediante los mecanismos apropiados de toma de decisiones) cuánta “fuerza” necesita el agente de escucha de la cola para procesar el volumen de trabajo ya sea en espera de que llegue a una cola determinada o en previsión de tal acción.

Por ejemplo, el desarrollador puede seguir un planteamiento simplista e insertar un conjunto de reglas estáticas directamente en el método GetOptimalDequeueTaskCount. Mediante las conocidas características de rendimiento y escalabilidad de la infraestructura de cola, la latencia promedio de procesamiento, el tamaño de carga y otras entradas relacionadas, el conjunto de reglas pudo tomar una vista optimista y decidir un recuento de tarea de eliminación de cola óptimo.

En el ejemplo siguiente, se usa una técnica excesivamente simplificada de forma intencionada para determinar el número de tareas de eliminación de cola:

/// <summary>
/// Returns the number of queue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth)
{
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

Repetimos que el código de ejemplo anterior no está pensado para constituir un método “todo en uno”. Una solución mejor sería invocar una regla configurable externamente y fácil de administrar que realice los cálculos necesarios.

En este punto, tenemos un prototipo activo de un agente de escucha de cola capaz de aumentar automáticamente y de disminuir como en una carga de trabajo que fluctúa. Quizás, como toque final, sea necesario incluir la posibilidad de adaptarse a la carga variable mientras se está procesando. Esta capacidad se puede agregar aplicando el mismo modelo que se ha seguido al agregar compatibilidad con el evento QueueWorkDetected.

Ahora, centremos la atención en otra optimización importante que ayudará a reducir la latencia de los agentes de escucha de la cola.

Implementar el nivel de publicación/suscripción para una eliminación de cola con latencia cero

En esta sección, vamos a mejorar la implementación anterior de un agente de escucha de la cola con un mecanismo de notificación basado en inserción creado sobre la capacidad de multidifusión unidireccional de Service Bus. El mecanismo de notificación es responsable de activar un evento que indique al agente de escucha de la cola que inicie el trabajo de eliminación de cola. Este enfoque ayuda a impedir el sondeo de la cola para comprobar si hay mensajes nuevos y, por tanto, eliminar la latencia asociada.

Primero, definimos un evento de desencadenador que recibirá nuestro agente de escucha de la cola en caso de que se deposite una nueva carga de trabajo en una cola:

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns a name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns a size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}

A continuación, permitimos que las implementaciones del agente de escucha de la cola actúen como suscriptores para recibir un evento de desencadenador. El primer paso consiste en definir un agente de escucha de la cola como observador para el evento CloudQueueWorkDetectedTriggerEvent:

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}

El segundo paso es implementar el método OnNext definido en la interfaz IObserver<T>. El proveedor llama a este método para notificar al observador un nuevo evento:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName)
        {
            if (QueueWorkDetected != null)
            {
                 QueueWorkDetected(this);
            }
        }
    }

    // ... There is more code after this point ...
}

Como puede verse en el ejemplo anterior, invocamos a propósito el mismo delegado de eventos como se usó en los pasos anteriores. El controlador de eventos QueueWorkDetected ya proporciona la lógica de aplicación necesaria para crear una instancia del número óptimo de tareas de eliminación de cola. Por consiguiente, el mismo controlador de eventos se reutiliza al tratar la notificación CloudQueueWorkDetectedTriggerEvent.

Como se señalaba en las secciones anteriores, no tenemos que mantener una tarea de eliminación de la cola que se ejecute continuamente cuando se emplea una notificación basada en inserción. Por lo tanto, podemos reducir a cero el número de tareas de cola por instancia de agente de escucha de cola y usar un mecanismo de notificación para crear una instancia de las tareas de eliminación de cola cuando la cola reciba elementos de trabajo. Para asegurarse de que no estemos ejecutando ninguna tarea de eliminación de cola inactiva, se requiere la siguiente modificación directa del controlador de eventos de QueueEmpty:

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

En resumen, ya no vamos a detectar si queda una única tarea de eliminación de cola activa. El resultado del controlador de eventos QueueEmpty revisado solo tiene en cuenta el hecho de superar el intervalo de inactividad máximo tras el que todas las tareas de eliminación de cola activas se cerrarán.

Para recibir las notificaciones CloudQueueWorkDetectedTriggerEvent, aprovechamos el modelo de publicación/suscripción que se implementa como mensajería con acoplamiento flexible entre las instancias de rol de Windows Azure. En esencia, enlazamos en el mismo nivel de comunicación entre roles y tratamos los eventos entrantes de la forma siguiente:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            if (e.Payload is CloudQueueWorkDetectedTriggerEvent)
            {
                HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}

Realizar la multidifusión de un evento de desencadenador definido en la clase CloudQueueWorkDetectedTriggerEvent es responsabilidad última de un publicador, a saber, el componente que deposita los elementos de trabajo en una cola. Este evento se activa bien antes de que se ponga en cola el primer elemento de trabajo o bien después de que el último elemento se ponga en una cola. En el ejemplo siguiente, publicamos un evento de desencadenador tras poner los elementos de trabajo en la cola de entrada:

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}

Ahora que hemos creado un agente de escucha de cola capaz de admitir el subproceso múltiple, la escala automática y notificaciones basadas en inserción, es hora de consolidar todas las recomendaciones relativas al diseño de soluciones de mensajería basadas en cola en la plataforma de Windows Azure.

Conclusión

Para maximizar la rentabilidad y la eficacia de las soluciones de mensajería basadas en cola que se ejecutan en la plataforma de Windows Azure, los arquitectos de soluciones y los desarrolladores deben tener en cuenta las siguientes recomendaciones.

Como arquitecto de la solución, debe:

  • Proporcionar una arquitectura de mensajería basada en cola que use el servicio de almacenamiento de cola de Windows Azure para la comunicación asincrónica de alta escala entre los niveles y los servicios en las soluciones basadas en la nube o híbridas.

  • Recomendar que la arquitectura de puesta en cola con particiones no aumente más allá de 500 transacciones/s.

  • Conocer los aspectos básicos del modelo de precios de Windows Azure y optimizar la solución para reducir los costos de transacción con una serie de prácticas recomendadas y de modelos de diseño.

  • Tenga en cuenta los requisitos de escala dinámica proporcionando una arquitectura que se pueda adaptar a las cargas de trabajo volátiles y que fluctúan.

  • Emplear los métodos y las técnicas de escala automática correctos para expandir y reducir de forma elástica la capacidad de proceso para optimizar aún más el gasto de la explotación.

  • Evaluar la proporción entre costos y beneficios que supone reducir la latencia a través de la dependencia de Service Bus de Windows Azure para el envío de notificaciones basadas en inserción en tiempo real.

Como desarrollador, debe:

  • Diseñar una solución de mensajería que emplee el procesamiento por lotes al almacenar y recuperar los datos de las colas de Windows Azure.

  • Implementar un servicio eficaz de agente de escucha de la cola que garantice que, como máximo, un subproceso de eliminación de cola sondeará las colas cuando estén vacías.

  • Reducir dinámicamente el número de instancias de rol de trabajo cuando las colas permanezcan vacías durante un período prolongado.

  • Implementar un algoritmo de retroceso exponencial aleatorio específico de la aplicación para reducir el efecto del sondeo de las colas inactivas en los costos de las transacciones de almacenamiento.

  • Adoptar las técnicas correctas que impidan que se superen los destinos de escalabilidad de una sola cola al implementar publicadores y consumidores de colas de varias instancias y varios subprocesos.

  • Emplear una directiva de reintento sólida capaz de controlar diversas condiciones transitorias al publicar y usar los datos de las colas de Windows Azure.

  • Usar la capacidad unidireccional de generación de eventos que Service Bus de Windows Azure proporciona para admitir las notificaciones basadas en inserción a fin de reducir la latencia y mejorar el rendimiento de la solución de mensajería basada en cola.

  • Explorar las nuevas capacidades de .NET Framework 4 como TPL, PLINQ y el modelo Observer para maximizar el grado de paralelismo, mejorar la simultaneidad y simplificar el diseño de los servicios multiproceso.

El código de ejemplo adjunto puede descargarse de la galería de código de MSDN. El código de ejemplo también incluye todos los componentes de infraestructura requeridos, como la capa de abstracción con elementos genéricos para el servicio Cola de Windows Azure, que no se proporcionaron en los fragmentos de código anteriores. Tenga en cuenta que la licencia pública de Microsoft rige todos los archivos de código fuente, según se explica en los avisos legales correspondientes.

Recursos o referencias adicionales

Para obtener más información sobre el tema descrito en estas notas del producto, consulte lo siguiente:


Fecha de compilación:

2013-10-23
¿Te ha resultado útil?
(Caracteres restantes: 1500)
Gracias por sus comentarios

Adiciones de comunidad

AGREGAR
Mostrar:
© 2014 Microsoft. Reservados todos los derechos.