Пошаговое руководство. Создание пользовательского блока сообщений

В этом документе описано, как создать пользовательский тип блока сообщений, сортирующий входящие сообщения по приоритету.

Несмотря на то, что встроенные типы блоков сообщений предоставляют широкий набор функций, можно создавать собственные типы блоков сообщений и настраивать их в соответствии с требованиями своего приложения. Описание встроенных типов блоков сообщений, входящих в библиотеку асинхронных агентов, см. в разделе Асинхронные блоки сообщений.

Обязательные компоненты

Прежде чем начать выполнение этого пошагового руководства, необходимо ознакомиться со следующими документами.

Подразделы

Это пошаговое руководство содержит следующие подразделы.

  • Создание пользовательского блока сообщений

  • Определение класса priority_buffer

  • Полный код примера

Создание пользовательского блока сообщений

Блоки сообщений принимают участие в отправке и получении сообщений. Блок сообщений, отправляющий сообщения, называется исходным блоком. Блок сообщений, принимающий сообщения, называется целевым блоком. Блок сообщений, отправляющий и принимающий сообщения, называется блоком распространения. В библиотеке агентов исходные блоки представлены абстрактным классом Concurrency::ISource, а целевые блоки — абстрактным классом Concurrency::ITarget. Типы блоков сообщений, выполняющие роль источников, являются производными от класса ISource; типы блоков сообщений, выполняющие роль целевых объектов, являются производными от класса ITarget.

Хотя тип блока сообщений можно наследовать непосредственно от классов ISource и ITarget, библиотека агентов определяет три базовых класса, выполняющих большинство общих функций типов блоков сообщений, например обработку ошибок и соединение блоков сообщений безопасным в режиме параллелизма способом. Класс Concurrency::source_block наследуется от класса ISource и отправляет сообщения другим блокам. Класс Concurrency::target_block наследуется от класса ITarget и принимает сообщения от других блоков. Класс Concurrency::propagator_block наследуется от классов ISource и ITarget, отправляет сообщения другим блокам и принимает сообщения от других блоков. Рекомендуется обеспечивать инфраструктуру с помощью этих трех базовых классов, чтобы заниматься в первую очередь поведением своего блока сообщений.

Классы source_block, target_block и propagator_block — это шаблоны, параметризованные в типе, управляющем соединениями, или связями, между исходными и целевыми блоками, и в типе, управляющем обработкой сообщений. Библиотека агентов определяет два типа, выполняющих управление связями: Concurrency::single_link_registry и Concurrency::multi_link_registry. Класс single_link_registry позволяет связывать блок сообщений с одним исходным или целевым блоком. Класс multi_link_registry позволяет связывать блок сообщений с несколькими исходными или целевыми блоками. Библиотека агентов определяет один класс, управляющий сообщениями: Concurrency::ordered_message_processor. Класс ordered_message_processor позволяет блокам сообщений обрабатывать сообщения в порядке получения.

Чтобы лучше понять, как блоки сообщений соотносятся со своими исходными и целевыми блоками, рассмотрим следующий пример. В этом примере показано объявление класса Concurrency::transformer.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Класс transformer наследуется от класса propagator_block, поэтому он действует и как исходный, и как целевой блок. Он принимает сообщения типа _Input и отправляет сообщения типа _Output. Класс transformer задает single_link_registry как средство управления связями для любых целевых блоков, а multi_link_registry — как средство управления для любых исходных блоков. Поэтому объект transformer может иметь не более одного целевого блока и любое количество исходных блоков.

Класс, наследуемый от source_block, должен реализовывать шесть методов: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message и resume_propagation. Класс, наследуемый от target_block, должен реализовывать метод propagate_message, а также может реализовывать метод send_message (необязательно). Наследование от propagator_block эквивалентно по функции наследованию одновременно от source_block и target_block.

