Walkthrough: Creating a Custom Message Block

This document describes how to create a custom message block type that orders incoming messages by priority.

Although the built-in message block types provide a wide-range of functionality, you can create your own message block type and customize it to meet the requirements of your application. For a description of the built-in message block types that are provided by the Asynchronous Agents Library, see Asynchronous Message Blocks.

Read the following documents before you start this walkthrough:

Message blocks participate in the act of sending and receiving messages. A message block that sends messages is known as a source block. A message block that receives messages is known as a target block. A message block that both sends and receives messages is known as a propagator block. The Agents Library uses the abstract class Concurrency::ISource to represent source blocks and the abstract class Concurrency::ITarget to represent target blocks. Message block types that act as sources derive from ISource; message block types that act as targets derive from ITarget.

Although you can derive your message block type directly from ISource and ITarget, the Agents Library defines three base classes that perform much of the functionality that is common to all message block types, for example, handling errors and connecting message blocks together in a concurrency-safe manner. The Concurrency::source_block class derives from ISource and sends messages to other blocks. The Concurrency::target_block class derives from ITarget and receives messages from other blocks. The Concurrency::propagator_block class derives from ISource and ITarget and sends messages to other blocks and it receives messages from other blocks. We recommend that you use these three base classes to handle infrastructure details so that you can focus on the behavior of your message block.

The source_block, target_block, and propagator_block classes are templates that are parameterized on a type that manages the connections, or links, between source and target blocks and on a type that manages how messages are processed. The Agents Library defines two types that perform link management, Concurrency::single_link_registry and Concurrency::multi_link_registry. The single_link_registry class enables a message block to be linked to one source or to one target. The multi_link_registry class enables a message block to be linked to multiple sources or multiple targets. The Agents Library defines one class that performs message management, Concurrency::ordered_message_processor. The ordered_message_processor class enables message blocks to process messages in the order in which it receives them.

To better understand how message blocks relate to their sources and targets, consider the following example. This example shows the declaration of the Concurrency::transformer class.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

The transformer class derives from propagator_block, and therefore acts as both a source block and as a target block. It accepts messages of type _Input and sends messages of type _Output. The transformer class specifies single_link_registry as the link manager for any target blocks and multi_link_registry as the link manager for any source blocks. Therefore, a transformer object can have up to one target and an unlimited number of sources.

A class that derives from source_block must implement six methods: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, and resume_propagation. A class that derives from target_block must implement the propagate_message method and can optionally implement the send_message method. Deriving from propagator_block is functionally equivalent to deriving from both source_block and target_block.

The propagate_to_any_targets method is called by the runtime to asynchronously or synchronously process any incoming messages and propagate out any outgoing messages. The accept_message method is called by target blocks to accept messages. Many message block types, such as unbounded_buffer, send messages only to the first target that would receive it. Therefore, it transfers ownership of the message to the target. Other message block types, such as Concurrency::overwrite_buffer, offer messages to each of its target blocks. Therefore, overwrite_buffer creates a copy of the message for each of its targets.

The reserve_message, consume_message, release_message, and resume_propagation methods enable message blocks to participate in message reservation. Target blocks call the reserve_message method when they are offered a message and have to reserve the message for later use. After a target block reserves a message, it can call the consume_message method to consume that message or the release_message method to cancel the reservation. As with the accept_message method, the implementation of consume_message can either transfer ownership of the message or return a copy of the message. After a target block either consumes or releases a reserved message, the runtime calls the resume_propagation method. Typically, this method continues message propagation, starting with the next message in the queue.

The runtime calls the propagate_message method to asynchronously transfer a message from another block to the current one. The send_message method resembles propagate_message, except that it synchronously, instead of asynchronously, sends the message to the target blocks. The default implementation of send_message rejects all incoming messages. The runtime does not call either of these methods if the message does not pass the optional filter function that is associated with the target block. For more information about message filters, see Asynchronous Message Blocks.

[go to top]

The priority_buffer class is a custom message block type that orders incoming messages first by priority, and then by the order in which messages are received. The priority_buffer class resembles the Concurrency::unbounded_buffer class because it holds a queue of messages, and also because it acts as both a source and a target message block and can have both multiple sources and multiple targets. However, unbounded_buffer bases message propagation only on the order in which it receives messages from its sources.

