チュートリアル: エージェント ベースのアプリケーションの作成

ここでは、基本的なエージェント ベースのアプリケーションの作成方法について説明します。 このチュートリアルでは、テキスト ファイルから非同期的にデータを読み取るエージェントを作成できます。 このアプリケーションでは、Adler-32 チェックサム アルゴリズムを使用して、そのファイルの内容のチェックサムを計算します。

必須コンポーネント

このチュートリアルを完了するには、次のトピックを理解する必要があります。

セクション

このチュートリアルでは、次のタスクを実行する方法を示します。

  • コンソール アプリケーションの作成

  • file_reader クラスの作成

  • アプリケーションでの file_reader クラスの使用

コンソール アプリケーションの作成

ここでは、プログラムで使用されるヘッダー ファイルを参照する Visual C++ コンソール アプリケーションの作成方法について説明します。

Win32 コンソール アプリケーション ウィザードを使用して Visual C++ アプリケーションを作成するには

  1. [ファイル] メニューの [新規作成] をクリックし、[プロジェクト] をクリックして [新しいプロジェクト] ダイアログ ボックスを表示します。

  2. [新しいプロジェクト] ダイアログ ボックスで、[プロジェクトの種類] ペインの [Visual C++] ノードをクリックし、[テンプレート] ペインの [Win32 コンソール アプリケーション] をクリックします。 プロジェクトの名前 (BasicAgent など) を入力し、[OK] をクリックして、Win32 コンソール アプリケーション ウィザードを表示します。

  3. [Win32 コンソール アプリケーション ウィザード] ダイアログ ボックスで、[完了] をクリックします。

  4. stdafx.h に、次のコードを追加します。

    #include <agents.h>
    #include <string>
    #include <iostream>
    #include <algorithm>
    

    agents.h ヘッダー ファイルには、Concurrency::agent クラスの機能が含まれています。

  5. アプリケーションをビルドして実行することにより、アプリケーションが正常に作成されたことを確認します。 ソリューションをビルドするには、[ビルド] メニューの [ソリューションのビルド] をクリックします。 アプリケーションが正常にビルドされたら、[デバッグ] メニューの [デバッグ開始] をクリックして、アプリケーションを実行します。

[ページのトップへ]

file_reader クラスの作成

ここでは、file_reader クラスの作成方法について説明します。 ランタイムは、各エージェントがそれぞれのコンテキストで処理を実行するようにスケジュールを設定します。 そのため、処理を同期的に実行する一方で、他のコンポーネントとは非同期的に通信するエージェントを作成できます。 file_reader クラスでは、指定された入力ファイルからデータを読み取り、そのファイルのデータを指定されたターゲット コンポーネントに送信します。

