Exportar (0) Imprimir
Expandir todo

Integrar un servicio de flujo de trabajo WCF con temas y colas de Service Bus

Actualizado: marzo de 2014

Autor: Paolo Salvatori

Revisores: Ralph Squillace, Sidney Higa

Este documento proporciona una introducción a la mensajería negociada de Service Bus y una guía para integrar un servicio de flujo de trabajo WCF con las colas y los temas de Service Bus. El documento examinará el modo de crear un servicio de flujo de trabajo WCF que utilice las colas y los temas para intercambiar mensajes con una aplicación cliente. Como complemento de esta guía, recomiendo encarecidamente el excelente artículo Usar la mensajería de Service Bus de Azure de Roman Kiss que proporciona una solución más sofisticada para el mismo problema y un conjunto de actividades reutilizables que puede usar en un servicio de flujo de trabajo WCF para interactuar con las colas y los temas de Service Bus.

Para obtener más información acerca de Service Bus de Azure, vea los recursos siguientes:

Colas

Las colas proporcionan funciones de mensajería que permiten a una clase variada y heterogénea de aplicaciones que se ejecutan en local o en la nube intercambiar mensajes de una manera flexible, segura y confiable a través de los límites de la red y de relaciones de confianza.

ServiceBus-Queues-Topics-WCF-Workflow-Service1

Las colas se hospedan en Azure en una infraestructura de almacenamiento perdurable y replicada. El tamaño máximo de una cola es de 5 GB. El tamaño máximo de los mensajes es de 256 KB pero puede usar sesiones para crear secuencias de mensajes relacionados con un tamaño ilimitado. Se obtiene acceso a las colas a través de las API siguientes:

Las entidades de cola proporcionan las siguientes funciones:

  • Correlación basada en la sesión, lo que significa que puede generar rutas de acceso multiplexadas de solicitud/respuesta. En un escenario de consumidores en competencia en el que varios procesos de trabajo reciben mensajes de la misma suscripción o cola habilitada para sesiones, se garantiza que los mensajes que comparten el mismo SessionId se reciben del mismo consumidor.

  • La posibilidad de especificar el momento en que el mensaje se agregará a la cola.

  • Patrones de predicción de entrega confiables mediante el modo de recepción PeekLock (un mensaje permanece en la cola hasta que se procesa definitivamente).

  • Compatibilidad con las transacciones, para asegurar que las operaciones de mensajería se confirman de forma atómica. Esta característica permite a una aplicación enviar varios mensajes en el contexto de la misma transacción y, por tanto, confirmar o anular la operación completa como una sola unidad de trabajo.

  • Detección de los duplicados de los mensajes entrantes, lo que permite a los clientes enviar el mismo mensaje varias veces sin consecuencias adversas.

  • Utilidad de mensajes fallidos para aquellos mensajes que no se pueden procesar o que expiran antes de ser recibidos.

  • Aplazamiento de los mensajes para procesarse posteriormente. (Esta funcionalidad es especialmente práctica cuando los mensajes no se reciben en la secuencia esperada y es necesario ponerlos en su sitio correctamente mientras el proceso espera un mensaje determinado para poder seguir con el progreso o cuando los mensajes tienen que procesarse en función de un conjunto de propiedades que definen su prioridad durante un pico de tráfico).

Los dos tipos de API .NET más importantes tipos para los mensajes negociados (aquellos mensajes que se utilizan en las colas, los temas y las suscripciones de Service Bus) son la clase BrokeredMessage (que expone propiedades como MessageId, SessionID y CorrelationId que habilitan la detección automática duplicada y las comunicaciones habilitadas para sesión, entre otras cosas) y la clase QueueDescription, que puede utilizar para controlar el comportamiento de la cola que se crea. La clase QueueDescription tiene las siguientes propiedades importantes:

  • La propiedad DefaultMessageTimeToLive especifica el valor predeterminado del período de vida de los mensajes.

  • La propiedad DuplicateDetectionHistoryTimeWindow define la duración del historial de detección duplicado.

  • La propiedad EnableDeadLetteringOnMessageExpiration permite habilitar o deshabilitar las colas de mensajes fallidos al expirar los mensajes.

  • La propiedad LockDuration define la vigencia del bloqueo utilizado por un consumidor cuando se usa el modo de recepción PeekLock.

  • La propiedad MaxSizeInMegabytes define el tamaño máximo de la cola en megabytes.

  • La propiedad RequiresDuplicateDetection habilita o deshabilita la detección de mensajes duplicados.

  • La propiedad RequiresSession habilita o deshabilita las sesiones.

  • La propiedad MessageCount devuelve el número de mensajes en la cola. (Un sistema inteligente puede usarla para decidir si aumentar o reducir el número de consumidores en competencia que reciben y procesan simultáneamente los mensajes de la cola).

noteNota
Dado que los metadatos no se pueden cambiar una vez creada una entidad de mensajería, modificar el comportamiento de la detección de duplicados requiere eliminar y volver a crear la cola. El mismo principio se aplica a cualquier otro metadato.

Desde el punto de vista de la arquitectura, el uso de colas es una parte importante de las aplicaciones distribuidas porque permite homogeneizar el tráfico muy variable en un flujo predecible de trabajo y después distribuir la carga entre un conjunto de procesos de trabajo cuyo tamaño puede variar dinámicamente para admitir el volumen de mensajes entrantes. En un escenario con consumidores en competencia, cuando un publicador escribe un mensaje en una cola, varios consumidores compiten entre sí para recibirlo pero solo uno de ellos recibirá y procesará el mensaje en cuestión. Es decir, una cola puede tener un único consumidor que reciba todos los mensajes o un conjunto de consumidores en competencia que capturen los mensajes según el paradigma "el primero en entrar es el primero en ser atendido". Por este motivo, las colas son una solución excelente de mensajería para distribuir la carga de trabajo entre varios conjuntos de procesos de trabajo en competencia.

En las arquitecturas de Service Bus u orientadas a servicios que constan de varios sistemas heterogéneos, las interacciones entre los sistemas autónomos es asincrónica y con acoplamiento flexible. En este contexto, se suele considerar que los servicios SOAP o REST mantienen los componentes con acoplamiento flexible pero el uso de entidades de mensajería de Service Bus como las colas y los temas (vea la siguiente sección) incrementa la agilidad, la escalabilidad y la flexibilidad de la arquitectura general al tiempo que contribuye a reducir el acoplamiento flexible de los sistemas individuales.

Para obtener más información de las colas, vea los siguientes artículos:

Temas

Los temas amplían las características de mensajería que proporcionan las colas con la adición de las funciones de publicación y suscripción.

ServiceBus-Queues-Topics-WCF-Workflow-Service2

Una entidad de tema consta de un almacén de mensajes secuenciales como una cola, pero admite hasta 2.000 suscripciones simultáneas y perdurables que retransmiten las copias del mensaje a un conjunto de procesos de trabajo (este número puede variar en el futuro). Como se describe en la ilustración siguiente, cada suscripción puede definir una o varias entidades de regla.

ServiceBus-Queues-Topics-WCF-Workflow-Service3

Cada regla especifica una expresión de filtro que se usa para filtrar los mensajes que pasan a través de la suscripción y una acción de filtro que puede modificar las propiedades del mensaje. En concreto, la clase SqlFilter permite definir una condición al estilo de SQL92 en las propiedades del mensaje:

  • OrderTotal > 5000 OR ClientPriority > 2

  • ShipDestinationCountry = ‘USA’ AND ShipDestinationState = ‘WA’

Para obtener más información acerca de este tema, revise la documentación disponible de la propiedad SqlExpression.

Por el contrario, la clase SqlRuleAction se puede utilizar para modificar, agregar o quitar propiedades de un objeto BrokeredMessage mediante una sintaxis similar a la utilizada por la cláusula SET de un comando SQL UPDATE.

  • SET AuditRequired = 1

  • SET Priority = 'High', Severity = 1

noteNota
Cada regla coincidente que define explícitamente una acción genera una copia independiente del mensaje publicado, de modo que cualquier suscripción podría generar varias copias del mismo mensaje, una para cada regla coincidente.

Como las colas, los temas también admiten un escenario de consumidores en competencia. En este contexto, una suscripción puede tener un único consumidor que reciba todos los mensajes o un conjunto de consumidores en competencia que capturen los mensajes según el paradigma "el primero en entrar es el primero en ser atendido". Los temas son una solución excelente de mensajería para difundir los mensajes a muchas aplicaciones de consumidor o distribuir la carga de trabajo entre varios conjuntos de procesos de trabajo en competencia.

Para obtener más información acerca de los temas, vea los siguientes artículos:

La clase BrokeredMessage

La clase BrokeredMessage modela los mensajes intercambiados por las aplicaciones que se comunican mediante colas y temas. La clase proporciona cuatro constructores públicos distintos:

La clase expone un conjunto interesante de métodos que permiten la ejecución de una amplia variedad de acciones en los mensajes:

  • Cuando se utiliza el modo de recepción PeekLock, el método Abandon permite liberar el bloqueo de un mensaje bloqueado con peek, mientras el método Complete confirma la operación de recepción de un mensaje e indica que este debe marcarse como procesado y eliminado o archivado.

  • El método Defer indica que el destinatario desea aplazar el procesamiento de este mensaje. Como se mencionó antes, aplazar mensajes es una forma cómoda de controlar aquellas situaciones en las que los mensajes no se reciben en la secuencia esperada y es necesario apartarlos con seguridad mientras las aplicaciones esperan un mensaje concreto antes de continuar con el procesamiento del flujo de mensajes.

  • Los métodos DeadLetter Y DeadLetter (cadena, cadena) permiten a una aplicación mover explícitamente un mensaje en la cola de mensajes fallidos de una cola o una suscripción. Tenga en cuenta que al crear una entidad de cola que utiliza la API de administración o el portal de administración de Azure, puede configurarla automáticamente para mover los mensajes que han expirado a la cola de mensajes fallidos. De la misma manera, puede configurar una suscripción para mover los mensajes que han expirado y los que no pueden pasar la evaluación del filtro a su cola de mensajes fallidos.

La clase BrokeredMessage expone una amplia variedad de propiedades:

  • La propiedad ContentType permite especificar el tipo de contenido.

  • El MessageId es el identificador del mensaje.

  • La propiedad CorrelationId se puede utilizar para implementar un modelo de intercambio de mensajes de solicitud-respuesta en el que la aplicación cliente usa la propiedad MessageId de un mensaje de solicitud saliente y la propiedad CorrelationId de un mensaje de respuesta entrante para correlacionar los dos mensajes. (Veremos una implementación de esta técnica más adelante en este artículo).

  • La propiedad SessionId permite establecer u obtener el identificador de la sesión para el mensaje. En un escenario de consumidores en competencia en el que varios procesos de trabajo reciben mensajes de la misma suscripción o cola habilitada para sesiones, se garantiza que los mensajes que comparten el mismoSessionId se reciben del mismo consumidor. En este contexto, cuando una aplicación cliente A envía un flujo de mensajes de solicitud a una aplicación servidor B a través de una cola o un tema y espera los mensajes de respuesta correlativos en una suscripción o cola independiente habilitada para la sesión, la aplicación cliente A puede asignar el identificador de la sesión de recepción a la propiedad ReplyToSessionId de los mensajes salientes para indicar a la aplicación B el valor que se va a asignar a la propiedad SessionId de los mensajes de respuesta.

  • La propiedad ReplyTo obtiene o establece la dirección de la cola a la que responder. En un escenario asincrónico de solicitud-respuesta en el que una aplicación cliente A envía un mensaje de solicitud a una aplicación servidor B a través de una cola o un tema de Service Bus y espera un mensaje de respuesta, por convención, la aplicación cliente A puede usar la propiedad ReplyTo del mensaje de solicitud para indicar a la aplicación de servidor B la dirección de la cola o del tema donde enviar la respuesta. (Veremos una aplicación de esta técnica más adelante en este documento). La propiedad Etiqueta obtiene o establece la etiqueta específica de la aplicación por necesidades de la personalización.

  • La propiedad SequenceNumber devuelve el número único asignado a un mensaje por el Bus de servicio. Esta propiedad se puede utilizar para recuperar un mensaje diferido de una cola o una suscripción.

  • La propiedad TimeToLive permite definir o revisar el valor de período de vida del mensaje. Service Bus no exige una duración máxima para los mensajes que esperan ser procesados en una cola o una suscripción. Sin embargo, puede definir un período de vida predeterminado para los mensajes cuando se crea una cola, un tema o una suscripción o puede definir explícitamente un tiempo de espera de caducidad de un nivel de mensaje con la propiedad TimeToLive.

  • DeliveryCount devuelve el número de entregas de mensajes.

  • La colección Properties permite definir propiedades específicas de los mensajes de la aplicación. Esta es, probablemente, la característica más importante de una entidad BrokeredMessage ya que las propiedades definidas por el usuario se pueden usar para lo siguiente:

    • Contiene la carga de un mensaje. En este contexto, el cuerpo del mensaje podría estar vacío.

    • Define las propiedades específicas de la aplicación que puede utilizar un proceso de trabajo para decidir cómo procesar el mensaje actual.

    • Especifique las expresiones de acción y filtro que se pueden utilizar para definir reglas de enriquecimiento de los datos y del enrutamiento en las suscripciones.

