チュートリアル: カスタム メッセージ ブロックの作成

ここでは、受信メッセージを優先順位に従って並べるカスタム メッセージ ブロックの型を作成する方法について説明します。

組み込みのメッセージ ブロックの型には幅広い機能が備わっていますが、独自のメッセージ ブロックの型を作成して、アプリケーションの要件を満たすようにカスタマイズすることもできます。 非同期エージェント ライブラリに用意されている組み込みのメッセージ ブロックの型については、「非同期メッセージ ブロック」を参照してください。

必須コンポーネント

このチュートリアルを開始する前に、次のドキュメントを参照してください。

セクション

このチュートリアルは、次のセクションで構成されています。

  • カスタム メッセージ ブロックのデザイン

  • priority_buffer クラスの定義

  • 完全な例

カスタム メッセージ ブロックのデザイン

メッセージ ブロックは、メッセージの送受信処理に参加します。 メッセージを送信するメッセージ ブロックはソース ブロックと呼ばれます。 メッセージを受信するメッセージ ブロックはターゲット ブロックと呼ばれます。 メッセージを送受信するメッセージ ブロックは伝達子ブロックと呼ばれます。 エージェント ライブラリは、Concurrency::ISource 抽象クラスを使用してソース ブロックを表し、Concurrency::ITarget 抽象クラスを使用してターゲット ブロックを表します。 ソースとして機能するメッセージ ブロックの型は ISource から派生します。ターゲットとして機能するメッセージ ブロックの型は ITarget から派生します。

メッセージ ブロックの型は ISource および ITarget から直接派生させることもできますが、エージェント ライブラリには、メッセージ ブロックのすべての型に共通の大部分の機能を実行する 3 つの基本クラスが定義されています。これらの基本クラスによって、エラーの処理やメッセージ ブロックの接続などの操作が同時実行セーフに行われます。 Concurrency::source_block クラスは ISource から派生し、メッセージを他のブロックに送信します。 Concurrency::target_block クラスは ITarget から派生し、他のブロックからメッセージを受信します。 Concurrency::propagator_block クラスは ISource および ITarget から派生し、他のブロックとの間でメッセージを送受信します。 メッセージ ブロックの動作に焦点を合わせることができるように、インフラストラクチャの細部の処理にはこれらの 3 つの基本クラスを使用することをお勧めします。

source_blocktarget_block、および propagator_block の各クラスはテンプレートであり、ソース ブロックとターゲット ブロック間の接続 (リンク) を管理する型、およびメッセージの処理方法を管理する型でパラメーター化されます。 エージェント ライブラリには、リンクの管理を行う 2 つの型 Concurrency::single_link_registry および Concurrency::multi_link_registry が定義されています。 single_link_registry クラスは、メッセージ ブロックを 1 つのソースまたは 1 つのターゲットにリンクできるようにします。 multi_link_registry クラスは、メッセージ ブロックを複数のソースまたは複数のターゲットにリンクできるようにします。 エージェント ライブラリには、メッセージの管理を行う 1 つのクラス Concurrency::ordered_message_processor が定義されています。 ordered_message_processor クラスは、メッセージ ブロックでメッセージを受信順に処理できるようにします。

メッセージ ブロックとソースおよびターゲットとの相互関係をより深く理解できるように、次の例を考えてみてください。 この例は、Concurrency::transformer クラスの宣言を示しています。

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer クラスは propagator_block から派生するため、ソース ブロックとしてもターゲット ブロックとしても機能します。 _Input 型のメッセージを受け入れ、_Output 型のメッセージを送信します。 transformer クラスは、single_link_registry をターゲット ブロックのリンク マネージャーとして指定し、multi_link_registry をソース ブロックのリンク マネージャーとして指定します。 したがって、transformer オブジェクトで許容されるターゲットは 1 つだけですが、ソースの数に制限はありません。

source_block から派生するクラスは、propagate_to_any_targetsaccept_messagereserve_messageconsume_messagerelease_message、および resume_propagation の 6 つのメソッドを実装する必要があります。 target_block から派生するクラスは、propagate_message メソッドを実装する必要があり、必要に応じて send_message メソッドを実装できます。 propagator_block からの派生は、source_block および target_block からの派生と機能的には同等です。