file_reader クラスを作成するには

  1. 新しい C++ ヘッダー ファイルをプロジェクトに追加します。 これを行うには、ソリューション エクスプローラー[ヘッダー ファイル] ノードを右クリックし、[追加] をクリックして、[新しい項目] をクリックします。 [テンプレート] ペインの [ヘッダー ファイル (.h)] をクリックします。 [新しい項目の追加] ダイアログ ボックスの [ファイル名] ボックスに「file_reader.h」と入力し、[追加] をクリックします。

  2. file_reader.h に、次のコードを追加します。

    #pragma once
    
  3. file_reader.h に、agent から派生する file_reader という名前のクラスを作成します。

    class file_reader : public Concurrency::agent
    {
    public:
    protected:
    private:
    };
    
  4. クラスの private セクションに次のデータ メンバーを追加します。

    std::string _file_name;
    Concurrency::ITarget<std::string>& _target;
    Concurrency::overwrite_buffer<std::exception> _error;
    

    _file_name メンバーは、エージェントが読み取る対象のファイル名です。 _target メンバーは、エージェントがファイルの内容を書き込む Concurrency::ITarget オブジェクトです。 _error メンバーでは、エージェントの有効期間中に発生したエラーを保持します。

  5. file_reader クラスの public セクションに file_reader コンストラクターの次のコードを追加します。

    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target)
       : _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target,
       Concurrency::Scheduler& scheduler)
       : agent(scheduler)
       , _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target,
       Concurrency::ScheduleGroup& group)
       : agent(group) 
       , _file_name(file_name)
       , _target(target)
    {
    }
    

    各コンストラクター オーバーロードによって、file_reader データ メンバーが設定されます。 2 番目と 3 番目のコンストラクター オーバーロードによって、アプリケーションでエージェントに対して特定のスケジューラを使用できるようにします。 最初のオーバーロードでは、エージェントに対して既定のスケジューラを使用します。

  6. file_reader クラスのパブリック セクションに get_error メソッドを追加します。

    bool get_error(std::exception& e)
    {
       return try_receive(_error, e);
    }
    

    get_error メソッドにより、エージェントの有効期間中に発生したエラーを取得します。

  7. クラスの protected セクションで、Concurrency::agent::run メソッドを実装します。

    void run()
    {
       FILE* stream;
       try
       {
          // Open the file.
          if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
          {
             // Throw an exception if an error occurs.            
             throw std::exception("Failed to open input file.");
          }
    
          // Create a buffer to hold file data.
          char buf[1024];
    
          // Set the buffer size.
          setvbuf(stream, buf, _IOFBF, sizeof buf);
    
          // Read the contents of the file and send the contents
          // to the target.
          while (fgets(buf, sizeof buf, stream))
          {
             asend(_target, std::string(buf));
          }   
    
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Close the file.
          fclose(stream);
       }
       catch (const std::exception& e)
       {
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Write the exception to the error buffer.
          send(_error, e);
       }
    
       // Set the status of the agent to agent_done.
       done();
    }
    

    run メソッドによりファイルを開き、そこからデータを読み取ります。 run メソッドでは、例外処理を使用して、ファイルの処理中に発生したエラーをキャプチャします。

    このメソッドでは、ファイルからデータを読み取るたびに Concurrency::asend 関数を呼び出し、そのデータをターゲット バッファーに送信します。 処理の終了を示す際には、空の文字列をターゲット バッファーに送信します。

file_reader.h の内容全体の例を次に示します。

#pragma once

class file_reader : public Concurrency::agent
{
public:
   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target)
      : _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target,
      Concurrency::Scheduler& scheduler)
      : agent(scheduler)
      , _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target,
      Concurrency::ScheduleGroup& group)
      : agent(group) 
      , _file_name(file_name)
      , _target(target)
   {
   }

   // Retrieves any error that occurs during the life of the agent.
   bool get_error(std::exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   void run()
   {
      FILE* stream;
      try
      {
         // Open the file.
         if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
         {
            // Throw an exception if an error occurs.            
            throw std::exception("Failed to open input file.");
         }

         // Create a buffer to hold file data.
         char buf[1024];

         // Set the buffer size.
         setvbuf(stream, buf, _IOFBF, sizeof buf);

         // Read the contents of the file and send the contents
         // to the target.
         while (fgets(buf, sizeof buf, stream))
         {
            asend(_target, std::string(buf));
         }   

         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Close the file.
         fclose(stream);
      }
      catch (const std::exception& e)
      {
         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Write the exception to the error buffer.
         send(_error, e);
      }

      // Set the status of the agent to agent_done.
      done();
   }

private:
   std::string _file_name;
   Concurrency::ITarget<std::string>& _target;
   Concurrency::overwrite_buffer<std::exception> _error;
};

[ページのトップへ]

アプリケーションでの file_reader クラスの使用

ここでは、file_reader クラスを使用して、テキスト ファイルの内容を読み取る方法について説明します。 また、このファイル データを受け取り、その Adler-32 チェックサムを計算する Concurrency::call オブジェクトの作成方法についても説明します。