Метод propagate_to_any_targets вызывается средой выполнения, чтобы асинхронно или синхронно обрабатывать входящие сообщения и распространять исходящие сообщения. Метод accept_message вызывается целевыми блоками, чтобы принимать сообщения. Многие типы блоков сообщений, например unbounded_buffer, отправляют сообщения только первому целевому блоку, который готов их принимать. Поэтому он передает владение сообщением целевому блоку. Другие типы блоков сообщений, например Concurrency::overwrite_buffer, предлагают сообщения всем своим целевым блокам. Поэтому overwrite_buffer создает копию сообщения для всех целевых блоков.

Методы reserve_message, consume_message, release_message и resume_propagation позволяют блокам сообщений принимать участие в резервировании сообщений. Целевые блоки вызывают метод reserve_message, если они получают сообщение, которое нужно зарезервировать для использования впоследствии. После резервирования сообщения целевой блок может вызвать метод consume_message, чтобы употребить это сообщение, или метод release_message, чтобы отменить резервирование. Как и в случае метода accept_message, реализация consume_message может передать владение сообщением или вернуть копию сообщения. После того, как целевой блок употребит или высвободит зарезервированное сообщение, среда выполнения вызывает метод resume_propagation. Обычно этот метод продолжает распространение сообщения, начиная со следующего сообщения в очереди.

Среда выполнения вызывает метод propagate_message, чтобы асинхронно передать сообщение текущему блоку от другого блока. Метод send_message похож на метод propagate_message, но он отправляет сообщения целевым блокам синхронно (не асинхронно). Реализация метода send_message по умолчанию отклоняет все входящие сообщения. Среда выполнения не вызывает эти методы, если сообщение не проходит необязательную функцию фильтрации, связанную с целевым блоком. Дополнительные сведения об фильтрах сообщений см. в разделе Асинхронные блоки сообщений.

[в начало]

Определение класса priority_buffer

Класс priority_buffer — это пользовательский тип блоков сообщений, сортирующий входящие сообщения сначала по приоритету, а затем — по порядку получения. Класс priority_buffer похож на класс Concurrency::unbounded_buffer, так как он содержит очередь сообщений, действует одновременно как исходный и целевой блок сообщений и может иметь несколько исходных и целевых объектов. Однако класс unbounded_buffer распространяет сообщения только в порядке получения от исходных объектов.

Класс priority_buffer получает сообщения типа std::tuple, содержащие элементы PriorityType and Type. PriorityType относится к типу, представляющему приоритет сообщения; Type относится к части сообщения, содержащей данные. Класс priority_buffer отправляет сообщения типа Type. Класс priority_buffer также управляет двумя очередями сообщений: объектом std::priority_queue для входящих сообщений и объектом std::queue для исходящих сообщений. Сортировка сообщений по приоритету полезна, когда объект priority_buffer принимает одновременно несколько сообщений или когда он принимает несколько сообщений до того, как сообщения будут прочитаны получателями.

Кроме семи методов, которые должен реализовывать класс, наследуемый от класса propagator_block, класс priority_buffer также переопределяет методы link_target_notification и send_message. Класс priority_buffer также определяет два открытых вспомогательных метода, enqueue и dequeue, и закрытый вспомогательный метод propagate_priority_order.

В описанной ниже процедуре реализуется класс priority_buffer.