propagate_to_any_targets メソッドは、受信メッセージを非同期的または同期的に処理し、送信メッセージを伝達するためにランタイムによって呼び出されます。 accept_message メソッドは、メッセージを受け入れるためにターゲット ブロックによって呼び出されます。 unbounded_buffer などのメッセージ ブロックの型の多くは、最初にメッセージを受信するターゲットにのみメッセージを送信します。 したがって、メッセージの所有権はそのターゲットに譲渡されます。 Concurrency::overwrite_buffer などの他のメッセージ ブロックの型は、メッセージを各ターゲット ブロックに提供します。 したがって、overwrite_buffer は各ターゲット用にメッセージのコピーを作成します。

reserve_messageconsume_messagerelease_message、および resume_propagation メソッドは、メッセージ ブロックがメッセージの予約に参加できるようにします。 ターゲット ブロックは、提供されたメッセージを予約して後で使用できるようにする場合に、reserve_message メソッドを呼び出します。 メッセージを受信したターゲット ブロックは、consume_message メソッドを呼び出してそのメッセージを処理するか、release_message メソッドを呼び出して予約を取り消すことができます。 accept_message メソッドと同様に、consume_message の実装では、メッセージの所有権を譲渡するか、メッセージのコピーを返すことができます。 ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは resume_propagation メソッドを呼び出します。 通常、このメソッドは、キュー内の次のメッセージからメッセージ伝達を続行します。

ランタイムは、propagate_message メソッドを呼び出して、別のブロックから現在のブロックにメッセージを非同期的に転送します。 send_message メソッドは、非同期的にではなく同期的にメッセージをターゲット ブロックに送信する点を除いて、propagate_message と似ています。 send_message の既定の実装では、すべての受信メッセージが拒否されます。 ターゲット ブロックに関連付けられているオプションのフィルター関数をメッセージが通過しない場合、ランタイムはどちらのメソッドも呼び出しません。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」を参照してください。

[ページのトップへ]

priority_buffer クラスの定義

priority_buffer クラスは、受信メッセージを優先順位に従って並べてから、メッセージの受信順に並べるカスタム メッセージ ブロックの型です。 priority_buffer クラスは、メッセージのキューを保持するという点と、ソース メッセージ ブロックとしてもターゲット メッセージ ブロックとしても機能し、どちらの場合も複数のソースおよび複数のターゲットが許容されるという点で、Concurrency::unbounded_buffer クラスと似ています。 ただし、unbounded_buffer でのメッセージの伝達順は、必ずソースからのメッセージの受信順になります。

priority_buffer クラスは、PriorityType 要素と Type 要素を含む std::tuple 型のメッセージを受信します。 PriorityType は各メッセージの優先順位を保持する型を表し、Type はメッセージのデータ部分を表します。 priority_buffer クラスは、Type 型のメッセージを送信します。 priority_buffer クラスは、受信メッセージ用の std::priority_queue オブジェクトと送信メッセージ用の std::queue オブジェクトの 2 つのメッセージ キューの管理も行います。 priority_buffer オブジェクトが複数のメッセージを同時に受信する場合、またはコンシューマーがまだメッセージを読み取っていないときに複数のメッセージを受信する場合、メッセージを優先順位に従って並べ替えると便利です。

priority_buffer クラスでは、propagator_block の派生クラスで実装する必要のある 7 つのメソッドに加えて、link_target_notification メソッドと send_message メソッドもオーバーライドします。 priority_buffer クラスでは、2 つのパブリック ヘルパー メソッド (enqueue および dequeue) と 1 つのプライベート ヘルパー メソッド (propagate_priority_order) も定義します。

次の手順では、priority_buffer クラスを実装する方法について説明します。