アプリケーションで file_reader クラスを使用するには

  1. BasicAgent.cpp に、次の #include ステートメントを追加します。

    #include "file_reader.h"
    
  2. BasicAgent.cpp に、次の using ディレクティブを追加します。

    using namespace Concurrency;
    using namespace std;
    
  3. _tmain 関数に、処理の終了を通知する Concurrency::event オブジェクトを作成します。

    event e;
    
  4. データを受け取ったときにチェックサムを更新する call オブジェクトを作成します。

    // The components of the Adler-32 sum.
    unsigned int a = 1;
    unsigned int b = 0;
    
    // A call object that updates the checksum when it receives data.
    call<string> calculate_checksum([&] (string s) {
       // If the input string is empty, set the event to signal
       // the end of processing.
       if (s.size() == 0)
          e.set();
       // Perform the Adler-32 checksum algorithm.
       for_each(s.begin(), s.end(), [&] (char c) {
          a = (a + c) % 65521;
          b = (b + a) % 65521;
       });
    });
    

    また、この call オブジェクトは、処理の終了を通知する空の文字列を受け取ると、event オブジェクトを設定します。

  5. test.txt ファイルから読み取り、そのファイルの内容を call オブジェクトに書き込む file_reader オブジェクトを作成します。

    file_reader reader("test.txt", calculate_checksum);
    
  6. エージェントを開始し、エージェントが終了するまで待機します。

    reader.start();
    agent::wait(&reader);
    
  7. call オブジェクトがすべてのデータを受け取り、終了するまで待機します。

    e.wait();
    
  8. エラーが発生していないかどうかファイル リーダーを確認します。 エラーが発生していない場合は、最終的な Adler-32 チェックサムを計算し、その結果をコンソールに出力します。

    std::exception error;
    if (reader.get_error(error))
    {
       wcout << error.what() << endl;
    }   
    else
    {      
       unsigned int adler32_sum = (b << 16) | a;
       wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
    }
    

完全な BasicAgent.cpp ファイルの例を次に示します。

// BasicAgent.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "file_reader.h"

using namespace Concurrency;
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
   // An event object that signals the end of processing.
   event e;

   // The components of the Adler-32 sum.
   unsigned int a = 1;
   unsigned int b = 0;

   // A call object that updates the checksum when it receives data.
   call<string> calculate_checksum([&] (string s) {
      // If the input string is empty, set the event to signal
      // the end of processing.
      if (s.size() == 0)
         e.set();
      // Perform the Adler-32 checksum algorithm.
      for_each(s.begin(), s.end(), [&] (char c) {
         a = (a + c) % 65521;
         b = (b + a) % 65521;
      });
   });

   // Create the agent.
   file_reader reader("test.txt", calculate_checksum);

   // Start the agent and wait for it to complete.
   reader.start();
   agent::wait(&reader);

   // Wait for the call object to receive all data and complete.
   e.wait();

   // Check the file reader for errors.
   // If no error occurred, calculate the final Adler-32 sum and print it 
   // to the console.
   std::exception error;
   if (reader.get_error(error))
   {
      wcout << error.what() << endl;
   }   
   else
   {      
      unsigned int adler32_sum = (b << 16) | a;
      wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
   }
}

[ページのトップへ]

入力のサンプル

これは、入力ファイル text.txt の内容のサンプルです。

The quick brown fox
jumps
over the lazy dog

出力例

入力のサンプルを使用すると、このプログラムでは、次の出力が生成されます。

Adler-32 sum is fefb0d75

信頼性の高いプログラミング

データ メンバーへの同時アクセスを回避するために、クラスの protected セクションまたは private セクションに、処理を実行するメソッドを追加することをお勧めします。 クラスの public セクションには、メッセージをエージェントに送信するメソッドまたはメッセージをエージェントから受信するメソッドのみを追加してください。

必ず、Concurrency::agent::done メソッドを呼び出して、エージェントを完了の状態に移行してください。 通常、このメソッドは、run メソッドから制御が戻る前に呼び出します。

次の手順

エージェント ベースのアプリケーションの別の例については、「チュートリアル: join を使用したデッドロックの防止」を参照してください。

参照

概念

非同期エージェント ライブラリ

非同期メッセージ ブロック

メッセージ パッシング関数

同期データ構造

その他の技術情報

チュートリアル: join を使用したデッドロックの防止