Определение класса priority_buffer

  1. Создайте файл заголовка C++ и присвойте ему имя priority_buffer.h. Можно использовать существующий файл заголовка, входящий в проект.

  2. Добавьте следующий код в файл priority_buffer.h.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. В пространстве имен std определите специализации структур std::less и std::greater, работающие с объектами Concurrency::message.

    namespace std 
    {
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
    {  
       typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator< to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) < get<0>(right->payload));
       }
    };
    
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
    {  
       typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator> to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) > get<0>(right->payload));
       }
    };
    
    }
    

    Класс priority_buffer сохраняет объекты message в объекте priority_queue. Эти специализации типов позволяют очереди приоритета сортировать сообщения по приоритету. Приоритет — первый элемент объекта tuple.

  4. В пространстве имен Concurrency объявите класс priority_buffer.

    namespace Concurrency 
    {
    template<class Type, 
             typename PriorityType = int,
             typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : 
       public propagator_block<multi_link_registry<ITarget<Type>>,
                               multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
    protected:
    private:
    };
    }
    

    Класс priority_buffer наследуется от propagator_block. Поэтому он может и отправлять, и принимать сообщения. Класс priority_buffer может иметь несколько целевых объектов, принимающих сообщения типа Type. Он также может иметь несколько исходных объектов, отправляющих сообщения типа tuple<PriorityType, Type>.

  5. Добавьте следующие переменные-члены в раздел private класса priority_buffer.

    // Stores incoming messages. 
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
       message<_Source_type>*, 
       std::vector<message<_Source_type>*>, 
       Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<message<_Target_type>*> _output_messages;
    

    Объект priority_queue содержит входящие сообщения; объект queue содержит исходящие сообщения. Объект priority_buffer может одновременно принимать несколько сообщений; объект critical_section синхронизирует доступ к очереди входящих сообщений.

  6. Определите конструктор копий и оператор назначения в разделе private. Это не позволяет назначать объекты priority_queue.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. В разделе public определите конструкторы, являющиеся общими для различных типов блоков сообщений. Также определите деструктор.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {       
       initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
       initialize_source_and_target();
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler)
    {
       initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler, filter_method const& filter) 
    {
       initialize_source_and_target(&scheduler);
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group)
    {
       initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
    {
       initialize_source_and_target(NULL, &schedule_group);
       register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
       // Remove all links.
       remove_network_links();
    }
    
  8. В разделе public определите методы enqueue и dequeue. Эти вспомогательные методы предоставляют дополнительный способ отправлять сообщения объекту priority_buffer и принимать от него сообщения.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
      return Concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
      return receive<Type>(this);
    }
    
  9. В разделе protected определите метод propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(message<_Target_type>*)
    {
       // Retrieve the message from the front of the input queue.
       message<_Source_type>* input_message = NULL;
       {
          critical_section::scoped_lock lock(_input_lock);
          if (_input_messages.size() > 0)
          {
             input_message = _input_messages.top();
             _input_messages.pop();
          }
       }
    
       // Move the message to the output queue.
       if (input_message != NULL)
       {
          // The payload of the output message does not contain the 
          // priority of the message.
          message<_Target_type>* output_message = 
             new message<_Target_type>(get<1>(input_message->payload));
          _output_messages.push(output_message);
    
          // Free the memory for the input message.
          delete input_message;
    
          // Do not propagate messages if the new message is not the head message.
          // In this case, the head message is reserved by another message block.
          if (_output_messages.front()->msg_id() != output_message->msg_id())
          {
             return;
          }
       }
    
       // Propagate out the output messages.
       propagate_priority_order();
    }
    

    Метод propagate_to_any_targets передает сообщение, находящееся первым во входящей очереди, в исходящую очередь и распространяет все сообщения в исходящей очереди.

  10. В разделе protected определите метод accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
    {        
       message<_Target_type>* message = NULL;
    
       // Transfer ownership if the provided message identifier matches
       // the identifier of the front of the output message queue.
       if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
       {
          message = _output_messages.front();            
          _output_messages.pop();
       }
    
       return message;
    }
    

    Когда целевой блок вызывает метод accept_message, класс priority_buffer передает владение сообщением первому целевому блоку, принимающему его. (Это похоже на поведение класса unbounded_buffer).

  11. В разделе protected определите метод reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(runtime_object_identity msg_id)
    {
       // Allow the message to be reserved if the provided message identifier
       // is the message identifier of the front of the message queue.
       return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
    }
    

    Класс priority_buffer позволяет целевому блоку резервировать сообщение, если предоставленный идентификатор сообщения совпадает с идентификатором первого в очереди сообщения. Иными словами, целевой блок может зарезервировать сообщение, если объект priority_buffer еще не получил следующее сообщение и не распространил текущее.

  12. В разделе protected определите метод consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual message<Type>* consume_message(runtime_object_identity msg_id)
    {
       // Transfer ownership of the message to the caller.
       return accept_message(msg_id);
    }
    

    Целевой блок вызывает метод consume_message, чтобы передать владение зарезервированным сообщением.

  13. В разделе protected определите метод release_message.

    // Releases a previous message reservation.
    virtual void release_message(runtime_object_identity msg_id)
    {
       // The head message must be the one that is reserved. 
       if (_output_messages.empty() || 
           _output_messages.front()->msg_id() != msg_id)
       {
          throw message_not_found();
       }
    }
    

    Целевой блок вызывает метод release_message, чтобы отменить резервирование сообщения.

  14. В разделе protected определите метод resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
       // Propagate out any messages in the output queue.
       if (_output_messages.size() > 0)
       {
          async_send(NULL);
       }
    }
    

    После того, как целевой блок употребит или высвободит зарезервированное сообщение, среда выполнения вызывает метод resume_propagation. Этот метод распространяет все сообщения в исходящей очереди.

  15. В разделе protected определите метод link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(ITarget<_Target_type>*)
    {
       // Do not propagate messages if a target block reserves
       // the message at the front of the queue.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out any messages that are in the output queue.
       propagate_priority_order();
    }
    

    Переменная-член _M_pReservedFor определяется базовым классом source_block. Эта переменная-член указывает на целевой блок, если он существует, осуществляющий резервирование первого в исходящей очереди сообщения. Когда с объектом priority_buffer связывается новый целевой объект, среда выполнения вызывает метод link_target_notification. Этот метод распространяет все сообщения в исходящей очереди, если нет целевых объектов, осуществляющих резервирование.

  16. В разделе private определите метод propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
       // Cancel propagation if another block reserves the head message.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out all output messages. 
       // Because this block preserves message ordering, stop propagation
       // if any of the messages are not accepted by a target block.
       while (!_output_messages.empty())
       {
          // Get the next message.
          message<_Target_type> * message = _output_messages.front();
    
          message_status status = declined;
    
          // Traverse each target in the order in which they are connected.
          for (target_iterator iter = _M_connectedTargets.begin(); 
               *iter != NULL; 
               ++iter)
          {
             // Propagate the message to the target.
             ITarget<_Target_type>* target = *iter;
             status = target->propagate(message, this);
    
             // If the target accepts the message then ownership of message has 
             // changed. Do not propagate this message to any other target.
             if (status == accepted)
             {
                break;
             }
    
             // If the target only reserved this message, we must wait until the 
             // target accepts the message.
             if (_M_pReservedFor != NULL)
             {
                break;
             }
          }
    
          // If status is anything other than accepted, then the head message
          // was not propagated out. To preserve the order in which output 
          // messages are propagated, we must stop propagation until the head 
          // message is accepted.
          if (status != accepted)
          {
              break;
          }          
       }
    }
    

    Этот метод распространяет все сообщения в исходящей очереди. Все сообщения в очереди предлагаются всем целевым блокам, пока один из них не примет сообщение. Класс priority_buffer сохраняет порядок исходящих сообщений. Поэтому первое сообщение в исходящей очереди должно быть принято целевым блоком до того, как этот метод предложит любое другое сообщение целевым блокам.

  17. В разделе protected определите метод propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual message_status propagate_message(message<_Source_type>* message, 
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Asynchronously send the message to the target blocks.
          async_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    Метод propagate_message позволяет классу priority_buffer служить приемником сообщений или целевым объектом. Этот метод принимает сообщение, предложенное предоставленным исходным блоком, и вставляет его в очередь приоритета. Затем метод propagate_message асинхронно отправляет все исходящие сообщения целевым блокам.

    Среда выполнения вызывает этот метод при вызове функции Concurrency::asend или когда блок сообщений соединяется с другими блоками сообщений.

  18. В разделе protected определите метод send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual message_status send_message(message<_Source_type>* message,
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Synchronously send the message to the target blocks.
          sync_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    Метод send_message похож на метод propagate_message. Однако он отправляет исходящие сообщения синхронно (не асинхронно).

    Среда выполнения вызывает этот метод во время операции синхронной отправки, например при вызове функции Concurrency::send.

Класс priority_buffer содержит перегрузки конструкторов, обычные для многих типов блоков сообщений. Некоторые перегрузки конструкторов принимают объекты Concurrency::Scheduler или Concurrency::ScheduleGroup, позволяющие определенному планировщику заданий управлять блоком сообщений. Другие перегрузки конструкторов принимают функцию фильтрации. Функции фильтрации позволяют блокам сообщений принимать или отклонять сообщения в зависимости от их полезной нагрузки. Дополнительные сведения об фильтрах сообщений см. в разделе Асинхронные блоки сообщений. Дополнительные сведения о планировщиках заданий см. в разделе Планировщик задач (среда выполнения с параллелизмом).

Так как класс priority_buffer сортирует сообщения сначала по приоритету, а затем — по порядку получения, этот класс особенно полезен при асинхронном получении сообщений, например при вызове функции Concurrency::asend или при соединении блока сообщений с другими блоками сообщений.

[в начало]

Полный код примера

В следующем примере показано полное определение класса priority_buffer.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
// A specialization of less that tests whether the priority element of a 
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
{  
   typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator< to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) < get<0>(right->payload));
   }
};