The priority_buffer class receives messages of type std::tuple that contain PriorityType and Type elements. PriorityType refers to the type that holds the priority of each message; Type refers to the data portion of the message. The priority_buffer class sends messages of type Type. The priority_buffer class also manages two message queues: a std::priority_queue object for incoming messages and a std::queue object for outgoing messages. Ordering messages by priority is useful when a priority_buffer object receives multiple messages simultaneously or when it receives multiple messages before any messages are read by consumers.

In addition to the seven methods that a class that derives from propagator_block must implement, the priority_buffer class also overrides the link_target_notification and send_message methods. The priority_buffer class also defines two public helper methods, enqueue and dequeue, and a private helper method, propagate_priority_order.

The following procedure describes how to implement the priority_buffer class.

To define the priority_buffer class

  1. Create a C++ header file and name it priority_buffer.h. Alternatively, you can use an existing header file that is part of your project.

  2. In priority_buffer.h, add the following code.

    
    #pragma once
    #include <agents.h>
    #include <queue>
    
    
    
  3. In the std namespace, define specializations of std::less and std::greater that act on Concurrency::message objects.

    
    namespace std 
    {
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
    {  
       typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator< to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) < get<0>(right->payload));
       }
    };
    
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
    {  
       typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator> to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) > get<0>(right->payload));
       }
    };
    
    }
    
    
    

    The priority_buffer class stores message objects in a priority_queue object. These type specializations enable the priority queue to sort messages according to their priority. The priority is the first element of the tuple object.

  4. In the Concurrency namespace, declare the priority_buffer class.

    
    namespace Concurrency 
    {
    template<class Type, 
             typename PriorityType = int,
             typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : 
       public propagator_block<multi_link_registry<ITarget<Type>>,
                               multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
    protected:
    private:
    };
    }
    
    
    

    The priority_buffer class derives from propagator_block. Therefore, it can both send and receive messages. The priority_buffer class can have multiple targets that receive messages of type Type. It can also have multiple sources that send messages of type tuple<PriorityType, Type>.

  5. In the private section of the priority_buffer class, add the following member variables.

    
    // Stores incoming messages. 
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
       message<_Source_type>*, 
       std::vector<message<_Source_type>*>, 
       Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<message<_Target_type>*> _output_messages;
    
    
    

    The priority_queue object holds incoming messages; the queue object holds outgoing messages. A priority_buffer object can receive multiple messages simultaneously; the critical_section object synchronizes access to the queue of input messages.

  6. In the private section, define the copy constructor and the assignment operator. This prevents priority_queue objects from being assignable.

    
    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
    
    
  7. In the public section, define the constructors that are common to many message block types. Also define the destructor.

    
    // Constructs a priority_buffer message block.
    priority_buffer() 
    {       
       initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
       initialize_source_and_target();
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler)
    {
       initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler, filter_method const& filter) 
    {
       initialize_source_and_target(&scheduler);
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group)
    {
       initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
    {
       initialize_source_and_target(NULL, &schedule_group);
       register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
       // Remove all links.
       remove_network_links();
    }
    
    
    
  8. In the public section, define the methods enqueue and dequeue. These helper methods provide an alternative way to send messages to and receive messages from a priority_buffer object.

    
    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
      return Concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
      return receive<Type>(this);
    }
    
    
    
  9. In the protected section, define the propagate_to_any_targets method.

    
    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(message<_Target_type>*)
    {
       // Retrieve the message from the front of the input queue.
       message<_Source_type>* input_message = NULL;
       {
          critical_section::scoped_lock lock(_input_lock);
          if (_input_messages.size() > 0)
          {
             input_message = _input_messages.top();
             _input_messages.pop();
          }
       }
    
       // Move the message to the output queue.
       if (input_message != NULL)
       {
          // The payload of the output message does not contain the 
          // priority of the message.
          message<_Target_type>* output_message = 
             new message<_Target_type>(get<1>(input_message->payload));
          _output_messages.push(output_message);
    
          // Free the memory for the input message.
          delete input_message;
    
          // Do not propagate messages if the new message is not the head message.
          // In this case, the head message is reserved by another message block.
          if (_output_messages.front()->msg_id() != output_message->msg_id())
          {
             return;
          }
       }
    
       // Propagate out the output messages.
       propagate_priority_order();
    }
    
    
    

    The propagate_to_any_targets method transfers the message that is at the front of the input queue to the output queue and propagates out all messages in the output queue.

  10. In the protected section, define the accept_message method.

    
    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
    {        
       message<_Target_type>* message = NULL;
    
       // Transfer ownership if the provided message identifier matches
       // the identifier of the front of the output message queue.
       if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
       {
          message = _output_messages.front();            
          _output_messages.pop();
       }
    
       return message;
    }
    
    
    

    When a target block calls the accept_message method, the priority_buffer class transfers ownership of the message to the first target block that accepts it. (This resembles the behavior of unbounded_buffer.)

  11. In the protected section, define the reserve_message method.

    
    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(runtime_object_identity msg_id)
    {
       // Allow the message to be reserved if the provided message identifier
       // is the message identifier of the front of the message queue.
       return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
    }
    
    
    

    The priority_buffer class permits a target block to reserve a message when the provided message identifier matches the identifier of the message that is at the front of the queue. In other words, a target can reserve the message if the priority_buffer object has not yet received an additional message and has not yet propagated out the current one.

  12. In the protected section, define the consume_message method.

    
    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual message<Type>* consume_message(runtime_object_identity msg_id)
    {
       // Transfer ownership of the message to the caller.
       return accept_message(msg_id);
    }
    
    
    

    A target block calls consume_message to transfer ownership of the message that it reserved.

  13. In the protected section, define the release_message method.

    
    // Releases a previous message reservation.
    virtual void release_message(runtime_object_identity msg_id)
    {
       // The head message must be the one that is reserved. 
       if (_output_messages.empty() || 
           _output_messages.front()->msg_id() != msg_id)
       {
          throw message_not_found();
       }
    }
    
    
    

    A target block calls release_message to cancel its reservation to a message.

  14. In the protected section, define the resume_propagation method.

    
    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
       // Propagate out any messages in the output queue.
       if (_output_messages.size() > 0)
       {
          async_send(NULL);
       }
    }
    
    
    

    The runtime calls resume_propagation after a target block either consumes or releases a reserved message. This method propagates out any messages that are in the output queue.

  15. In the protected section, define the link_target_notification method.

    
    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(ITarget<_Target_type>*)
    {
       // Do not propagate messages if a target block reserves
       // the message at the front of the queue.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out any messages that are in the output queue.
       propagate_priority_order();
    }
    
    
    

    The _M_pReservedFor member variable is defined by the base class, source_block. This member variable points to the target block, if any, that is holding a reservation to the message that is at the front of the output queue. The runtime calls link_target_notification when a new target is linked to the priority_buffer object. This method propagates out any messages that are in the output queue if no target is holding a reservation.

  16. In the private section, define the propagate_priority_order method.

    
    // Propagates messages in priority order.
    void propagate_priority_order()
    {
       // Cancel propagation if another block reserves the head message.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out all output messages. 
       // Because this block preserves message ordering, stop propagation
       // if any of the messages are not accepted by a target block.
       while (!_output_messages.empty())
       {
          // Get the next message.
          message<_Target_type> * message = _output_messages.front();
    
          message_status status = declined;
    
          // Traverse each target in the order in which they are connected.
          for (target_iterator iter = _M_connectedTargets.begin(); 
               *iter != NULL; 
               ++iter)
          {
             // Propagate the message to the target.
             ITarget<_Target_type>* target = *iter;
             status = target->propagate(message, this);
    
             // If the target accepts the message then ownership of message has 
             // changed. Do not propagate this message to any other target.
             if (status == accepted)
             {
                break;
             }
    
             // If the target only reserved this message, we must wait until the 
             // target accepts the message.
             if (_M_pReservedFor != NULL)
             {
                break;
             }
          }
    
          // If status is anything other than accepted, then the head message
          // was not propagated out. To preserve the order in which output 
          // messages are propagated, we must stop propagation until the head 
          // message is accepted.
          if (status != accepted)
          {
              break;
          }          
       }
    }
    
    
    

    This method propagates out all messages from the output queue. Every message in the queue is offered to every target block until one of the target blocks accepts the message. The priority_buffer class preserves the order of the outgoing messages. Therefore, the first message in the output queue must be accepted by a target block before this method offers any other message to the target blocks.

  17. In the protected section, define the propagate_message method.

    
    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual message_status propagate_message(message<_Source_type>* message, 
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Asynchronously send the message to the target blocks.
          async_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    
    
    

    The propagate_message method enables the priority_buffer class to act as a message receiver, or target. This method receives the message that is offered by the provided source block and inserts that message into the priority queue. The propagate_message method then asynchronously sends all output messages to the target blocks.

    The runtime calls this method when you call the Concurrency::asend function or when the message block is connected to other message blocks.

  18. In the protected section, define the send_message method.

    
    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual message_status send_message(message<_Source_type>* message,
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Synchronously send the message to the target blocks.
          sync_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    
    
    

    The send_message method resembles propagate_message. However it sends the output messages synchronously instead of asynchronously.

    The runtime calls this method during a synchronous send operation, such as when you call the Concurrency::send function.

The priority_buffer class contains constructor overloads that are typical in many message block types. Some constructor overloads take Concurrency::Scheduler or Concurrency::ScheduleGroup objects, which enable the message block to be managed by a specific task scheduler. Other constructor overloads take a filter function. Filter functions enable message blocks to accept or reject a message on the basis of its payload. For more information about message filters, see Asynchronous Message Blocks. For more information about task schedulers, see Task Scheduler (Concurrency Runtime).

Because the priority_buffer class orders messages by priority and then by the order in which messages are received, this class is most useful when it receives messages asynchronously, for example, when you call the Concurrency::asend function or when the message block is connected to other message blocks.

[go to top]

The following example shows the complete definition of the priority_buffer class.


// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
// A specialization of less that tests whether the priority element of a 
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
{  
   typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator< to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) < get<0>(right->payload));
   }
};