Si conoce las propiedades de contexto de un mensaje de BizTalk, resulta útil pensar en las propiedades definidas por el usuario incluidas en la colección Properties de BrokeredMessage como las propiedades de contexto del mensaje de BizTalk. Otro ejemplo de un contenedor de propiedades utilizado para transmitir información de contexto se representa mediante WCF que proporciona un conjunto especial de enlaces habilitados para el contexto, como BasicHttpContextBinding, NetTcpContextBinding o WSHttpContextBinding, lo que permite enviar parámetros adicionales al servicio para el contexto de intercambio mediante encabezado SOAP o HttpCookies. De hecho, la colección de propiedades BrokeredMessage se puede utilizar para almacenar un fragmento de información o incluso la carga entera del mensaje y, cuando se usan temas y suscripciones, se pueden utilizar para enrutar el mensaje al destino adecuado. Por consiguiente, en un escenario donde un sistema de terceros intercambie mensajes con una aplicación de BizTalk a través de Service Bus, es de gran importancia traducir las propiedades específicas de la aplicación que contiene un objeto BrokeredMessage en las propiedades de contexto del mensaje de BizTalk y viceversa. En el artículo y en el código complementario, mostraré cómo lograr este resultado.

noteNota
Como se indica en Cuotas de Service Bus de AppFabric de Azure, el tamaño máximo para cada propiedad es de 32 KB. El tamaño acumulado de todas las propiedades no puede superar los 64 KB. Esto se aplica a todo el encabezado de BrokeredMessage, que tiene tanto propiedades de usuario como propiedades del sistema (por ejemplo SequenceNumber, Label, MessageId, etc.). El espacio ocupado por las propiedades cuenta para el tamaño total de los mensajes y su tamaño máximo de 256 KB. Si una aplicación supera alguno de los límites los enumerados anteriormente, se produce una excepción SerializationException, por lo que debe esperar a controlar esta condición de error.

NetMessagingBinding

La mensajería negociada de Service Bus admite el modelo de programación WCF y, en particular, proporciona un nuevo enlace denominado NetMessagingBinding que pueden usar las aplicaciones habilitadas para WCF para enviar y recibir mensajes a través de las colas, los temas y las suscripciones sobre el Protocolo de mensajería de Service Bus (SBMP). NetMessagingBinding es el nuevo nombre del enlace de las colas y los temas que proporcionan una integración completa con WCF. Desde una perspectiva funcional, NetMessagingBinding es similar a NetMsmqBinding que permite poner en cola mediante Message Queue Server (MSMQ) como transporte y habilita la compatibilidad con aplicaciones de acoplamiento flexible. En los servicios, NetMessagingBinding proporciona un bombeo automático de mensajes que los extrae de una cola o una suscripción y se integra con el mecanismo ReceiveContext de WCF.

El nuevo enlace admite las interfaces estándar IInputChannel, IOutputChannel e IInputSessionChannel. Cuando una aplicación utiliza WCF y NetMessagingBinding para enviar un mensaje a una cola o un tema, el mensaje se ajusta y se codifica en un elemento de sobre. Para establecer las propiedades específicas de BrokeredMessage, debe crear un objeto BrokeredMessageProperty, establecer las propiedades en él y agregarlo a la colección Properties de Message WCF, como se muestra en la siguiente tabla. Cuando se utiliza NetMessagingBinding para escribir un mensaje en la cola o en un tema, la clase interna ServiceBusOutputChannel busca la propiedad BrokeredMessageProperty de la colección Properties del mensaje WCF y copia todas sus propiedades al objeto BrokeredMessage que crea. Después, copia la carga del mensaje WCF al objeto BrokeredMessage y finalmente publica el mensaje resultante en el tema o la cola de destino.

static void Main(string[] args)
{
    try
    {
        // Create the 
        var channelFactory = new ChannelFactory<IOrderService>("orderEndpoint");
        var clientChannel = channelFactory.CreateChannel();
        
        // Create a order object
        var order = new Order()
                        {
                            ItemId = "001",
                            Quantity = 10
                        };

        // Use the OperationContextScope to create a block within which to access the current OperationScope
        using (var scope = new OperationContextScope((IContextChannel)clientChannel))
        {
            // Create a new BrokeredMessageProperty object
            var property = new BrokeredMessageProperty();

            // Use the BrokeredMessageProperty object to set the BrokeredMessage properties
            property.Label = "OrderItem";
            property.MessageId = Guid.NewGuid().ToString();
            property.ReplyTo = "sb://acme.servicebus.windows.net/invoicequeue";

            // Use the BrokeredMessageProperty object to define application-specific properties
            property.Properties.Add("ShipCountry", "Italy");
            property.Properties.Add("ShipCity", "Milan");

            // Add BrokeredMessageProperty to the OutgoingMessageProperties bag provided 
            // by the current Operation Context 
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, property);
            clientChannel.SendOrder(order);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Asimismo, cuando se utiliza un extremo de servicio basado en NetMessagingBinding para recibir los mensajes de una cola o un tema, una aplicación puede recuperar el objeto BrokeredMessageProperty de la colección Properties del mensaje WCF de entrada, como se muestra en la siguiente tabla. Concretamente, tras recibir un mensaje, las clases internas ServiceBusInputChannel y ServiceBusInputSessionChannel (la última se usa para recibir los mensajes de las colas sessionful y las suscripciones) crean un nuevo Message WCF y copian la carga del cuerpo del BrokeredMessage entrante al cuerpo del mensaje WCF recién creado. A continuación, copian las propiedades del BrokeredMessage entrante a una nueva instancia de la clase BrokeredMessageProperty y, finalmente, agregan esta última a la colección Properties del mensaje WCF entrante.

[ServiceBehavior]
public class OrderService : IOrderService
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...
    }
}

Dado que Service Bus no admite la interfaz IOutputSessionChannel, todas las aplicaciones que envían mensajes a las colas habilitadas para sesión deben usar un contrato de servicio cuya propiedad SessionMode es diferente de SessionMode.Required. Sin embargo, el runtime WCF de Service Bus admite la interfaz IInputSessionChannel para recibir los mensajes de una suscripción o una cola con sesión utilizando WCF y NetMessagingBinding, una aplicación debe implementar un contrato de servicio con conocimiento de la sesión. El fragmento de código siguiente proporciona un ejemplo de un servicio WCF que recibe los mensajes de una cola o de una suscripción con sesión.

// ServiceBus does not support IOutputSessionChannel.
// All senders sending messages to sessionful queue must use a contract which does not enforce SessionMode.Required.
// Sessionful messages are sent by setting the SessionId property of the BrokeredMessageProperty object.
[ServiceContract]
public interface IOrderService
{
    [OperationContract(IsOneWay = true)]
    [ReceiveContextEnabled(ManualControl = true)]
    void ReceiveOrder(Order order);
}

// ServiceBus supports both IInputChannel and IInputSessionChannel. 
// A sessionful service listening to a sessionful queue must have SessionMode.Required in its contract.
[ServiceContract(SessionMode = SessionMode.Required)]
public interface IOrderServiceSessionful : IOrderService
{
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession, ConcurrencyMode = ConcurrencyMode.Single)]
public class OrderService : IOrderServiceSessionful
{
    [OperationBehavior]
    public void ReceiveOrder(Order order)
    {
        // Get the BrokeredMessageProperty from the current OperationContext
        var incomingProperties = OperationContext.Current.IncomingMessageProperties;
        var property = incomingProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;

        ...

        //Complete the Message
        ReceiveContext receiveContext;
        if (ReceiveContext.TryGet(incomingProperties, out receiveContext))
        {
            receiveContext.Complete(TimeSpan.FromSeconds(10.0d));
            ...
        }
        else
        {
            throw new InvalidOperationException("...");
        }
    }
}

Observe que la propiedad ManualControl del atributo de operación ReceiveContextEnabled está establecida en true. Esto requiere que el servicio invoque explícitamente al método ReceiveContext.Complete para confirmar la operación de recepción. De hecho, cuando la propiedad ManualControl está establecida en true, el mensaje recibido del canal se entrega a la operación de servicio con un bloqueo del mensaje. Es responsabilidad de la implementación del servicio llamar a Complete(TimeSpan) o a Abandon(TimeSpan) para señalizar la finalización de la recepción del mensaje. Si no se puede llamar a alguno de estos métodos, el bloqueo se mantiene en el mensaje hasta que transcurre el intervalo de tiempo de espera del bloqueo. Una vez que el bloqueo se libera (ya sea llamando a Abandon(TimeSpan) o porque transcurra el tiempo de espera del bloqueo), el mensaje se envía de nuevo desde el canal al servicio. Al llamar a Complete(TimeSpan), se marca el mensaje como que se ha recibido correctamente.

Observe también que la clase OrderService tiene la propiedad ServiceBehavior.InstanceContextMode establecida en InstanceContextMode.PerSession y la propiedad ConcurrencyMode establecida en ConcurrencyMode.Single. De este modo, ServiceHost creará una nueva instancia de servicio cada vez que una nueva sesión esté disponible en la cola o la suscripción a la que se hace referencia y utilizará un solo subproceso para recibir los mensajes de ella en orden secuencial. La duración de la instancia de servicio se controla estableciendo la propiedad SessionIdleTimeoutde NetMessagingBinding.

Arquitectura

La ilustración siguiente muestra la arquitectura de alto nivel de la demostración:

ServiceBus-Queues-Topics-WCF-Workflow-Service4

Flujo del mensaje:

  1. Una aplicación cliente utiliza un proxy WCF y NetMessagingBinding para enviar un mensaje de solicitud a requestqueue o a requesttopic.

  2. El servicio del flujo de trabajo WCF que se ejecuta en una aplicación de consola o IIS recibe el mensaje de solicitud de la suscripción requestqueue o ItalyMilan definida en requesttopic.

    ServiceBus-Queues-Topics-WCF-Workflow-Service5
  3. El servicio del flujo de trabajo WCF, que se muestra en la ilustración anterior, realiza las siguientes acciones:

    • La BrokeredMessagePropertyActivity personalizada (identificada mediante el nombre para mostrar Get BrokeredMessage) lee BrokeredMessageProperty del mensaje entrante y asigna su valor a una variable de flujo de trabajo definida fuera de la actividad Sequential.

    • La actividad Receive recupera el mensaje de la suscripción requestqueue o ItalyMilan de requesttopic.

    • El CalculatorActivity personalizado recibe el mensaje entrante y BrokeredMessageProperty como argumentos de entrada, procesa el mensaje de solicitud y genera un mensaje de respuesta y un BrokeredMessageProperty saliente. Al ejecutarse en una aplicación de consola, la actividad realiza el seguimiento de las propiedades de las BrokeredMessageProperty entrante y saliente en la salida estándar.

    • La actividad If lee la dirección contenida en la propiedad ReplyTo de la BrokeredMessageProperty entrante.

      • Si la cadena contiene la palabra “tema”, la respuesta se envía a responsetopic.

      • Si no, la respuesta se envía a responsequeue.

    • En ambos casos, una instancia de BrokeredMessagePropertyActivity (identificada mediante el nombre para mostrar Set BrokeredMessage) se utiliza para ajustar actividad Send y asignar la BrokeredMessageProperty saliente a la colección de propiedades del mensajes de respuesta WCF.

  4. El servicio del flujo de trabajo WCF escribe el mensaje de respuesta a responsequeue o a responsetopic.

  5. La aplicación cliente utiliza un servicio WCF con dos extremos diferentes para recuperar el mensaje de respuesta de responsequeue o de responsetopic. En un entorno con varias aplicaciones cliente, cada una debe utilizar una cola o una suscripción independiente para recibir los mensajes de respuesta de BizTalk. Hay más información sobre esto más adelante en el artículo.

Solución

Ahora que hemos descrito la arquitectura global de la demostración, podemos analizar detalladamente los componentes principales de la solución.

Colas, temas y suscripciones