// A specialization of less that tests whether the priority element of a 
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
{  
   typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator> to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) > get<0>(right->payload));
   }
};

}

namespace Concurrency 
{
// A message block type that orders incoming messages first by priority, 
// and then by the order in which messages are received. 
template<class Type, 
         typename PriorityType = int,
         typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : 
   public propagator_block<multi_link_registry<ITarget<Type>>,
                           multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{  
public:
   // Constructs a priority_buffer message block.
   priority_buffer() 
   {       
      initialize_source_and_target();
   }

   // Constructs a priority_buffer message block with the given filter function.
   priority_buffer(filter_method const& filter)
   {
      initialize_source_and_target();
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler)
   {
      initialize_source_and_target(&scheduler);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler, filter_method const& filter) 
   {
      initialize_source_and_target(&scheduler);
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group)
   {
      initialize_source_and_target(NULL, &schedule_group);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
   {
      initialize_source_and_target(NULL, &schedule_group);
      register_filter(filter);
   }

   // Destroys the message block.
   ~priority_buffer()
   {
      // Remove all links.
      remove_network_links();
   }

   // Sends an item to the message block.
   bool enqueue(Type const& item)
   {
     return Concurrency::asend<Type>(this, item);
   }

   // Receives an item from the message block.
   Type dequeue()
   {
     return receive<Type>(this);
   }

protected:
   // Asynchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::propagate.
   virtual message_status propagate_message(message<_Source_type>* message, 
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Asynchronously send the message to the target blocks.
         async_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Synchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::send.
   virtual message_status send_message(message<_Source_type>* message,
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Synchronously send the message to the target blocks.
         sync_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Accepts a message that was offered by this block by transferring ownership
   // to the caller.
   virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
   {        
      message<_Target_type>* message = NULL;

      // Transfer ownership if the provided message identifier matches
      // the identifier of the front of the output message queue.
      if (!_output_messages.empty() && 
           _output_messages.front()->msg_id() == msg_id)
      {
         message = _output_messages.front();            
         _output_messages.pop();
      }

      return message;
   }

   // Reserves a message that was previously offered by this block.
   virtual bool reserve_message(runtime_object_identity msg_id)
   {
      // Allow the message to be reserved if the provided message identifier
      // is the message identifier of the front of the message queue.
      return (!_output_messages.empty() && 
               _output_messages.front()->msg_id() == msg_id);
   }

   // Transfers the message that was previously offered by this block 
   // to the caller. The caller of this method is the target block that 
   // reserved the message.
   virtual message<Type>* consume_message(runtime_object_identity msg_id)
   {
      // Transfer ownership of the message to the caller.
      return accept_message(msg_id);
   }

   // Releases a previous message reservation.
   virtual void release_message(runtime_object_identity msg_id)
   {
      // The head message must be the one that is reserved. 
      if (_output_messages.empty() || 
          _output_messages.front()->msg_id() != msg_id)
      {
         throw message_not_found();
      }
   }

   // Resumes propagation after a reservation has been released.
   virtual void resume_propagation()
   {
      // Propagate out any messages in the output queue.
      if (_output_messages.size() > 0)
      {
         async_send(NULL);
      }
   }

   // Notifies this block that a new target has been linked to it.
   virtual void link_target_notification(ITarget<_Target_type>*)
   {
      // Do not propagate messages if a target block reserves
      // the message at the front of the queue.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out any messages that are in the output queue.
      propagate_priority_order();
   }

   // Transfers the message at the front of the input queue to the output queue
   // and propagates out all messages in the output queue.
   virtual void propagate_to_any_targets(message<_Target_type>*)
   {
      // Retrieve the message from the front of the input queue.
      message<_Source_type>* input_message = NULL;
      {
         critical_section::scoped_lock lock(_input_lock);
         if (_input_messages.size() > 0)
         {
            input_message = _input_messages.top();
            _input_messages.pop();
         }
      }

      // Move the message to the output queue.
      if (input_message != NULL)
      {
         // The payload of the output message does not contain the 
         // priority of the message.
         message<_Target_type>* output_message = 
            new message<_Target_type>(get<1>(input_message->payload));
         _output_messages.push(output_message);

         // Free the memory for the input message.
         delete input_message;

         // Do not propagate messages if the new message is not the head message.
         // In this case, the head message is reserved by another message block.
         if (_output_messages.front()->msg_id() != output_message->msg_id())
         {
            return;
         }
      }

      // Propagate out the output messages.
      propagate_priority_order();
   }

private:

   // Propagates messages in priority order.
   void propagate_priority_order()
   {
      // Cancel propagation if another block reserves the head message.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out all output messages. 
      // Because this block preserves message ordering, stop propagation
      // if any of the messages are not accepted by a target block.
      while (!_output_messages.empty())
      {
         // Get the next message.
         message<_Target_type> * message = _output_messages.front();

         message_status status = declined;

         // Traverse each target in the order in which they are connected.
         for (target_iterator iter = _M_connectedTargets.begin(); 
              *iter != NULL; 
              ++iter)
         {
            // Propagate the message to the target.
            ITarget<_Target_type>* target = *iter;
            status = target->propagate(message, this);

            // If the target accepts the message then ownership of message has 
            // changed. Do not propagate this message to any other target.
            if (status == accepted)
            {
               break;
            }

            // If the target only reserved this message, we must wait until the 
            // target accepts the message.
            if (_M_pReservedFor != NULL)
            {
               break;
            }
         }

         // If status is anything other than accepted, then the head message
         // was not propagated out. To preserve the order in which output 
         // messages are propagated, we must stop propagation until the head 
         // message is accepted.
         if (status != accepted)
         {
             break;
         }          
      }
   }

private:

   // Stores incoming messages. 
   // The type parameter Pr specifies how to order messages by priority.
   std::priority_queue<
      message<_Source_type>*, 
      std::vector<message<_Source_type>*>, 
      Pr
   > _input_messages;

   // Synchronizes access to the input message queue.
   critical_section _input_lock;

   // Stores outgoing messages.
   std::queue<message<_Target_type>*> _output_messages;

private:
   // Hide assignment operator and copy constructor.
   priority_buffer const &operator =(priority_buffer const&);
   priority_buffer(priority_buffer const &);
};

}

В следующем примере параллельно выполняется несколько операций asend и Concurrency::receive с объектом priority_buffer.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

В данном примере получается следующий результат.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Класс priority_buffer сортирует сообщения сначала по приоритету, а затем — по порядку получения. В этом примере сообщения с более высоким числовым приоритетом попадают в начало очереди.

[в начало]

Компиляция кода

Скопируйте код примера и вставьте его в проект Visual Studio или скопируйте определение класса priority_buffer в файл с именем priority_buffer.h, а тестовую программу — в файл с именем priority_buffer.cpp, а затем выполните в окне командной строки Visual Studio 2010 следующую команду.

cl.exe /EHsc priority_buffer.cpp

См. также

Основные понятия

Пошаговые руководства по среде выполнения с параллелизмом

Асинхронные блоки сообщений

Функции передачи сообщений