priority_buffer クラスを定義するには

  1. C++ ヘッダー ファイルを作成し、priority_buffer.h という名前を付けます。 または、プロジェクトに含まれる既存のヘッダー ファイルを使用することもできます。

  2. priority_buffer.h に次のコードを追加します。

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std 名前空間で、std::less および std::greater の特殊化を定義して、Concurrency::message オブジェクトで動作するようにします。

    namespace std 
    {
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
    {  
       typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator< to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) < get<0>(right->payload));
       }
    };
    
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
    {  
       typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator> to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) > get<0>(right->payload));
       }
    };
    
    }
    

    priority_buffer クラスは、priority_queue オブジェクトに message オブジェクトを格納します。 このような型の特殊化によって、優先順位キューでメッセージが優先順位に従って並べ替えられるようになります。 優先順位は、tuple オブジェクトの最初の要素です。

  4. Concurrency 名前空間で、priority_buffer クラスを宣言します。

    namespace Concurrency 
    {
    template<class Type, 
             typename PriorityType = int,
             typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : 
       public propagator_block<multi_link_registry<ITarget<Type>>,
                               multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
    protected:
    private:
    };
    }
    

    priority_buffer クラスは propagator_block から派生したものです。 したがって、メッセージを送信することも受信することもできます。 priority_buffer クラスは、Type 型のメッセージを受信する複数のターゲットを持つことができます。 さらに、tuple<PriorityType, Type> 型のメッセージを送信する複数のソースを持つことができます。

  5. priority_buffer クラスの private セクションに、次のメンバー変数を追加します。

    // Stores incoming messages. 
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
       message<_Source_type>*, 
       std::vector<message<_Source_type>*>, 
       Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<message<_Target_type>*> _output_messages;
    

    priority_queue オブジェクトは受信メッセージを保持し、queue オブジェクトは送信メッセージを保持します。 priority_buffer オブジェクトは、複数のメッセージを同時に受信できます。critical_section オブジェクトは、入力メッセージのキューへのアクセスを同期します。

  6. private セクションで、コピー コンストラクターと代入演算子を定義します。 これにより、priority_queue オブジェクトが割り当て可能になるのを防ぎます。

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public セクションで、多くのメッセージ ブロックの型に共通のコンストラクターを定義します。 デストラクターも定義します。

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {       
       initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
       initialize_source_and_target();
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler)
    {
       initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler, filter_method const& filter) 
    {
       initialize_source_and_target(&scheduler);
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group)
    {
       initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
    {
       initialize_source_and_target(NULL, &schedule_group);
       register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
       // Remove all links.
       remove_network_links();
    }
    
  8. public セクションで、enqueue メソッドと dequeue メソッドを定義します。 これらのヘルパー メソッドによって、priority_buffer オブジェクトとの間でメッセージを送受信する別の手段が提供されます。

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
      return Concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
      return receive<Type>(this);
    }
    
  9. protected セクションで、propagate_to_any_targets メソッドを定義します。

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(message<_Target_type>*)
    {
       // Retrieve the message from the front of the input queue.
       message<_Source_type>* input_message = NULL;
       {
          critical_section::scoped_lock lock(_input_lock);
          if (_input_messages.size() > 0)
          {
             input_message = _input_messages.top();
             _input_messages.pop();
          }
       }
    
       // Move the message to the output queue.
       if (input_message != NULL)
       {
          // The payload of the output message does not contain the 
          // priority of the message.
          message<_Target_type>* output_message = 
             new message<_Target_type>(get<1>(input_message->payload));
          _output_messages.push(output_message);
    
          // Free the memory for the input message.
          delete input_message;
    
          // Do not propagate messages if the new message is not the head message.
          // In this case, the head message is reserved by another message block.
          if (_output_messages.front()->msg_id() != output_message->msg_id())
          {
             return;
          }
       }
    
       // Propagate out the output messages.
       propagate_priority_order();
    }
    

    propagate_to_any_targets メソッドは、入力キューの先頭にあるメッセージを出力キューに転送し、出力キュー内のすべてのメッセージを伝達します。

  10. protected セクションで、accept_message メソッドを定義します。

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
    {        
       message<_Target_type>* message = NULL;
    
       // Transfer ownership if the provided message identifier matches
       // the identifier of the front of the output message queue.
       if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
       {
          message = _output_messages.front();            
          _output_messages.pop();
       }
    
       return message;
    }
    

    ターゲット ブロックから accept_message メソッドが呼び出されたとき、priority_buffer クラスは、メッセージを最初に受け入れるターゲット ブロックにそのメッセージの所有権を譲渡します (この動作は unbounded_buffer の動作と似ています)。

  11. protected セクションで、reserve_message メソッドを定義します。

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(runtime_object_identity msg_id)
    {
       // Allow the message to be reserved if the provided message identifier
       // is the message identifier of the front of the message queue.
       return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
    }
    

    priority_buffer クラスは、指定されたメッセージ識別子とキューの先頭にあるメッセージの識別子が一致する場合に、ターゲット ブロックでメッセージを予約できるようにします。 つまり、priority_buffer オブジェクトがまだ追加のメッセージを受信しておらず、現在のメッセージも伝達していない場合、ターゲットはメッセージを予約できます。

  12. protected セクションで、consume_message メソッドを定義します。

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual message<Type>* consume_message(runtime_object_identity msg_id)
    {
       // Transfer ownership of the message to the caller.
       return accept_message(msg_id);
    }
    

    ターゲット ブロックは、予約したメッセージの所有権を譲渡するために consume_message を呼び出します。

  13. protected セクションで、release_message メソッドを定義します。

    // Releases a previous message reservation.
    virtual void release_message(runtime_object_identity msg_id)
    {
       // The head message must be the one that is reserved. 
       if (_output_messages.empty() || 
           _output_messages.front()->msg_id() != msg_id)
       {
          throw message_not_found();
       }
    }
    

    ターゲット ブロックは、メッセージの予約を取り消すために release_message を呼び出します。

  14. protected セクションで、resume_propagation メソッドを定義します。

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
       // Propagate out any messages in the output queue.
       if (_output_messages.size() > 0)
       {
          async_send(NULL);
       }
    }
    

    ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは resume_propagation を呼び出します。 このメソッドは、出力キュー内のすべてのメッセージを伝達します。

  15. protected セクションで、link_target_notification メソッドを定義します。

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(ITarget<_Target_type>*)
    {
       // Do not propagate messages if a target block reserves
       // the message at the front of the queue.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out any messages that are in the output queue.
       propagate_priority_order();
    }
    

    _M_pReservedFor メンバー変数は、source_block 基本クラスによって定義されます。 このメンバー変数は、出力キューの先頭にあるメッセージの予約を保持しているターゲット ブロック (存在する場合) を指します。 新しいターゲットが priority_buffer オブジェクトにリンクされると、ランタイムは link_target_notification を呼び出します。 このメソッドは、ターゲットが予約を保持していない場合に、出力キュー内のすべてのメッセージを伝達します。

  16. private セクションで、propagate_priority_order メソッドを定義します。

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
       // Cancel propagation if another block reserves the head message.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out all output messages. 
       // Because this block preserves message ordering, stop propagation
       // if any of the messages are not accepted by a target block.
       while (!_output_messages.empty())
       {
          // Get the next message.
          message<_Target_type> * message = _output_messages.front();
    
          message_status status = declined;
    
          // Traverse each target in the order in which they are connected.
          for (target_iterator iter = _M_connectedTargets.begin(); 
               *iter != NULL; 
               ++iter)
          {
             // Propagate the message to the target.
             ITarget<_Target_type>* target = *iter;
             status = target->propagate(message, this);
    
             // If the target accepts the message then ownership of message has 
             // changed. Do not propagate this message to any other target.
             if (status == accepted)
             {
                break;
             }
    
             // If the target only reserved this message, we must wait until the 
             // target accepts the message.
             if (_M_pReservedFor != NULL)
             {
                break;
             }
          }
    
          // If status is anything other than accepted, then the head message
          // was not propagated out. To preserve the order in which output 
          // messages are propagated, we must stop propagation until the head 
          // message is accepted.
          if (status != accepted)
          {
              break;
          }          
       }
    }
    

    このメソッドは、出力キュー内のすべてのメッセージを伝達します。 ターゲット ブロックの 1 つがメッセージを受け入れるまで、キュー内の各メッセージが各ターゲット ブロックに提供されます。 priority_buffer クラスは、送信メッセージの順序を保持しています。 そこで、このメソッドでは、出力キュー内の最初のメッセージがいずれかのターゲット ブロックによって受け入れられるまで、他のメッセージをターゲット ブロックに提供しないようにします。

  17. protected セクションで、propagate_message メソッドを定義します。

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual message_status propagate_message(message<_Source_type>* message, 
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Asynchronously send the message to the target blocks.
          async_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    propagate_message メソッドは、priority_buffer クラスがメッセージの受信側 (つまりターゲット) として機能するようにします。 このメソッドは、指定されたソース ブロックから提供されたメッセージを受信し、そのメッセージを優先順位キューに挿入します。 その後、propagate_message メソッドは、すべての出力メッセージをターゲット ブロックに非同期的に送信します。

    ランタイムは、Concurrency::asend 関数が呼び出されたとき、またはメッセージ ブロックが他のメッセージ ブロックに接続されたときに、このメソッドを呼び出します。

  18. protected セクションで、send_message メソッドを定義します。

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual message_status send_message(message<_Source_type>* message,
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Synchronously send the message to the target blocks.
          sync_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    send_message メソッドは propagate_message と似ています。 ただし、このメソッドは、非同期的にではなく同期的に出力メッセージを送信します。

    ランタイムは、Concurrency::send 関数が呼び出された場合など、同期送信操作中にこのメソッドを呼び出します。

priority_buffer クラスには、多くのメッセージ ブロックの型に共通のコンストラクター オーバーロードが含まれています。 一部のコンストラクター オーバーロードは、Concurrency::Scheduler オブジェクトまたは Concurrency::ScheduleGroup オブジェクトを受け取ります。これらのオブジェクトを使用すると、特定のタスク スケジューラでメッセージ ブロックを管理できます。 フィルター関数を受け取るコンストラクター オーバーロードもあります。 フィルター関数を使用すると、メッセージ ブロックでのメッセージの受け入れまたは拒否をメッセージ ペイロードに基づいて行うことができます。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」を参照してください。 タスク スケジューラの詳細については、「タスク スケジューラ (同時実行ランタイム)」を参照してください。

priority_buffer クラスはメッセージを優先順位に従って並べてから受信順に並べるため、Concurrency::asend 関数を呼び出した場合や、メッセージ ブロックが他のメッセージ ブロックに接続された場合など、メッセージを非同期的に受信する場合に非常に役立ちます。

[ページのトップへ]

完全な例

次の例では、priority_buffer クラスの完全な定義を示します。

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
// A specialization of less that tests whether the priority element of a 
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
{  
   typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator< to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) < get<0>(right->payload));
   }
};