La primera operación que hay que realizar para configurar correctamente el entorno es crear las entidades de mensajería que utiliza la demostración. La primera operación que se va a ejecutar es proporcionar un nuevo espacio de nombres de Bus de servicio o modificar un espacio de nombres existente para incluir Bus de servicio. Puede hacerlo desde el Portal de administración de Azure haciendo clic respectivamente en el bóton Nuevo o Modificar.

El paso siguiente consiste en crear las entidades de la cola, el tema y la suscripción que requiera la demostración. Como mencioné en la introducción, tiene muchas opciones para realizar esta tarea. La manera más fácil es usar el Portal de administración de Azure con los botones del comando Administrar entidades.

Puede usar la vista de árbol de navegación mostrada en el punto 2 para seleccionar una entidad existente y mostrar sus propiedades en la barra vertical resaltada en el punto 3. Para quitar la entidad seleccionada, presione el botón Eliminar en la barra de comandos Administrar entidades.

noteNota
Usar el Portal de administración de Azure es una forma práctica y adecuada de controlar las entidades de mensajería en un espacio de nombres determinado de Service Bus. Sin embargo, al menos por el momento, el conjunto de operaciones que un desarrollador o un administrador del sistema pueden realizar con la interfaz de usuario es muy limitado. Por ejemplo, el portal de administración de Azure realmente permite que un usuario cree las colas, los temas y las suscripciones y defina sus propiedades, pero no le permite crear ni mostrar las reglas de una suscripción existente. Ahora, solo puede realizar esta tarea mediante la API Mensajería .NET. Concretamente, para agregar una nueva regla a una suscripción existente, puede utilizar los métodos AddRule(cadena, filtro) o AddRule(RuleDescription) expuestos por la clase SubscriptionClient, mientras que para enumerar las reglas de una suscripción existente, puede utilizar el método GetRules de la clase NamespaceManager. Además, el portal de administración de Azure en realidad no proporciona la capacidad de realizar las operaciones siguientes:

  1. Visualizar correctamente las entidades de una forma jerárquica. De hecho, el portal de administración de Azure muestra las colas, los temas y las suscripciones con una vista de árbol plana. Sin embargo, puede organizar las entidades de mensajería en una estructura jerárquica especificando simplemente su nombre como ruta de acceso absoluta compuesta por varios segmentos de ruta, por ejemplo crm/prod/queues/orderqueue.

  2. Exportar las entidades de mensajería contenidas en un espacio de nombres determinado de Service Bus a un archivo de enlaces XML (de BizTalk Server). En su lugar, la herramienta Explorador de Service Bus de servicio permite seleccionar y exportar

    1. Entidades individuales.

    2. Entidades por tipo (colas o temas).

    3. Entidades contenidas en cierta ruta de acceso (por ejemplo crm/prod/queues).

    4. Todas las entidades de un espacio de nombres especificado.

  3. Importar las colas, los temas y las suscripciones de un archivo XML a un espacio de nombres existente. El Explorador de Service Bus admite la capacidad de exportar entidades a un archivo XML y de reimportarlas en el mismo o en otro espacio de nombres de Service Bus. Esta característica viene bien para realizar la copia de seguridad y la restauración de un espacio de nombres o simplemente para transferir las colas y los temas de una prueba a un espacio de nombres de producción.

El Explorador de Service Bus permite a un usuario crear, eliminar y probar las colas, los temas, las suscripciones y las reglas y representa el complemento perfecto del portal de administración de Azure oficial.

Para su comodidad, creé una aplicación de consola denominada Aprovisionamiento que utiliza la funcionalidad proporcionada por la clase NamespaceManager para crear las colas, los temas y las suscripciones que requiere la solución. Cuando se inicia, las aplicaciones de consola piden las credenciales del espacio de nombres de servicio. Se utilizan para autenticar al servicio Access Control y adquirir un token de acceso que demuestre a la infraestructura de Service Bus que la aplicación está autorizada a proporcionar las nuevas entidades de mensajería. A continuación, la aplicación pide el valor que se va a asignar a las propiedades de las entidades que se van a crear, como EnabledBatchedOperations y EnableDeadLetteringOnMessageExpiration para las colas. La aplicación de aprovisionamiento crea las entidades siguientes en el espacio de nombres especificado de Service Bus:

  1. Una cola denominada requestqueue que la aplicación cliente usa para enviar mensajes de solicitud a BizTalk Server.

  2. Una cola denominada responsequeue que BizTalk Server usa para enviar mensajes de respuesta a la aplicación cliente.

  3. Un tema denominado requesttopic que la aplicación cliente usa para enviar mensajes de solicitud a BizTalk Server.

  4. Un tema denominado responsetopic que BizTalk Server usa para enviar mensajes de respuesta a la aplicación cliente.

  5. Una suscripción denominada ItalyMilan para requesttopic. BizTalk Server utiliza esta última suscripción para recibir los mensajes de requesttopic. La suscripción en cuestión tiene una sola regla definida como sigue:

    1. Filter: Country='Italy' and City='Milan'

    2. Action: Set Area='Western Europe'

  6. Una suscripción denominada ItalyMilan para responsetopic. La última la utiliza la aplicación cliente para recibir los mensajes de respuesta de responsetopic. La suscripción en cuestión tiene una sola regla definida como sigue:

    1. Filter: Country='Italy' and City='Milan'

    2. Action: Set Area='Western Europe'

La ilustración siguiente muestra la salida de la aplicación de consola Provisioning.

ServiceBus-Queues-Topics-WCF-Workflow-Service8

La siguiente tabla contiene el código de la aplicación Provisioning:

#region Using Directives
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Provisioning
{
    static class Program
    {
        #region Private Constants
        //***************************
        // Constants
        //***************************

        private const string RequestQueue = "requestqueue";
        private const string ResponseQueue = "responsequeue";
        private const string RequestTopic = "requesttopic";
        private const string ResponseTopic = "responsetopic";
        private const string RequestSubscription = "ItalyMilan";
        private const string ResponseSubscription = "ItalyMilan";
        #endregion
        static void Main()
        {
            var defaultColor = Console.ForegroundColor;

            try
            {
                // Set Window Size
                Console.WindowWidth = 100;
                Console.BufferWidth = 100;
                Console.WindowHeight = 48;
                Console.BufferHeight = 48;

                // Print Header            
                Console.WriteLine("Read Credentials:");
                Console.WriteLine("-----------------");

                // Read Service Bus Namespace
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Namespace: ");
                Console.ForegroundColor = defaultColor;
                var serviceNamespace = Console.ReadLine();

                // Read Service Bus Issuer Name
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Name: ");
                Console.ForegroundColor = defaultColor;
                var issuerName = Console.ReadLine();

                // Read Service Bus Issuer Secret
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Service Bus Issuer Secret: ");
                Console.ForegroundColor = defaultColor;
                var issuerSecret = Console.ReadLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Queues Properties:");
                Console.WriteLine("------------------------");

                // Read Queue EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                var key = Console.ReadKey().KeyChar;
                var queueEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Queue RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Queues: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var queueRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Topic Properties:");
                Console.WriteLine("-----------------------");

                // Read Topic EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicEnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Topic RequiresDuplicateDetection
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Topics: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresDuplicateDetection ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var topicRequiresDuplicateDetection = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Enter Subscriptions Properties: ");
                Console.WriteLine("-------------------------------");

                // Read Subscription EnabledBatchedOperations
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnabledBatchedOperations ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionnabledBatchedOperations = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnFilterEvaluationExceptions
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnFilterEvaluationExceptions ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnFilterEvaluationExceptions = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription EnableDeadLetteringOnMessageExpiration
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("EnableDeadLetteringOnMessageExpiration ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionEnableDeadLetteringOnMessageExpiration = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Read Subscription RequiresSession
                Console.ForegroundColor = ConsoleColor.Green;
                Console.Write("Subscriptions: ");
                Console.ForegroundColor = defaultColor;
                Console.Write("Set the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("RequiresSession ");
                Console.ForegroundColor = defaultColor;
                Console.Write("to true [y=Yes, n=No]?");
                key = Console.ReadKey().KeyChar;
                var subscriptionRequiresSession = key == 'y' || key == 'Y';
                Console.WriteLine();

                // Get ServiceBusNamespaceClient for management operations
                var managementUri = 
                    ServiceBusEnvironment.CreateServiceUri("https", serviceNamespace, string.Empty);
                var tokenProvider = 
                    TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
                var namespaceManager = new NamespaceManager(managementUri, tokenProvider);

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Queues:");
                Console.WriteLine("--------------");

                // Create RequestQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(RequestQueue))
                {
                    namespaceManager.DeleteQueue(RequestQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(RequestQueue)
                                {
                                    EnableBatchedOperations = queueEnabledBatchedOperations,
                                    EnableDeadLetteringOnMessageExpiration =
                                        queueEnableDeadLetteringOnMessageExpiration,
                                    RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                    RequiresSession = queueRequiresSession
                                });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Create ResponseQueue
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue...");
                if (namespaceManager.QueueExists(ResponseQueue))
                {
                    namespaceManager.DeleteQueue(ResponseQueue);
                }
                namespaceManager.CreateQueue(new QueueDescription(ResponseQueue)
                            {
                                EnableBatchedOperations = queueEnabledBatchedOperations,
                                EnableDeadLetteringOnMessageExpiration = 
                                    queueEnableDeadLetteringOnMessageExpiration,
                                RequiresDuplicateDetection = queueRequiresDuplicateDetection,
                                RequiresSession = queueRequiresSession
                            });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseQueue);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" queue successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Topics:");
                Console.WriteLine("--------------");

                // Create RequestTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(RequestTopic))
                {
                    namespaceManager.DeleteTopic(RequestTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(RequestTopic)
                                        {
                                            EnableBatchedOperations = topicEnabledBatchedOperations,
                                            RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                        });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create ResponseTopic
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                if (namespaceManager.TopicExists(ResponseTopic))
                {
                    namespaceManager.DeleteTopic(ResponseTopic);
                }
                namespaceManager.CreateTopic(new TopicDescription(ResponseTopic)
                                    {
                                        EnableBatchedOperations = topicEnabledBatchedOperations,
                                        RequiresDuplicateDetection = topicRequiresDuplicateDetection
                                    });
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Print Header
                Console.WriteLine();
                Console.WriteLine("Create Subscriptions:");
                Console.WriteLine("--------------");

                // Create Request Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                var ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                          {
                                              Name = "$Default",
                                              Action = new SqlRuleAction("Set Area='Western Europe'")
                                          };
                var subscriptionDescription = new SubscriptionDescription(RequestTopic, RequestSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(RequestTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");

                // Create Response Subscription
                Console.Write("Creating ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic...");
                ruleDescription = new RuleDescription(new SqlFilter("Country='Italy' and City='Milan'"))
                                      {
                                          Action = new SqlRuleAction("Set Area='Western Europe'")
                                      };
                subscriptionDescription = new SubscriptionDescription(ResponseTopic, ResponseSubscription)
                {
                    EnableBatchedOperations = subscriptionnabledBatchedOperations,
                    EnableDeadLetteringOnFilterEvaluationExceptions = 
                            subscriptionEnableDeadLetteringOnFilterEvaluationExceptions,
                    EnableDeadLetteringOnMessageExpiration = 
                            subscriptionEnableDeadLetteringOnMessageExpiration,
                    RequiresSession = subscriptionRequiresSession
                };
                namespaceManager.CreateSubscription(subscriptionDescription, ruleDescription);
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseSubscription);
                Console.ForegroundColor = defaultColor;
                Console.Write(" subscription for the ");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write(ResponseTopic);
                Console.ForegroundColor = defaultColor;
                Console.WriteLine(" topic successfully created.");
                Console.WriteLine();

                // Close the application
                Console.WriteLine("Press any key to continue ...");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.Write("Exception: ");
                Console.ForegroundColor = defaultColor;
                Console.Write(ex.Message);
            }
        }
    }
}

noteNota
No probé todas las combinaciones posibles de propiedades para las colas y los temas de modo que demostración puede no funcionar según lo esperado con todas las configuraciones.

Contratos de datos

Comencé definiendo los contratos de datos para la solicitud y los mensajes de respuesta. Los contratos de datos proporcionan un mecanismo para asignar los tipos CLR .NET definidos en el código y los esquemas XML (XSD) definidos por la organización W3C (www.w3c.org). Los contratos de datos se publican en los metadatos del servicio, permitiendo a los clientes convertir la representación neutra, ajena a la tecnología, de los tipos de datos en sus representaciones nativas. Para obtener más información sobre los datos y los contratos, puede leer los artículos siguientes:

En mi solución, creé un proyecto denominado DataContracts para definir las clases que representan, respectivamente, los mensajes de solicitud y respuesta. Para mayor comodidad, incluí debajo su código.

CalculatorRequest Class

[Serializable]
[XmlType(TypeName = "CalculatorRequest", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorRequest", 
         Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
         IsNullable = false)]
[DataContract(Name = "CalculatorRequest", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorRequest
{
    #region Private Fields
    private OperationList operationList;
    #endregion

    #region Public Constructors
    public CalculatorRequest()
    {
        operationList = new OperationList();
    }

    public CalculatorRequest(OperationList operationList)
    {
        this.operationList = operationList;
    }
    #endregion

    #region Public Properties
    [XmlArrayItem("Operation", Type=typeof(Operation), IsNullable = false)]
    [DataMember(Order = 1)]
    public OperationList Operations
    {
        get
        {
            return operationList;
        }
        set
        {
            operationList = value;
        }
    } 
    #endregion
}

[CollectionDataContract(Name = "OperationList", 
                        Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
                        ItemName = "Operation")]
public class OperationList : List<Operation>
{
}

[Serializable]
[XmlType(TypeName = "Operation", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Operation", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Operation
{
    #region Private Fields
    private string op;
    private double operand1;
    private double operand2;
    #endregion

    #region Public Constructors
    public Operation()
    {
    }

    public Operation(string op,
                        double operand1,
                        double operand2)
    {
        this.op = op;
        this.operand1 = operand1;
        this.operand2 = operand2;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Operator
    {
        get
        {
            return op;
        }
        set
        {
            op = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public double Operand1
    {
        get
        {
            return operand1;
        }
        set
        {
            operand1 = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 3)]
    public double Operand2
    {
        get
        {
            return operand2;
        }
        set
        {
            operand2 = value;
        }
    } 
    #endregion
}

CalculatorResponse Class

[Serializable]
[XmlType(TypeName = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[XmlRoot(ElementName = "CalculatorResponse", 
         Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
         IsNullable = false)]
[DataContract(Name = "CalculatorResponse", 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class CalculatorResponse
{
    #region Private Fields
    private string status;
    private ResultList resultList;
    #endregion

    #region Public Constructors
    public CalculatorResponse()
    {
        status = default(string);
        resultList = new ResultList();
    }

    public CalculatorResponse(string status)
    {
        this.status = status;
        resultList = new ResultList();
    }

    public CalculatorResponse(string status, ResultList resultList)
    {
        this.status = status;
        this.resultList = resultList;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public string Status 
    {
        get 
        {
            return status;
        }
        set 
        {
            status = value;
        }
    }

    [XmlArrayItem("Result", Type=typeof(Result), IsNullable=false)]
    [DataMember(Order = 2)]
    public ResultList Results 
    {
        get 
        {
            return resultList;
        }
        set 
        {
            resultList = value;
        }
    }
    #endregion
}

[CollectionDataContract(Name = "ResultList", 
                        Namespace = http://windowsazure.cat.microsoft.com/samples/servicebus, 
                        ItemName = "Result")]
public class ResultList : List<Result>
{
}

[Serializable]
[XmlType(TypeName = "Result", 
         AnonymousType = true, 
         Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
[DataContract(Name = "Result", 
              Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus")]
public class Result
{
    #region Private Fields
    private double value;
    private string error; 
    #endregion

    #region Public Constructors
    public Result()
    {
        value = default(double);
        error = default(string);
    }

    public Result(double value, string error)
    {
        this.value = value;
        this.error = error;
    }
    #endregion

    #region Public Properties
    [XmlElement]
    [DataMember(Order = 1)]
    public double Value
    {
        get
        {
            return value;
        }
        set
        {
            this.value = value;
        }
    }

    [XmlElement]
    [DataMember(Order = 2)]
    public string Error
    {
        get
        {
            return error;
        }
        set
        {
            error = value;
        }
    } 
    #endregion
}

Contratos de servicio

El paso siguiente era definir el atributo de contratos de servicio usado por la aplicación de cliente para intercambiar mensajes con el Bus de servicio. Para ello, creé un proyecto nuevo en mi solución denominado ServiceContracts y definí dos interfaces de contrato de servicio usadas por la aplicación de cliente respectivamente para enviar y recibir mensajes de las entidades de mensajería del Bus de servicio. De hecho, creé dos versiones diferentes del contrato de servicio usado para recibir los mensajes de respuesta:

  • La interfaz ICalculatorResponse está diseñada para recibir los mensajes de respuesta de una cola o una suscripción sin sesión.

  • La interfaz ICalculatorResponseSessionful hereda del contrato de servicio ICalculatorResponse y se marca con el atributo (ServiceContract(SessionMode = SessionMode.Required)]. Este contrato de servicio está diseñado para recibir los mensajes de respuesta de una cola o una suscripción sin sesión.

Observe que los métodos definidos por todos los contratos son unidireccionales. Este es un requisito obligatorio.

ICalculatorRequest, ICalculatorResponse, ICalculatorResponseSessionful Interfaces

#region Using Directives
using System.ServiceModel;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBus.MessageContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBus.ServiceContracts
{
    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus",
                     ConfigurationName = "ICalculatorRequest", 
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorRequest
    {
        [OperationContract(Action = "SendRequest", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void SendRequest(CalculatorRequestMessage calculatorRequestMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse",
                     SessionMode = SessionMode.Allowed)]
    public interface ICalculatorResponse
    {
        [OperationContract(Action = "ReceiveResponse", IsOneWay = true)]
        [ReceiveContextEnabled(ManualControl = true)]
        void ReceiveResponse(CalculatorResponseMessage calculatorResponseMessage);
    }

    [ServiceContract(Namespace = "http://windowsazure.cat.microsoft.com/samples/servicebus", 
                     ConfigurationName = "ICalculatorResponse"
                     SessionMode = SessionMode.Required)]
    public interface ICalculatorResponseSessionful : ICalculatorResponse
    {
    }
}

Ahora estamos listos para examinar el código de la aplicación cliente.

Aplicación cliente

Puesto que la aplicación de Windows Forms intercambia mensajes con el servicio subyacente del flujo de trabajo WCF de forma asincrónica a través de las entidades de mensajería de Service Bus, actúa como cliente y como aplicación de servicio al mismo tiempo. Windows Forms utiliza WCF y NetMessagingBinding para realizar las siguientes acciones:

  1. Enviar mensajes de solicitud a requestqueue.

  2. Enviar mensajes de solicitud a requesttopic.

  3. Recibir mensajes de respuesta de responsequeue.

  4. Recibe los mensajes de respuesta de la suscripción ItalyMilan de responsetopic.

Comencemos por revisar el archivo de configuración de la aplicación cliente que desempeña un papel fundamental en la definición de los extremos del servicio y del cliente WCF que se usan para comunicarse con Service Bus.

Aplicación cliente App.Config


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging"  
                     switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" 
                      name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\ServiceBusWFClient.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, 
  19:                   Version=2.0.0.0, Culture=neutral,
                        PublicKeyToken=b77a5c561934e089"
  20:             name="ServiceModelMessageLoggingListener"
  21:             traceOutputOptions="Timestamp">
  22:          <filter type="" />
  23:        </add>
  24:      </sharedListeners>
  25:      <trace autoflush="true" indentsize="4">
  26:        <listeners>
  27:          <clear/>
  28:          <add name="LogTraceListener"
  29:               type="Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Client.LogTraceListener, Client"
  30:               initializeData="" />
  31:        </listeners>
  32:      </trace>
  33:    </system.diagnostics>
  34:    <startup>
  35:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  36:    </startup>
  37:    <system.serviceModel>
  38:      <diagnostics>
  39:        <messageLogging logEntireMessage="true"
  40:                        logMalformedMessages="false"
  41:                        logMessagesAtServiceLevel="true"
  42:                        logMessagesAtTransportLevel="false" />
  43:      </diagnostics>
  44:      <behaviors>
  45:        <endpointBehaviors>
  46:          <behavior name="securityBehavior">
  47:            <transportClientEndpointBehavior>
  48:              <tokenProvider>
  49:                <sharedSecret issuerName="owner"
  50:                              issuerSecret="SHARED-SECRET" />
  51:              </tokenProvider>
  52:            </transportClientEndpointBehavior>
  53:          </behavior>
  54:        </endpointBehaviors>
  55:      </behaviors>
  56:      <bindings>
  57:        <netMessagingBinding>
  58:          <binding name="netMessagingBinding"
  59:                   sendTimeout="00:03:00"
  60:                   receiveTimeout="00:03:00"
  61:                   openTimeout="00:03:00"
  62:                   closeTimeout="00:03:00"
  63:                   sessionIdleTimeout="00:01:00"
  64:                   prefetchCount="-1">
  65:            <transportSettings batchFlushInterval="00:00:01" />
  66:          </binding>
  67:        </netMessagingBinding>
  68:      </bindings>
  69:      <client>
  70:        <!-- Invoke WF Service via Service Bus Queue -->
  71: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue"
  72:                  behaviorConfiguration="securityBehavior" 
  73:                  binding="netMessagingBinding"
  74:                  bindingConfiguration="netMessagingBinding" 
  75:                  contract="ICalculatorRequest"
  76:                  name="requestQueueClientEndpoint" />
  77:        <!-- Invoke WF Service via Service Bus Topic -->
  78: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  79:                  behaviorConfiguration="securityBehavior"
  80:                  binding="netMessagingBinding"
  81:                  bindingConfiguration="netMessagingBinding"
  82:                  contract="ICalculatorRequest"
  83:                  name="requestTopicClientEndpoint" />
  84:      </client>
  85:      <services>
  86:        <service name="ResponseHandlerService">
  87: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue"
  88:                    behaviorConfiguration="securityBehavior"
  89:                    binding="netMessagingBinding"
  90:                    bindingConfiguration="netMessagingBinding"
  91:                    name="responseQueueServiceEndpoint"
  92:                    contract="ICalculatorResponse" />
  93: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
  94:                    listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
  95:                    behaviorConfiguration="securityBehavior"
  96:                    binding="netMessagingBinding"
  97:                    bindingConfiguration="netMessagingBinding"
  98:                    name="responseSubscriptionServiceEndpoint"
  99:                    contract="ICalculatorResponse" />
 100:        </service>
 101:      </services>
 102:    </system.serviceModel>
 103:  </configuration>

A continuación puede encontrar una breve descripción de los elementos y las secciones principales del archivo de configuración:

  • Las líneas [3-32] definen un agente de escucha de seguimiento personalizado denominado LogTraceListener que ResponseHandlerService usa para escribir el mensaje de respuesta al control del registro de la aplicación de Windows Forms.

  • Las líneas [34-36] de la sección de inicio especifican qué versiones de Common Language Runtime admite la aplicación.

  • Las líneas [46-53] contienen la definición del securityBehavior utilizado por los extremos de cliente y de servicio para autenticarse con el servicio Access Control. Concretamente, el TransportClientEndpointBehavior se utiliza para definir las credenciales del secreto compartido. Para obtener más información sobre cómo recuperar las credenciales del Portal de administración de Azure, vea el cuadro a continuación.

  • Las líneas [57-67] contienen la configuración del NetMessagingBinding utilizado por los extremos de cliente y de servicio para intercambiar mensajes con Service Bus.

  • Las líneas [71-76] contienen la definición del requestQueueClientEndpoint utilizado por la aplicación para enviar mensajes de solicitud a requestqueue. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre de la cola proporcionan la dirección del extremo cliente.

  • Las líneas [78-83] contienen la definición del requestTopicClientEndpoint utilizado por la aplicación para enviar mensajes de solicitud a requesttopic. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre del tema proporcionan la dirección del extremo cliente.

  • Las líneas [87-92] contienen la definición del responseQueueServiceEndpoint utilizado por la aplicación para recibir mensajes de respuesta de requestqueue. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre de la cola proporcionan la dirección del extremo de servicio.

  • Las líneas [93-99] contienen la definición del responseSubscriptionServiceEndpoint utilizado por la aplicación para recibir mensajes de respuesta de la suscripción ItalyMilan para responsetopic. Cuando se define un extremo de servicio WCF que usa NetMessagingBinding para recibir los mensajes de una suscripción, tiene que seguir de la forma siguiente (para obtener más información sobre esto, vea el cuadro a continuación):

    • Como valor del atributo address, debe especificar la dirección URL del tema al que la suscripción pertenece. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre del tema proporcionan la dirección del tema.

    • Como valor del atributo listenUri, debe especificar la dirección URL de la suscripción. La concatenación de la dirección URL del tema, la cadena Suscripciones y el nombre de la suscripción definen la dirección URL de la suscripción.

    • Asigne el valor Explicit al atributo listenUriMode. El valor predeterminado de listenUriMode es Explicit, por lo que este valor es opcional.

noteNota
Al configurar un extremo de servicio WCF para utilizar los mensajes de una cola o una suscripción con sesión, el contrato de servicio debe admitir sesiones. Por tanto, en nuestro ejemplo, al configurar los extremos responseQueueServiceEndpoint o responseSubscriptionServiceEndpoint para recibir, respectivamente, una cola y una suscripción con sesión, tiene que reemplazar el contrato de servicio ICalculatorResponse con la interfaz de contrato con sesión ICalculatorResponseSessionful. Para obtener más información, vea la sección sobre Contratos de servicio posteriormente en el artículo.

noteNota
El Bus de servicio admite tres tipos diferentes de esquemas de credenciales: SAML, Secreto compartido y Símbolo web simple, pero esta versión del Explorador del Bus de servicio solo admite credenciales de Secreto compartido. Sin embargo, puede ampliar fácilmente mi código para admitir otros esquemas de credenciales. Puede recuperar la clave secreto-emisor del Portal de administración de Azure haciendo clic en Ver después de seleccionar el espacio de nombres en la sección Service Bus.

De este modo, se abre el cuadro de diálogo modal en el que puede recuperar la clave haciendo clic en Copiar al Portapapeles resaltado en rojo.

noteNota
Por convención, el nombre del Emisor predeterminado siempre es propietario.

noteNota
Cuando se define un extremo de servicio WCF que utilice el NetMessagingBinding para recibir los mensajes de una suscripción, si comete la equivocación de asignar la dirección URL de la suscripción al atributo de dirección del extremo de servicio (como se explica en la configuración a continuación), en tiempo de ejecución se producirá una FaultException como la siguiente:

The message with To 'sb://NAMESPACE.servicebus.windows.net/responsetopic' cannot be processed at 
the receiver, due to an AddressFilter mismatch at the EndpointDispatcher. 
Check that the sender and receiver's EndpointAddresses agree."}

Configuración incorrecta


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="responseSubscriptionServiceEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

El error se debe al hecho de que el encabezado A WS-Addressing del mensaje contiene la dirección del tema y no la dirección de la suscripción:


<:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing">
  <s:Header>
    <a:Action s:mustUnderstand="1">ReceiveResponse</a:Action>
    <a:MessageID>urn:uuid:64cb0b06-1622-4920-a035-c27b610cfcaf</a:MessageID>
    <a:To s:mustUnderstand="1">sb://NAMESPACE.servicebus.windows.net/responsetopic</a:To>
  </s:Header>
  <s:Body>... stream ...</s:Body>
</s:Envelope>

Para configurar correctamente el extremo del servicio para recibir los mensajes de una suscripción, tiene que continuar de la forma siguiente:

  • Como valor del atributo address, debe especificar la dirección URL del tema al que la suscripción pertenece. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre del tema proporcionan la dirección del tema.

  • Como valor del atributo listenUri, debe especificar la dirección URL de la suscripción. La concatenación de la dirección URL del tema, la cadena Suscripciones y el nombre de la suscripción definen la dirección URL de la suscripción.

  • Asigne el valor Explicit al atributo listenUriMode. El valor predeterminado de listenUriMode es Explicit, por lo que este valor es opcional.

Vea la página siguiente en MSDN para obtener una descripción de los atributos address, listenUri y listenUriMode.

Configuración correcta


<?xml version="1.0"?>
<configuration>
  ...
  <system.serviceModel>
    ...
    <services>
      <service name="Microsoft.WindowsAzure.CAT.Samples.ServiceBus.Service.ResponseHandlerService">
        <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic"
                  listenUri="sb://NAMESPACE.servicebus.windows.net/responsetopic/Subscriptions/ItalyMilan"
                  behaviorConfiguration="securityBehavior"
                  binding="netMessagingBinding"
                  bindingConfiguration="netMessagingBinding"
                  name="subscriptionEndpoint"
                  contract="ICalculatorResponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Para realizar la misma tarea a través de la API, tiene que establecer correctamente el valor de las direcciones Address, ListenUri y ListenUriMode de la instancia de ServiceEndpoint, como se indica en esta nota.

La siguiente tabla muestra el código que la aplicación cliente usa para iniciar el ResponseHandlerService usado para leer los mensajes de respuesta de responsequeue y la suscripción ItalyMilan de responsetopic. Examinaremos el código del servicio en la sección siguiente.

Método StartServiceHost


private void StartServiceHost()
{
    try
    {
        // Creating the service host object as defined in config
        var serviceHost = new ServiceHost(typeof(ResponseHandlerService));
                
        // Add ErrorServiceBehavior for handling errors encounter by servicehost during execution.
        serviceHost.Description.Behaviors.Add(new ErrorServiceBehavior());


        foreach (var serviceEndpoint in serviceHost.Description.Endpoints)
        {
            if (serviceEndpoint.Name == "responseQueueServiceEndpoint")
            {
                responseQueueUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnQueue,
                                        serviceEndpoint.Address.Uri.AbsoluteUri));
            }
            if (serviceEndpoint.Name == "responseSubscriptionServiceEndpoint")
            {
                responseTopicUri = serviceEndpoint.Address.Uri.AbsoluteUri;
                WriteToLog(string.Format(ServiceHostListeningOnSubscription,
                                            serviceEndpoint.ListenUri.AbsoluteUri));
            }
        }

        // Start the service host
        serviceHost.Open();
        WriteToLog(ServiceHostSuccessfullyOpened);
    }
    catch (Exception ex)
    {
        mainForm.HandleException(ex);
    }
}

La siguiente imagen muestra la interfaz de usuario de la aplicación cliente:

ServiceBus-Queues-Topics-WCF-Workflow-Service11

Los botones de opción incluidos en el grupo Método de solicitud permiten elegir si se envía el mensaje de solicitud a requestqueue o a requesttopic, mientras que los botones de opción incluidos en el grupo Método de respuesta permiten seleccionar si recibir la respuesta de responsequeue o la suscripción ItalyMilan de responsetopic. Para comunicar la selección a la aplicación de BizTalk subyacente, la aplicación utiliza un objeto BrokeredMessageProperty para asignar el valor de los campos privados responseQueueUri o responseTopicUri a la propiedad ReplyTo. La tabla siguiente contiene el código del método utilizado por la aplicación cliente para enviar el mensaje al Bus de servicio. Para mayor comodidad, los comentarios se han agregado al código para facilitar su descripción.

Método SendRequestMessageUsingWCF


private void SendRequestMessageUsingWCF(string endpointConfigurationName)
{
    try
    {
        if (string.IsNullOrEmpty(endpointConfigurationName))
        {
            WriteToLog(EndpointConfigurationNameCannotBeNull);
            return;
        }

        // Set the wait cursor
        Cursor = Cursors.WaitCursor;

        // Make sure that the request message contains at least an operation
        if (operationList == null ||
            operationList.Count == 0)
        {
            WriteToLog(OperationListCannotBeNull);
            return;
        }

        // Create warning collection
        var warningCollection = new List<string>();

        // Create request message
        var calculatorRequest = new CalculatorRequest(operationList);
        var calculatorRequestMessage = new CalculatorRequestMessage(calculatorRequest);

        // Create the channel factory for the currennt client endpoint
        // and cache it in the channelFactoryDictionary
        if (!channelFactoryDictionary.ContainsKey(endpointConfigurationName))
        {
            channelFactoryDictionary[endpointConfigurationName] = 
                new ChannelFactory<ICalculatorRequest>(endpointConfigurationName);
        }

        // Create the channel for the currennt client endpoint
        // and cache it in the channelDictionary
        if (!channelDictionary.ContainsKey(endpointConfigurationName))
        {
            channelDictionary[endpointConfigurationName] = 
                channelFactoryDictionary[endpointConfigurationName].CreateChannel();
        }

        // Use the OperationContextScope to create a block within which to access the current OperationScope
        using (new OperationContextScope((IContextChannel)channelDictionary[endpointConfigurationName]))
        {
            // Create a new BrokeredMessageProperty object
            var brokeredMessageProperty = new BrokeredMessageProperty();

            // Read the user defined properties and add them to the  
            // Properties collection of the BrokeredMessageProperty object
            foreach (var e in propertiesBindingSource.Cast<PropertyInfo>())
            {
                try
                {
                    e.Key = e.Key.Trim();
                    if (e.Type != StringType && e.Value == null)
                    {
                        warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                                                            PropertyValueCannotBeNull, e.Key));
                    }
                    else
                    {
                        if (brokeredMessageProperty.Properties.ContainsKey(e.Key))
                        {
                            brokeredMessageProperty.Properties[e.Key] = 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value);
                        }
                        else
                        {
                            brokeredMessageProperty.Properties.Add(e.Key, 
                                ConversionHelper.MapStringTypeToCLRType(e.Type, e.Value));
                        }
                    }
                }
                catch (Exception ex)
                {
                    warningCollection.Add(string.Format(CultureInfo.CurrentUICulture, 
                        PropertyConversionError, e.Key, ex.Message));
                }
            }

            // if the warning collection contains at least one or more items,
            // write them to the log and return immediately
            StringBuilder builder;
            if (warningCollection.Count > 0)
            {
                builder = new StringBuilder(WarningHeader);
                var warnings = warningCollection.ToArray<string>();
                for (var i = 0; i < warningCollection.Count; i++)
                {
                    builder.AppendFormat(WarningFormat, warnings[i]);
                }
                mainForm.WriteToLog(builder.ToString());
                return;
            }

            // Set the BrokeredMessageProperty properties
            brokeredMessageProperty.Label = txtLabel.Text;
            brokeredMessageProperty.MessageId = Guid.NewGuid().ToString();
            brokeredMessageProperty.SessionId = sessionId;
            brokeredMessageProperty.ReplyToSessionId = sessionId;
            brokeredMessageProperty.ReplyTo = responseQueueRadioButton.Checked
                                                ? responseQueueUri
                                                : responseTopicUri;
            OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, 
                                                                    brokeredMessageProperty);
                    
            // Send the request message to the requestqueue or requesttopic
            var stopwatch = new Stopwatch();
            try
            {
                stopwatch.Start();
                channelDictionary[endpointConfigurationName].SendRequest(calculatorRequestMessage);
            }
            catch (CommunicationException ex)
            {
                if (channelFactoryDictionary[endpointConfigurationName] != null)
                {
                    channelFactoryDictionary[endpointConfigurationName].Abort();
                    channelFactoryDictionary.Remove(endpointConfigurationName);
                    channelDictionary.Remove(endpointConfigurationName);
                }
                HandleException(ex);
            }
            catch (Exception ex)
            {
                if (channelFactoryDictionary[endpointConfigurationName] != null)
                {
                    channelFactoryDictionary[endpointConfigurationName].Abort();
                    channelFactoryDictionary.Remove(endpointConfigurationName);
                    channelDictionary.Remove(endpointConfigurationName);
                }
                HandleException(ex);
            }
            finally
            {
                stopwatch.Stop();
            }
            // Log the request message and its properties
            builder = new StringBuilder();
            builder.AppendLine(string.Format(CultureInfo.CurrentCulture,
                    MessageSuccessfullySent,
                    channelFactoryDictionary[endpointConfigurationName].Endpoint.Address.Uri.AbsoluteUri,
                    brokeredMessageProperty.MessageId,
                    brokeredMessageProperty.SessionId,
                    brokeredMessageProperty.Label,
                    stopwatch.ElapsedMilliseconds));
            builder.AppendLine(PayloadFormat);
            for (var i = 0; i < calculatorRequest.Operations.Count; i++)
            {
                builder.AppendLine(string.Format(RequestFormat,
                                                    i + 1,
                                                    calculatorRequest.Operations[i].Operand1,
                                                    calculatorRequest.Operations[i].Operator,
                                                    calculatorRequest.Operations[i].Operand2));
            }
            builder.AppendLine(SentMessagePropertiesHeader);
            foreach (var p in brokeredMessageProperty.Properties)
            {
                builder.AppendLine(string.Format(MessagePropertyFormat,
                                                    p.Key,
                                                    p.Value));
            }
            var traceMessage = builder.ToString();
            WriteToLog(traceMessage.Substring(0, traceMessage.Length - 1));
        }
    }
    catch (Exception ex)
    {
        // Handle the exception
        HandleException(ex);
    }
    finally
    {
        // Restoire the defaulf cursor
        Cursor = Cursors.Default;
    }
}

Servicio del controlador de respuesta

La siguiente tabla contiene el código del servicio WCF que la aplicación cliente usa para recuperar y registrar los mensajes de respuesta de responsequeue y de la suscripción ItalyMilan de responsetopic. Para obtener este resultado, el servicio expone dos extremos diferentes, cada uno de los cuales usa el NetMessagingBinding y recibe los mensajes de una de las dos colas. De hecho, cada suscripción se puede considerar como una cola virtual que obtiene copias de los mensajes publicados para el tema al que pertenecen. La tabla siguiente muestra el código de la clase ResponseHandlerService. Como puede observar, el servicio recupera la propiedad BrokeredMessageProperty de la colección Properties del mensaje WCF entrante y utiliza este objeto para tener acceso a las propiedades del mensaje de respuesta. Puesto que en el contrato de servicio ICalculatorResponse, el método ReceiveResponse se decora con [ReceiveContextEnabled (ManualControl = true)], el reconocimiento de la recepción debe señalarse explícitamente mediante el método del servicio. Esto requiere que el servicio invoque explícitamente al método ReceiveContext.Complete para confirmar la operación de recepción. De hecho, como se indicó al comienzo del artículo, cuando la propiedad ManualControl está establecida en true, el mensaje recibido del canal se entrega a la operación de servicio con un bloqueo del mensaje. Es responsabilidad de la implementación del servicio llamar a Complete(TimeSpan) o a Abandon(TimeSpan) para señalizar la finalización de la recepción del mensaje. Si no se puede llamar a alguno de estos métodos, el bloqueo se mantiene en el mensaje hasta que transcurre el intervalo de tiempo de espera del bloqueo. Una vez que el bloqueo se libera (ya sea llamando a Abandon(TimeSpan) o porque transcurra el tiempo de espera del bloqueo), el mensaje se envía de nuevo desde el canal al servicio. Al llamar a Complete(TimeSpan), se marca el mensaje como que se ha recibido correctamente.

Clase ResponseHandlerService


      [ServiceBehavir(Namespace = "http://windwsazure.cat.micrsft.cm/samples/servicebus", 
                 CnfiguratinName = "RespnseHandlerService")]
 public class RespnseHandlerService : ICalculatrRespnseSessinful
 {
     #regin Private Cnstants
     //***************************
     // Frmats
     //***************************
     private cnst string MessageSuccessfullyReceived = "Respnse Message Received:\n - EndpintUrl:[{0}]\n - CrrelatinId=[{1}]\n - SessinId=[{2}]\n - Label=[{3}]";
     private cnst string ReceivedMessagePrpertiesHeader = "Prperties:";
     private cnst string PayladFrmat = "Paylad:";
     private cnst string StatusFrmat = " - Status=[{0}]";
     private cnst string ResultFrmat = " - Result[{0}]: Value=[{1}] Errr=[{2}]";
     private cnst string MessagePrpertyFrmat = " - Key=[{0}] Value=[{1}]";
 
     //***************************
     // Cnstants
     //***************************
     private cnst string Empty = "EMPTY";
     #endregin
 
     #regin Public peratins
     [peratinBehavir]
     public vid ReceiveRespnse(CalculatrRespnse calculatrRespnse)
     {
         try
         {
             // Get the message prperties
             var incmingPrperties = peratinCntext.Current.IncmingMessagePrperties;
             if (calculatrRespnse != null)
             {
                 var brkeredMessagePrperty = incmingPrperties[BrkeredMessagePrperty.Name] as BrkeredMessagePrperty;
 
                 // Trace the respnse message
                 var builder = new StringBuilder();
                 if (brkeredMessagePrperty != null)
                     builder.AppendLine(string.Frmat(MessageSuccessfullyReceived, 
                                                         peratinCntext.Current.Channel.LcalAddress.Uri.AbsluteUri,
                                                         brkeredMessagePrperty.CrrelatinId ?? Empty,
                                                         brkeredMessagePrperty.SessinId ?? Empty,
                                                         brkeredMessagePrperty.Label ?? Empty));
                 builder.AppendLine(PayladFrmat);
                 builder.AppendLine(string.Frmat(StatusFrmat,
                                                     calculatrRespnse.Status));
                 if (calculatrRespnse.Results != null && 
                     calculatrRespnse.Results.Cunt > 0)
                 {
                     fr (int i = 0; i < calculatrRespnse.Results.Cunt; i++)
                     {
                         builder.AppendLine(string.Frmat(ResultFrmat, 
                                                             i + 1, 
                                                             calculatrRespnse.Results[i].Value,
                                                             calculatrRespnse.Results[i].Errr));
                     }
                 }
                 builder.AppendLine(ReceivedMessagePrpertiesHeader);
                 if (brkeredMessagePrperty != null)
                 {
                     freach (var prperty in brkeredMessagePrperty.Prperties)
                     {
                         builder.AppendLine(string.Frmat(MessagePrpertyFrmat,
                                                             prperty.Key,
                                                             prperty.Value));
                     }
                 }
                 var traceMessage = builder.TString();
                 Trace.WriteLine(traceMessage.Substring(0, traceMessage.Length - 1));
             }
             //Cmplete the Message
             ReceiveCntext receiveCntext;
             if (ReceiveCntext.TryGet(incmingPrperties, ut receiveCntext))
             {
                 receiveCntext.Cmplete(TimeSpan.FrmSecnds(10.0d));
             }
             else
             {
                 thrw new InvalidperatinExceptin("Receiver is in peek lck mde but receive cntext is nt available!");
             }
         }
         catch (Exceptin ex)
         {
             Trace.WriteLine(ex.Message);
         }
     } 
     #endregin
 }

BrokeredMessagePropertyActivity

Los servicios del flujo de trabajo WCF proporcionan un entorno productivo para crear operaciones o servicios de ejecución prolongada y perdurables. Los servicios de flujo de trabajo se implementan utilizando actividades WF que pueden utilizar WCF para enviar y recibir datos. Explicar detalladamente cómo crear un servicio de flujo de trabajo WCF no es un objetivo del presente artículo. Para obtener más información de los servicios de flujo de trabajo WCF, vea los siguientes artículos:

WF 4.0 incluyó por primera vez actividades de mensajería que permiten a los desarrolladores exponer o consumir servicios WCF de una forma sencilla y flexible. Concretamente, las actividades de mensajería permiten a los flujos de trabajo enviar datos a otros sistemas (Send, SendReply) y recibir datos de otros sistemas (Receive, ReceiveReply) mediante WCF. Sin embargo, estas actividades ocultan una gran parte de los entresijos de WCF. En concreto, las actividades de mensajería no proporcionan acceso al actual OperationContext que se puede utilizar para realizar la operación siguiente:

  • En el lado del envío, el OperationContext se puede utilizar para incluir encabezados de mensaje adicionales de sobre SOAP o para agregar propiedades de mensaje al mensaje saliente.

  • En el lado de la recepción, el OperationContext se puede utilizar para recuperar información de la seguridad y las propiedades del mensaje para el mensaje entrante.

Como vimos al principio del artículo, cuando una aplicación utiliza WCF y el NetMessagingBinding para enviar un mensaje a una cola o un tema, el mensaje se ajusta en un elemento de sobre y se codifica. Para establecer las propiedades específicas de BrokeredMessage, debe crear un objeto BrokeredMessageProperty, establecer las propiedades en él y agregarlo a la colección Properties del Message WCF. Por consiguiente, para recuperar la BrokeredMessageProperty de un mensaje entrante o agregar una BrokeredMessageProperty a la colección de propiedades de un mensaje WCF saliente, hay que extender la funcionalidad que las actividades de mensajería proporcionan directamente. Afortunadamente, WF 4.0 permite ampliar el comportamiento del tiempo de ejecución de las actividades de mensajería con ISendMessageCallback e IReceiveMessageCallback. En concreto:

  • La interfaz IReceiveMessageCallback implementa una devolución de llamada que se ejecutará cuando la actividad de recepción reciba un mensaje de servicio.

  • La interfaz ISendMessageCallback.interface implementa una devolución de llamada a se llama justo antes de que la actividad Send envíe un mensaje por el cable.

En mi demostración, usé estos puntos de extensibilidad para crear una NativeActivity personalizada denominada BrokeredMessagePropertyActivity que permite:

  • Obtener la BrokeredMessageProperty de las propiedades de un mensaje WCF entrante.

  • Establecer la BrokeredMessageProperty de un mensaje WCF saliente.

Roman Kiss, en su artículo, siguió el mismo enfoque y creó una actividad más sofisticada que expone una propiedad para cada propiedad expuesta por la clase BrokeredMessageProperty. Es recomendable leer su artículo; explica básicamente una manera alternativa de implementar la misma técnica que describo en el artículo actual.

Para mayor comodidad, incluí el código de BrokeredMessagePropertyActivity en la tabla siguiente:

Clase BrokeredMessagePropertyActivity


[Designer(typeof(BrokeredMessagePropertyActivityDesigner))]
public class BrokeredMessagePropertyActivity : NativeActivity
{
    #region Public Properties
    [Browsable(false)]
    public Activity Body { get; set; }
    public InOutArgument<BrokeredMessageProperty> BrokeredMessageProperty { get; set; }
    #endregion

    #region NativeActivity Overriden Methods
    protected override void CacheMetadata(NativeActivityMetadata metadata)
    {
        metadata.AddChild(Body);
        base.CacheMetadata(metadata);
    }

    protected override void Execute(NativeActivityContext context)
    {
        // Add the BrokeredMessagePropertyMessageCallback implementation as an Execution property 
        var value = context.GetValue(BrokeredMessageProperty);
        context.Properties.Add(typeof(BrokeredMessagePropertyCallback).Name,
                                new BrokeredMessagePropertyCallback(value));
        context.ScheduleActivity(Body, OnBodyCompleted);
    }

    private void OnBodyCompleted(NativeActivityContext context, ActivityInstance instance)
    {
        // Sets the value of the BrokeredMessageProperty argument
        var callback = context.Properties.Find(typeof(BrokeredMessagePropertyCallback).Name) as BrokeredMessagePropertyCallback;
        if (callback != null)
        {
            context.SetValue(BrokeredMessageProperty, callback.BrokeredMessageProperty);
        }
    } 
    #endregion  
}

Clase BrokeredMessagePropertyCallback


[DataContract]
public class BrokeredMessagePropertyCallback : IReceiveMessageCallback, ISendMessageCallback
{
    #region Public Properties
    [DataMember]
    public BrokeredMessageProperty BrokeredMessageProperty { get; set; } 
    #endregion

    #region Public Constructors
    public BrokeredMessagePropertyCallback(BrokeredMessageProperty property)
    {
        BrokeredMessageProperty = property;
    }
    #endregion

    #region IReceiveMessageCallback Methods
    public void OnReceiveMessage(OperationContext operationContext, ExecutionProperties activityExecutionProperties)
    {
        try
        {
            // Get the BrokeredMessageProperty from an inbound message
            var incomingMessageProperties = operationContext.IncomingMessageProperties;
            BrokeredMessageProperty = incomingMessageProperties[BrokeredMessageProperty.Name] as BrokeredMessageProperty;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    } 
    #endregion

    #region ISendMessageCallback Methods
    public void OnSendMessage(OperationContext operationContext)
    {
        // Set the BrokeredMessageProperty for an outbound message 
        if (BrokeredMessageProperty != null)
        {
            operationContext.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, BrokeredMessageProperty);
        }
    } 
    #endregion
}

Se puede usar BrokeredMessagePropertyActivity para ajustar las actividades Receive o Send. Así, se proporciona automáticamente acceso a la BrokeredMessageProperty que se pueden usar para leer y escribir las propiedades definidas por el usuario y explícitas de un BrokeredMessage.

Actividad de la calculadora

Para preparar la solicitud entrante y generar una respuesta, creé la actividad de código personalizado denominada CalculatorActivity, cuyo código se muestra en la siguiente tabla.

Clase CalculatorActivity


#region Using Directives
using System;
using System.Activities;
using System.ComponentModel;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.DataContracts;
#endregion

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowActivities
{
    /// <summary>
    /// This class can be used to process a calculator request.
    /// </summary>
    [Designer(typeof(CalculatorActivityDesigner))]
    public sealed class CalculatorActivity : CodeActivity
    {
        #region Private Constants
        private const string Empty = "Empty";
        private const string MessageId = "MessageId";
        private const string SessionId = "SessionId";
        private const string CorrelationId = "CorrelationId";
        private const string Label = "Label";
        private const string ReplyTo = "ReplyTo";
        private const string Source = "Source";
        private const string CalculatoService = "CalculatoService";
        private const string Ok = "ok";
        private const string Failed = "Failed";
        private const string OperationsHeader = "Operations:";
        private const string OperationFormat = "{0} {1} {2} = {3}";
        private const string RequestMessagePropertiesHeader = "Request Message Properties:";
        private const string ResponseMessagePropertiesHeader = "Response Message Properties:";
        private const string RequestMessageUserDefinedPropertiesHeader = "Request Message User-Defined Properties:";
        private const string ResponseMessageUserDefinedPropertiesHeader = "Response Message User-Defined Properties:";
        private const string PropertyFormat = "Key=[{0}] Value=[{1}]";
        private const string OperationUnknownErrorMessageFormat = "The operation failed because the operator {0} is unknown.";

        #endregion

        #region Activity Arguments
        [DefaultValue(null)]
        public InArgument<CalculatorRequest> CalculatorRequest { get; set; }
        [DefaultValue(null)]
        public InArgument<BrokeredMessageProperty> InboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<BrokeredMessageProperty> OutboundBrokeredMessageProperty 
                                                   { get; set; }
        public OutArgument<CalculatorResponse> CalculatorResponse { get; set; }
        #endregion

        #region Private Fields
        private readonly string line = new string('-', 79);
        #endregion

        #region Protected Methods
        /// <summary>
        /// Processes a calculator calculatorRequest.
        /// </summary>
        /// <param name="context">The execution context under which the activity executes.</param>
        protected override void Execute(CodeActivityContext context)
        {
            // Obtain the runtime value of the CalculatorRequest input arguments
            var calculatorRequest = context.GetValue(CalculatorRequest);
            var calculatorResponse = new CalculatorResponse();
            if (calculatorRequest == null)
            {
                context.SetValue(CalculatorResponse, calculatorResponse);
                return;
            }
            // Print the properties of the inbound BrokeredMessageProperty
            var brokeredMessageProperty = context.GetValue(InboundBrokeredMessageProperty);
            if (brokeredMessageProperty != null)
            {
                Console.WriteLine(RequestMessagePropertiesHeader);
                Console.WriteLine(line);
                Console.WriteLine(string.Format(PropertyFormat, 
                                                MessageId, 
                                      brokeredMessageProperty.MessageId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                SessionId, 
                                      brokeredMessageProperty.SessionId ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                ReplyTo, 
                                      brokeredMessageProperty.ReplyTo ?? Empty));
                Console.WriteLine(string.Format(PropertyFormat, 
                                                Label, 
                                      brokeredMessageProperty.Label ?? Empty));
                Console.WriteLine(line);
                if (brokeredMessageProperty.Properties.Count > 0)
                {
                    Console.WriteLine(RequestMessageUserDefinedPropertiesHeader);
                    Console.WriteLine(line);
                    foreach (var property in brokeredMessageProperty.Properties)
                    {
                        Console.WriteLine(string.Format(PropertyFormat, 
                                                        property.Key, 
                                                        property.Value));
                    }
                    Console.WriteLine(line);
                }
            }

            // Process the request message and create a response message
            string error = null;
            calculatorResponse.Status = Ok;
            if (calculatorRequest.Operations.Count > 0)
            {
                Console.WriteLine(OperationsHeader);
                Console.WriteLine(line);
                foreach (var operation in calculatorRequest.Operations)
                {
                    double value = 0;
                    var succeeded = true;
                    switch (operation.Operator)
                    {
                        case "+":
                            value = operation.Operand1 + operation.Operand2;
                            break;
                        case "-":
                            value = operation.Operand1 - operation.Operand2;
                            break;
                        case "*":
                        case "x":
                            value = operation.Operand1 * operation.Operand2;
                            break;
                        case "/":
                        case "\\":
                        case ":":
                            value = operation.Operand1 / operation.Operand2;
                            break;
                        default:
                            error = string.Format(OperationUnknownErrorMessageFormat,
                                                  operation.Operator);
                            succeeded = false;
                            calculatorResponse.Status = Failed;
                            break;
                    }
                    Console.WriteLine(succeeded
                                          ? string.Format(OperationFormat, operation.Operand1, operation.Operator,
                                                          operation.Operand2, value)
                                          : error);
                    calculatorResponse.Results.Add(new Result(value, error));
                }
                Console.WriteLine(line);
            }
            context.SetValue(CalculatorResponse, calculatorResponse);

            // Create a new BrokeredMessageProperty for the reply message
            var replyBrokeredMessageProperty = new BrokeredMessageProperty
            {
               MessageId = Guid.NewGuid().ToString(),
               CorrelationId = brokeredMessageProperty != null ? 
                               brokeredMessageProperty.MessageId :
                               null,
               SessionId = brokeredMessageProperty != null ?
                           brokeredMessageProperty.ReplyToSessionId :
                           null,
               Label = brokeredMessageProperty != null ?
                       brokeredMessageProperty.Label :
                       null
            };
            if (brokeredMessageProperty != null)
            {
                foreach (var property in brokeredMessageProperty.Properties)
                {
                    replyBrokeredMessageProperty.Properties.Add(property);
                }
            }

            // Print the properties of the outbound BrokeredMessageProperty
            replyBrokeredMessageProperty.Properties.Add(Source, CalculatoService);
            Console.WriteLine(ResponseMessagePropertiesHeader);
            Console.WriteLine(line);
            Console.WriteLine(string.Format(PropertyFormat, 
                                            MessageId, 
                               replyBrokeredMessageProperty.MessageId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            CorrelationId, 
                               replyBrokeredMessageProperty.CorrelationId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            SessionId, 
                               replyBrokeredMessageProperty.SessionId ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            ReplyTo, 
                               replyBrokeredMessageProperty.ReplyTo ?? Empty));
            Console.WriteLine(string.Format(PropertyFormat, 
                                            Label, 
                               replyBrokeredMessageProperty.Label ?? Empty));
            Console.WriteLine(line);
            if (replyBrokeredMessageProperty.Properties.Count > 0)
            {
                Console.WriteLine(ResponseMessageUserDefinedPropertiesHeader);
                Console.WriteLine(line);
                foreach (var property in replyBrokeredMessageProperty.Properties)
                {
                    Console.WriteLine(string.Format(PropertyFormat, 
                                                    property.Key, 
                                                    property.Value));
                }
                Console.WriteLine(line);
            }

            // Set the outbound context property
            context.SetValue(OutboundBrokeredMessageProperty, 
                             replyBrokeredMessageProperty);
        }
        #endregion
    }
}

Esta actividad expone dos argumentos de entrada y dos argumentos de salida, respectivamente:

  • CalculatorRequest: este InArgument de tipo CalculatorRequest permite que el flujo de trabajo pase la solicitud a la actividad.

  • InboundBrokeredMessageProperty: este InArgument de tipo BrokeredMessageProperty permite pasar como parámetro de entrada el BrokeredMessageProperty extraído del mensaje de solicitud WCF.

  • CalculatorResponse: este OutArgument de tipo CalculatorResponse usa la actividad de código para devolver la respuesta como parámetro de salida al flujo de trabajo.

  • OutboundBrokeredMessageProperty: la actividad de código usa este OutArgument para devolver la BrokeredMessageProperty saliente al flujo de trabajo que utilizará una instancia de BrokeredMessagePropertyActivity para asignar el valor de este parámetro de salida a la BrokeredMessageProperty del mensaje de respuesta WCF.

En pocas palabras, la CalculatorActivity recibe el mensaje entrante y BrokeredMessageProperty como argumentos de entrada, procesa el mensaje de solicitud y genera un mensaje de respuesta y un BrokeredMessageProperty saliente. Al ejecutarse en una aplicación de consola, la actividad realiza el seguimiento de las propiedades de las BrokeredMessageProperty entrante y saliente en la salida estándar.

Servicio de flujo de trabajo WCF

En esta sección, centraré mi atención en el modo en que el servicio de flujo de trabajo WCF implementa las comunicaciones con la aplicación cliente mediante las colas y los temas de Service Bus. En mi demostración, el servicio de flujo de trabajo WCF se hospeda en una aplicación de consola, pero puede cambiar fácilmente la solución para ejecutar el servicio de flujo de trabajo WCF en una aplicación hospedad en IIS en local o en un rol de Azure en la nube. La tabla siguiente contiene el código que usa la aplicación de consola para inicializar y abrir un objeto WorkflowServiceHost. Concretamente, el extremo HTTP local especificado en el constructor de objeto se puede utilizar para recuperar el WSDL expuesto por el servicio de flujo de trabajo WCF.

Clase de programa


using System;
using System.Xaml;
using System.ServiceModel.Activities;
using Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.Service;

namespace Microsoft.WindowsAzure.CAT.Samples.ServiceBusAndWF.WorkflowConsoleApplication
{

    class Program
    {
        static void Main(string[] args)
        {
            var settings = new XamlXmlReaderSettings()
                               {
                                   LocalAssembly = typeof(Program).Assembly
                               };
            var reader = new XamlXmlReader(@"..\..\CalculatorService.xamlx", settings); 
            var service = (WorkflowService) XamlServices.Load(reader);
            using (var host = new WorkflowServiceHost(service, new Uri("http://localhost:7571")))
            {
                host.Description.Behaviors.Add(new ErrorServiceBehavior());
                host.Open();
                Console.WriteLine("Press [ENTER] to exit");
                Console.ReadLine();
                host.Close();
            }
        }
    }
}

La tabla siguiente contiene el archivo de configuración de la aplicación de consola que desempeña un rol clave en la definición de los extremos de servicio y de cliente WCF que el servicio de flujo de trabajo WCF utiliza para intercambiar los mensajes de respuesta y solicitud con la aplicación cliente a través de las colas y los temas de Service Bus.

Aplicación de consola App.Config


   1:  <?xml version="1.0"?>
   2:  <configuration>
   3:    <system.diagnostics>
   4:      <sources>
   5:        <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">
   6:          <listeners>
   7:            <add type="System.Diagnostics.DefaultTraceListener" name="Default">
   8:              <filter type="" />
   9:            </add>
  10:            <add name="ServiceModelMessageLoggingListener">
  11:              <filter type="" />
  12:            </add>
  13:          </listeners>
  14:        </source>
  15:      </sources>
  16:      <sharedListeners>
  17:        <add initializeData="C:\WorkflowConsoleApplication.svclog"
  18:             type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
  19:             name="ServiceModelMessageLoggingListener"
  20:             traceOutputOptions="Timestamp">
  21:          <filter type="" />
  22:        </add>
  23:      </sharedListeners>
  24:    </system.diagnostics>
  25:    <startup>
  26:      <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  27:    </startup>
  28:    <system.serviceModel>
  29:      <diagnostics>
  30:        <messageLogging logEntireMessage="true"
  31:                        logMalformedMessages="true"
  32:                        logMessagesAtServiceLevel="true"
  33:                        logMessagesAtTransportLevel="true" />
  34:        </diagnostics>
  35:      <behaviors>
  36:        <serviceBehaviors>
  37:          <behavior>
  38:            <serviceMetadata httpGetEnabled="true" />
  39:            <serviceDebug includeExceptionDetailInFaults="true" />
  40:            <useRequestHeadersForMetadataAddress />
  41:            <workflowUnhandledException action="AbandonAndSuspend" />
  42:          </behavior>
  43:        </serviceBehaviors>
  44:        <endpointBehaviors>
  45:          <behavior name="securityBehavior">
  46:            <transportClientEndpointBehavior>
  47:              <tokenProvider>
  48:                <sharedSecret issuerName="owner" 
  49:                              issuerSecret="ISSUER_SECRET"/>
  50:              </tokenProvider>
  51:            </transportClientEndpointBehavior>
  52:          </behavior>
  53:        </endpointBehaviors>
  54:      </behaviors>
  55:      <bindings>
  56:        <netMessagingBinding>
  57:          <binding name="netMessagingBinding" 
  58:                   sendTimeout="00:03:00" 
  59:                   receiveTimeout="00:03:00" 
  60:                   openTimeout="00:03:00" 
  61:                   closeTimeout="00:03:00" 
  62:                   sessionIdleTimeout="00:01:00" 
  63:                   prefetchCount="-1">
  64:            <transportSettings batchFlushInterval="00:00:01"/>
  65:          </binding>
  66:        </netMessagingBinding>
  67:      </bindings>
  68:      <client>
  69: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsequeue" 
  70:                  behaviorConfiguration="securityBehavior" 
  71:                  binding="netMessagingBinding" 
  72:                  bindingConfiguration="netMessagingBinding" 
  73:                  contract="ICalculatorResponse" 
  74:                  name="ResponseQueueClientEndpoint"/>
  75: <endpoint address="sb://NAMESPACE.servicebus.windows.net/responsetopic" 
  76:                  behaviorConfiguration="securityBehavior" 
  77:                  binding="netMessagingBinding" 
  78:                  bindingConfiguration="netMessagingBinding" 
  79:                  contract="ICalculatorResponse" 
  80:                  name="ResponseTopicClientEndpoint"/>
  81:      </client>
  82:      <services>
  83:        <service name="CalculatorService">
  84: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requestqueue" 
  85:                    behaviorConfiguration="securityBehavior" 
  86:                    binding="netMessagingBinding" 
  87:                    bindingConfiguration="netMessagingBinding" 
  88:                    name="RequestQueueServiceEndpoint" 
  89:                    contract="ICalculatorRequest"/>
  90: <endpoint address="sb://NAMESPACE.servicebus.windows.net/requesttopic"
  91:           listenUri=
"sb://NAMESPACE.servicebus.windows.net/requesttopic/Subscriptions/ItalyMilan"
  92:                    behaviorConfiguration="securityBehavior"
  93:                    binding="netMessagingBinding"
  94:                    bindingConfiguration="netMessagingBinding"
  95:                    name="RequestTopicServiceEndpoint"
  96:                    contract="ICalculatorRequest" />
  97:        </service>
  98:      </services>
  99:    </system.serviceModel>
 100:    <startup>
 101:     <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
 102:    </startup>
 103:  </configuration>

A continuación puede encontrar una breve descripción de los elementos y las secciones principales del archivo de configuración. Observe que la definición de los extremos de servicio y de cliente es dual en cuanto a la configuración de los extremos que usa la aplicación cliente.

  • Las líneas [3-33] habilitar el seguimiento, configuran los orígenes de seguimiento para emitir seguimientos y establecen el nivel de seguimiento. Concretamente, la sección de diagnóstico se configura para realizar el seguimiento de los mensajes en el nivel de servicio y de transporte para un archivo de registro que se puede examinar usando la herramienta del visor de seguimiento de servicio (SvcTraceViewer.exe).

  • Las líneas [36-43] especifican el comportamiento de servicio predeterminado que usa el servicio de flujo de trabajo WCF.

  • Las líneas [44-53] contienen la definición del securityBehavior utilizado por los extremos de cliente y de servicio para autenticarse con el servicio Access Control. Concretamente, el TransportClientEndpointBehavior se utiliza para definir las credenciales del secreto compartido.

  • Las líneas [56-66] contienen la configuración del NetMessagingBinding utilizado por los extremos de cliente y de servicio para intercambiar mensajes con Service Bus.

  • Las líneas [69-74] contienen la definición del ResponseQueueClientEndpoint utilizado por el servicio de flujo de trabajo WCF para enviar mensajes de respuesta a requestqueue. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre de la cola proporcionan la dirección del extremo cliente.

  • Las líneas [78-83] contienen la definición del ResponseTopicClientEndpoint utilizado por el servicio de flujo de trabajo WCF para enviar mensajes de respuesta a responsetopic. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre del tema proporcionan la dirección del extremo cliente.

  • Las líneas [83-89] contienen la definición del RequestQueueServiceEndpoint utilizado por el servicio de flujo de trabajo WCF para recibir mensajes de solicitud de requestqueue. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre de la cola proporcionan la dirección del extremo de servicio.

  • Las líneas [90-96] contienen la definición del RequestTopicServiceEndpoint utilizado por la aplicación para recibir mensajes de solicitud de la suscripción ItalyMilan para requesttopic. Cuando se define un extremo de servicio WCF que usa el NetMessagingBinding para recibir los mensajes de una suscripción, tiene que continuar de la forma siguiente:

    • Como valor del atributo address, debe especificar la dirección URL del tema al que la suscripción pertenece. La concatenación de la dirección URL del espacio de nombres de servicio y el nombre del tema proporcionan la dirección del tema.

    • Como valor del atributo listenUri, debe especificar la dirección URL de la suscripción. La concatenación de la dirección URL del tema, la cadena Suscripciones y el nombre de la suscripción definen la dirección URL de la suscripción.

    • Asigne el valor Explicit al atributo listenUriMode. El valor predeterminado de listenUriMode es Explicit, por lo que este valor es opcional.

  • Las líneas [100-102] especifican qué versión de Common Language Runtime admite la aplicación.

Ahora examinemos los pasos necesarios para crear un servicio de flujo de trabajo WCF que reciba una solicitud y envíe una respuesta a través de las colas y los temas de Service Bus. Cuando creé por primera vez el servicio de flujo de trabajo WCF, el flujo de trabajo solo contenía una actividad Sequence con una actividad Receive seguida de una actividad SendReply, como se muestra en la ilustración siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service12

En primer lugar, hice clic en la superficie del flujo de trabajo y asigné la cadena CalculatorService como un valor a ambas propiedades, ConfigurationName y Name, de WorkflowService, como se muestra en la imagen siguiente. Concretamente, la propiedad ConfigurationName indica que el nombre de configuración del servicio de flujo de trabajo y su valor deben ser iguales que el valor del atributo name del elemento service del archivo de configuración.

ServiceBus-Queues-Topics-WCF-Workflow-Service13

A continuación, seleccioné la actividad Sequential, hice clic en el botón Variables para mostrar el editor correspondiente y creé las variables siguientes:

  • calculatorRequest: esta variable es de tipo CalculatorRequest y, como el nombre sugiere, contiene el cuerpo del mensaje de solicitud. La actividad Receive, que se usa para recibir el mensaje de solicitud de la suscripción requestqueue o ItalyMilan de requesttopic, establece su valor.

  • calculatorResponse: esta variable es de tipo CalculatorResponse y contiene el cuerpo del mensaje de respuesta devuelto a la aplicación cliente. El valor se establece mediante una actividad personalizada que procesa la solicitud y genera una respuesta.

  • inboundBrokeredMessageProperty: esta variable contiene la BrokeredMessageProperty del mensaje de solicitud. El valor se establece mediante una instancia BrokeredMessagePropertyActivity que se ajuste a las lecturas de la actividad y Recibir el BrokeredMessageProperty propiedades de mensaje de solicitud de WCF.

  • outboundBrokeredMessageProperty: esta variable contiene la BrokeredMessageProperty del mensaje de respuesta. El valor se establece con una actividad personalizada que procesa la solicitud y genera una respuesta. Una BrokeredMessagePropertyActivity se utiliza para ajustar la actividad Send y asignar el valor de esta variable a la BrokeredMessageProperty del mensaje de respuesta WCF.

ServiceBus-Queues-Topics-WCF-Workflow-Service14

A continuación, agregué una actividad TryCatch al flujo de trabajo y ajusté la actividad Receive a una instancia de la BrokeredMessagePropertyActivity, como se muestra en la imagen siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service15

Después, hice clic en la BrokeredMessagePropertyActivity y asigné la variable inboundBrokeredMessageProperty a su propiedad BrokeredMessageProperty, como se muestra en la imagen siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service16

A continuación, seleccioné la actividad Receive y configuré sus propiedades para recibir los mensajes de solicitud de requestqueue y de la suscripción ItalyMilan del requesttopic usando, respectivamente, los extremos de servicio RequestQueueServiceEndpoint y RequestTopicServiceEndpoint definidos en el archivo de configuración.

ServiceBus-Queues-Topics-WCF-Workflow-Service17

Concretamente, utilicé la propiedad ServiceContractName de la actividad de recepción para especificar el espacio de nombres de destino y el nombre de contrato para el extremo de servicio y usé la propiedad Action para especificar el encabezado de acción de la solicitud de mensaje como se especifica en el contrato de servicio ICalculatorRequest.

A continuación, agregué una instancia de CalculatorActivity al servicio de flujo de trabajo WCF bajo BrokeredMessagePropertyActivity y configuré sus propiedades como se muestra en la imagen siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service18

Después, agregué una actividad If al flujo de trabajo debajo de CalculatorActivity y configuré su propiedad Condition de la forma siguiente:

Not String.IsNullOrEmpty(inboundBrokeredMessageProperty.ReplyTo) And 
inboundBrokeredMessageProperty.ReplyTo.ToLower().Contains("topic")

Después, creé una instancia de BrokeredMessagePropertyActivity en ambas bifurcaciones de la actividad If y agregué una actividad Send a cada una, como se muestra en la imagen siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service19

De esta manera, si la dirección de respuesta especificada en la propiedad ReplyTo de la BrokeredMessageProperty entrante contiene la cadena “topic”, la respuesta se envía a responsetopic y, en caso contrario, la respuesta se envía a responsequeue.

En ambos casos, una instancia de BrokeredMessagePropertyActivity (identificada mediante el nombre para mostrar Set BrokeredMessage) se utiliza para ajustar la actividad Send y asignar la BrokeredMessageProperty saliente a la colección de propiedades del mensajes de respuesta WCF. Para realizar esta operación, asigné el valor de la variable outboundBrokeredMessageProperty a la propiedad BrokeredMessageProperty de ambas instancias de BrokeredMessagePropertyActivity, como se muestra en la imagen a continuación.

ServiceBus-Queues-Topics-WCF-Workflow-Service20

A continuación, seleccioné la actividad Send en la bifurcación Then y configuré su propiedad de la forma siguiente para enviar el mensaje de respuesta a responsetopic mediante el ResponseTopicClientEndpoint definido en el archivo de configuración.

ServiceBus-Queues-Topics-WCF-Workflow-Service21

Concretamente, utilicé la propiedad ServiceContractName de la actividad Send para especificar el nombre de contrato y el espacio de nombres de destino para el extremo cliente, la propiedad Action para especificar el encabezado de acción del mensaje de respuesta que se especifica en el contrato de servicio ICalculatorResponse, EndpointConfigurationName para indicar el nombre del extremo de cliente definido en el archivo de configuración.

Igualmente, configuré la actividad Send en la bifurcación Else, como se muestra en la ilustración siguiente.

ServiceBus-Queues-Topics-WCF-Workflow-Service22

En la siguiente imagen se muestra el flujo de trabajo entero:

ServiceBus-Queues-Topics-WCF-Workflow-Service23

Probar la solución

Suponiendo que ha configurado correctamente la solución, puede seguir de la forma siguiente para probarla.

  • Para enviar un mensaje de solicitud al servicio de flujo de trabajo WCF mediante requestqueue, seleccione el botón de opción Cola en el grupo Métodos de solicitud.

  • Para enviar un mensaje de solicitud al servicio de flujo de trabajo WCF mediante requesttopic, seleccione el botón de opción Tema en el grupo Métodos de solicitud.

  • Para pedir al servicio de flujo de trabajo WCF que envíe el mensaje de respuesta a responsequeue, seleccione el botón de opción Cola en el grupo Métodos de respuesta.

  • Para pedir al servicio de flujo de trabajo WCF que envíe el mensaje de respuesta a responsetopic, seleccione el botón de opción Tema en el grupo Métodos de respuesta.

En la ilustración siguiente se muestra la combinación más interesante:

  • El cliente envía un mensaje de solicitud a requesttopic.

  • El servicio del flujo de trabajo WCF lee la solicitud de la suscripción ItalyMilan para requesttopic y envía la respuesta a responsetopic.

  • La aplicación cliente recibe el mensaje de respuesta de la suscripción ItalyMilan definida en responsetopic.

La siguiente imagen muestra la información registrada por el servicio de flujo de trabajo WCF en la salida estándar de la aplicación de consola de host:

ServiceBus-Queues-Topics-WCF-Workflow-Service24

La siguiente imagen muestra la información registrada por la aplicación cliente durante la llamada:

ServiceBus-Queues-Topics-WCF-Workflow-Service25

Concretamente, tenga en cuenta lo siguiente:

  1. Se envía el mensaje de solicitud a requesttopic.

  2. El mensaje de respuesta se recibe de responsetopic.

  3. El CorrelationId del mensaje de respuesta es igual que el MessageId del mensaje de solicitud.

  4. El servicio de flujo de trabajo WCF copió la propiedad Label del mensaje de solicitud para la propiedad Label del mensaje de respuesta.

  5. El servicio de flujo de trabajo WCF copió todas las propiedades definidas por el usuario del mensaje de solicitud al mensaje de respuesta.

  6. El servicio de flujo de trabajo WCF agregó la propiedad definida por el usuario Source a la respuesta.

  7. La acción de la regla definida en la suscripción ItalyMilan agregó la propiedad Area.

Conclusiones

En este artículo hemos visto cómo integrar un servicio de flujo de trabajo WCF con la mensajería negociada de Service Bus y cómo lograr una interoperabilidad completa entre estas dos tecnologías simplemente usando sus características nativas y una actividad personalizada para administrar BrokeredMessageProperty. Espero conocer sus opiniones y, mientras tanto, puede descargar el código complementario de este artículo desde la Galería de código de MSDN.

Adiciones de comunidad

AGREGAR
Mostrar:
© 2014 Microsoft