// A specialization of less that tests whether the priority element of a 
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
{  
   typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator> to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) > get<0>(right->payload));
   }
};

}

namespace Concurrency 
{
// A message block type that orders incoming messages first by priority, 
// and then by the order in which messages are received. 
template<class Type, 
         typename PriorityType = int,
         typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : 
   public propagator_block<multi_link_registry<ITarget<Type>>,
                           multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{  
public:
   // Constructs a priority_buffer message block.
   priority_buffer() 
   {       
      initialize_source_and_target();
   }

   // Constructs a priority_buffer message block with the given filter function.
   priority_buffer(filter_method const& filter)
   {
      initialize_source_and_target();
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler)
   {
      initialize_source_and_target(&scheduler);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler, filter_method const& filter) 
   {
      initialize_source_and_target(&scheduler);
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group)
   {
      initialize_source_and_target(NULL, &schedule_group);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
   {
      initialize_source_and_target(NULL, &schedule_group);
      register_filter(filter);
   }

   // Destroys the message block.
   ~priority_buffer()
   {
      // Remove all links.
      remove_network_links();
   }

   // Sends an item to the message block.
   bool enqueue(Type const& item)
   {
     return Concurrency::asend<Type>(this, item);
   }

   // Receives an item from the message block.
   Type dequeue()
   {
     return receive<Type>(this);
   }

protected:
   // Asynchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::propagate.
   virtual message_status propagate_message(message<_Source_type>* message, 
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Asynchronously send the message to the target blocks.
         async_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Synchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::send.
   virtual message_status send_message(message<_Source_type>* message,
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Synchronously send the message to the target blocks.
         sync_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Accepts a message that was offered by this block by transferring ownership
   // to the caller.
   virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
   {        
      message<_Target_type>* message = NULL;

      // Transfer ownership if the provided message identifier matches
      // the identifier of the front of the output message queue.
      if (!_output_messages.empty() && 
           _output_messages.front()->msg_id() == msg_id)
      {
         message = _output_messages.front();            
         _output_messages.pop();
      }

      return message;
   }

   // Reserves a message that was previously offered by this block.
   virtual bool reserve_message(runtime_object_identity msg_id)
   {
      // Allow the message to be reserved if the provided message identifier
      // is the message identifier of the front of the message queue.
      return (!_output_messages.empty() && 
               _output_messages.front()->msg_id() == msg_id);
   }

   // Transfers the message that was previously offered by this block 
   // to the caller. The caller of this method is the target block that 
   // reserved the message.
   virtual message<Type>* consume_message(runtime_object_identity msg_id)
   {
      // Transfer ownership of the message to the caller.
      return accept_message(msg_id);
   }

   // Releases a previous message reservation.
   virtual void release_message(runtime_object_identity msg_id)
   {
      // The head message must be the one that is reserved. 
      if (_output_messages.empty() || 
          _output_messages.front()->msg_id() != msg_id)
      {
         throw message_not_found();
      }
   }

   // Resumes propagation after a reservation has been released.
   virtual void resume_propagation()
   {
      // Propagate out any messages in the output queue.
      if (_output_messages.size() > 0)
      {
         async_send(NULL);
      }
   }

   // Notifies this block that a new target has been linked to it.
   virtual void link_target_notification(ITarget<_Target_type>*)
   {
      // Do not propagate messages if a target block reserves
      // the message at the front of the queue.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out any messages that are in the output queue.
      propagate_priority_order();
   }

   // Transfers the message at the front of the input queue to the output queue
   // and propagates out all messages in the output queue.
   virtual void propagate_to_any_targets(message<_Target_type>*)
   {
      // Retrieve the message from the front of the input queue.
      message<_Source_type>* input_message = NULL;
      {
         critical_section::scoped_lock lock(_input_lock);
         if (_input_messages.size() > 0)
         {
            input_message = _input_messages.top();
            _input_messages.pop();
         }
      }

      // Move the message to the output queue.
      if (input_message != NULL)
      {
         // The payload of the output message does not contain the 
         // priority of the message.
         message<_Target_type>* output_message = 
            new message<_Target_type>(get<1>(input_message->payload));
         _output_messages.push(output_message);

         // Free the memory for the input message.
         delete input_message;

         // Do not propagate messages if the new message is not the head message.
         // In this case, the head message is reserved by another message block.
         if (_output_messages.front()->msg_id() != output_message->msg_id())
         {
            return;
         }
      }

      // Propagate out the output messages.
      propagate_priority_order();
   }

private:

   // Propagates messages in priority order.
   void propagate_priority_order()
   {
      // Cancel propagation if another block reserves the head message.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out all output messages. 
      // Because this block preserves message ordering, stop propagation
      // if any of the messages are not accepted by a target block.
      while (!_output_messages.empty())
      {
         // Get the next message.
         message<_Target_type> * message = _output_messages.front();

         message_status status = declined;

         // Traverse each target in the order in which they are connected.
         for (target_iterator iter = _M_connectedTargets.begin(); 
              *iter != NULL; 
              ++iter)
         {
            // Propagate the message to the target.
            ITarget<_Target_type>* target = *iter;
            status = target->propagate(message, this);

            // If the target accepts the message then ownership of message has 
            // changed. Do not propagate this message to any other target.
            if (status == accepted)
            {
               break;
            }

            // If the target only reserved this message, we must wait until the 
            // target accepts the message.
            if (_M_pReservedFor != NULL)
            {
               break;
            }
         }

         // If status is anything other than accepted, then the head message
         // was not propagated out. To preserve the order in which output 
         // messages are propagated, we must stop propagation until the head 
         // message is accepted.
         if (status != accepted)
         {
             break;
         }          
      }
   }

private:

   // Stores incoming messages. 
   // The type parameter Pr specifies how to order messages by priority.
   std::priority_queue<
      message<_Source_type>*, 
      std::vector<message<_Source_type>*>, 
      Pr
   > _input_messages;

   // Synchronizes access to the input message queue.
   critical_section _input_lock;

   // Stores outgoing messages.
   std::queue<message<_Target_type>*> _output_messages;

private:
   // Hide assignment operator and copy constructor.
   priority_buffer const &operator =(priority_buffer const&);
   priority_buffer(priority_buffer const &);
};

}


The following example concurrently performs a number of asend and Concurrency::receive operations on a priority_buffer object.


// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}


This example produces the following sample output.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

The priority_buffer class orders messages first by priority and then by the order in which it receives messages. In this example, messages with greater numerical priority are inserted towards the front of the queue.

[go to top]

Copy the example code and paste it in a Visual Studio project, or paste the definition of the priority_buffer class in a file that is named priority_buffer.h and the test program in a file that is named priority_buffer.cpp and then run the following command in a Visual Studio 2010 Command Prompt window.

cl.exe /EHsc priority_buffer.cpp

Was this page helpful?
(1500 characters remaining)
Thank you for your feedback

Community Additions

ADD
Show:
© 2014 Microsoft