// A specialization of less that tests whether the priority element of a 
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
{  
   typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator> to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) > get<0>(right->payload));
   }
};

}

namespace Concurrency 
{
// A message block type that orders incoming messages first by priority, 
// and then by the order in which messages are received. 
template<class Type, 
         typename PriorityType = int,
         typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : 
   public propagator_block<multi_link_registry<ITarget<Type>>,
                           multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{  
public:
   // Constructs a priority_buffer message block.
   priority_buffer() 
   {       
      initialize_source_and_target();
   }

   // Constructs a priority_buffer message block with the given filter function.
   priority_buffer(filter_method const& filter)
   {
      initialize_source_and_target();
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler)
   {
      initialize_source_and_target(&scheduler);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler, filter_method const& filter) 
   {
      initialize_source_and_target(&scheduler);
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group)
   {
      initialize_source_and_target(NULL, &schedule_group);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
   {
      initialize_source_and_target(NULL, &schedule_group);
      register_filter(filter);
   }

   // Destroys the message block.
   ~priority_buffer()
   {
      // Remove all links.
      remove_network_links();
   }

   // Sends an item to the message block.
   bool enqueue(Type const& item)
   {
     return Concurrency::asend<Type>(this, item);
   }

   // Receives an item from the message block.
   Type dequeue()
   {
     return receive<Type>(this);
   }

protected:
   // Asynchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::propagate.
   virtual message_status propagate_message(message<_Source_type>* message, 
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Asynchronously send the message to the target blocks.
         async_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Synchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::send.
   virtual message_status send_message(message<_Source_type>* message,
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Synchronously send the message to the target blocks.
         sync_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Accepts a message that was offered by this block by transferring ownership
   // to the caller.
   virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
   {        
      message<_Target_type>* message = NULL;

      // Transfer ownership if the provided message identifier matches
      // the identifier of the front of the output message queue.
      if (!_output_messages.empty() && 
           _output_messages.front()->msg_id() == msg_id)
      {
         message = _output_messages.front();            
         _output_messages.pop();
      }

      return message;
   }

   // Reserves a message that was previously offered by this block.
   virtual bool reserve_message(runtime_object_identity msg_id)
   {
      // Allow the message to be reserved if the provided message identifier
      // is the message identifier of the front of the message queue.
      return (!_output_messages.empty() && 
               _output_messages.front()->msg_id() == msg_id);
   }

   // Transfers the message that was previously offered by this block 
   // to the caller. The caller of this method is the target block that 
   // reserved the message.
   virtual message<Type>* consume_message(runtime_object_identity msg_id)
   {
      // Transfer ownership of the message to the caller.
      return accept_message(msg_id);
   }

   // Releases a previous message reservation.
   virtual void release_message(runtime_object_identity msg_id)
   {
      // The head message must be the one that is reserved. 
      if (_output_messages.empty() || 
          _output_messages.front()->msg_id() != msg_id)
      {
         throw message_not_found();
      }
   }

   // Resumes propagation after a reservation has been released.
   virtual void resume_propagation()
   {
      // Propagate out any messages in the output queue.
      if (_output_messages.size() > 0)
      {
         async_send(NULL);
      }
   }

   // Notifies this block that a new target has been linked to it.
   virtual void link_target_notification(ITarget<_Target_type>*)
   {
      // Do not propagate messages if a target block reserves
      // the message at the front of the queue.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out any messages that are in the output queue.
      propagate_priority_order();
   }

   // Transfers the message at the front of the input queue to the output queue
   // and propagates out all messages in the output queue.
   virtual void propagate_to_any_targets(message<_Target_type>*)
   {
      // Retrieve the message from the front of the input queue.
      message<_Source_type>* input_message = NULL;
      {
         critical_section::scoped_lock lock(_input_lock);
         if (_input_messages.size() > 0)
         {
            input_message = _input_messages.top();
            _input_messages.pop();
         }
      }

      // Move the message to the output queue.
      if (input_message != NULL)
      {
         // The payload of the output message does not contain the 
         // priority of the message.
         message<_Target_type>* output_message = 
            new message<_Target_type>(get<1>(input_message->payload));
         _output_messages.push(output_message);

         // Free the memory for the input message.
         delete input_message;

         // Do not propagate messages if the new message is not the head message.
         // In this case, the head message is reserved by another message block.
         if (_output_messages.front()->msg_id() != output_message->msg_id())
         {
            return;
         }
      }

      // Propagate out the output messages.
      propagate_priority_order();
   }

private:

   // Propagates messages in priority order.
   void propagate_priority_order()
   {
      // Cancel propagation if another block reserves the head message.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out all output messages. 
      // Because this block preserves message ordering, stop propagation
      // if any of the messages are not accepted by a target block.
      while (!_output_messages.empty())
      {
         // Get the next message.
         message<_Target_type> * message = _output_messages.front();

         message_status status = declined;

         // Traverse each target in the order in which they are connected.
         for (target_iterator iter = _M_connectedTargets.begin(); 
              *iter != NULL; 
              ++iter)
         {
            // Propagate the message to the target.
            ITarget<_Target_type>* target = *iter;
            status = target->propagate(message, this);

            // If the target accepts the message then ownership of message has 
            // changed. Do not propagate this message to any other target.
            if (status == accepted)
            {
               break;
            }

            // If the target only reserved this message, we must wait until the 
            // target accepts the message.
            if (_M_pReservedFor != NULL)
            {
               break;
            }
         }

         // If status is anything other than accepted, then the head message
         // was not propagated out. To preserve the order in which output 
         // messages are propagated, we must stop propagation until the head 
         // message is accepted.
         if (status != accepted)
         {
             break;
         }          
      }
   }

private:

   // Stores incoming messages. 
   // The type parameter Pr specifies how to order messages by priority.
   std::priority_queue<
      message<_Source_type>*, 
      std::vector<message<_Source_type>*>, 
      Pr
   > _input_messages;

   // Synchronizes access to the input message queue.
   critical_section _input_lock;

   // Stores outgoing messages.
   std::queue<message<_Target_type>*> _output_messages;

private:
   // Hide assignment operator and copy constructor.
   priority_buffer const &operator =(priority_buffer const&);
   priority_buffer(priority_buffer const &);
};

}

次の例では、priority_buffer オブジェクトに対して多くの asend 操作と Concurrency::receive 操作を同時に実行します。

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

この例では、次のサンプル出力が生成されます。

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer クラスは、メッセージを優先順位に従って並べてから、メッセージの受信順に並べます。 この例では、優先順位を示す数値が大きいメッセージほど、キューの先頭近くに挿入されています。

[ページのトップへ]

コードのコンパイル

プログラム例をコピーし、Visual Studio プロジェクトに貼り付けるか、priority_buffer クラスの定義を priority_buffer.h という名前のファイルに、テスト プログラムを priority_buffer.cpp という名前のファイルにそれぞれ貼り付け、Visual Studio 2010 のコマンド プロンプト ウィンドウで次のコマンドを実行します。

cl.exe /EHsc priority_buffer.cpp

参照

概念

非同期メッセージ ブロック

メッセージ パッシング関数

その他の技術情報

同時実行ランタイムのチュートリアル