Walkthrough: Creating an Agent-Based Application

This topic describes how to create a basic agent-based application. In this walkthrough, you can create an agent that reads data from a text file asynchronously. The application uses the Adler-32 checksum algorithm to calculate the checksum of the contents of that file.

This section shows how to create a Visual C++ console application that references the header files that the program will use.

To create a Visual C++ application by using the Win32 Console Application Wizard

  1. On the File menu, click New, and then click Project to display the New Project dialog box.

  2. In the New Project dialog box, select the Visual C++ node in the Project types pane and then select Win32 Console Application in the Templates pane. Type a name for the project, for example, BasicAgent, and then click OK to display the Win32 Console Application Wizard.

  3. In the Win32 Console Application Wizard dialog box, click Finish.

  4. In stdafx.h, add the following code.

    
    #include <agents.h>
    #include <string>
    #include <iostream>
    #include <algorithm>
    
    
    

    The header file agents.h contains the functionality of the Concurrency::agent class.

  5. Verify that the application was created successfully by building and running it. To build the application, on the Build menu, click Build Solution. If the application builds successfully, run the application by clicking Start Debugging on the Debug menu.

[go to top]

This section shows how to create the file_reader class. The runtime schedules each agent to perform work in its own context. Therefore, you can create an agent that performs work synchronously, but interacts with other components asynchronously. The file_reader class reads data from a given input file and sends data from that file to a given target component.

To create the file_reader class

  1. Add a new C++ header file to your project. To do so, right-click the Header Files node in Solution Explorer, click Add, and then click New Item. In the Templates pane, select Header File (.h). In the Add New Item dialog box, type file_reader.h in the Name box and then click Add.

  2. In file_reader.h, add the following code.

    
    #pragma once
    
    
    
  3. In file_reader.h, create a class that is named file_reader that derives from agent.

    
    class file_reader : public Concurrency::agent
    {
    public:
    protected:
    private:
    };
    
    
    
  4. Add the following data members to the private section of your class.

    
    std::string _file_name;
    Concurrency::ITarget<std::string>& _target;
    Concurrency::overwrite_buffer<std::exception> _error;
    
    
    

    The _file_name member is the file name that the agent reads from. The _target member is a Concurrency::ITarget object that the agent writes the contents of the file to. The _error member holds any error that occurs during the life of the agent.

  5. Add the following code for the file_reader constructors to the public section of the file_reader class.

    
    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target)
       : _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target,
       Concurrency::Scheduler& scheduler)
       : agent(scheduler)
       , _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       Concurrency::ITarget<std::string>& target,
       Concurrency::ScheduleGroup& group)
       : agent(group) 
       , _file_name(file_name)
       , _target(target)
    {
    }
    
    
    

    Each constructor overload sets the file_reader data members. The second and third constructor overload enables your application to use a specific scheduler with your agent. The first overload uses the default scheduler with your agent.

  6. Add the get_error method to the public section of the file_reader class.

    
    bool get_error(std::exception& e)
    {
       return try_receive(_error, e);
    }
    
    
    

    The get_error method retrieves any error that occurs during the life of the agent.

  7. Implement the Concurrency::agent::run method in the protected section of your class.

    
    void run()
    {
       FILE* stream;
       try
       {
          // Open the file.
          if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
          {
             // Throw an exception if an error occurs.            
             throw std::exception("Failed to open input file.");
          }
    
          // Create a buffer to hold file data.
          char buf[1024];
    
          // Set the buffer size.
          setvbuf(stream, buf, _IOFBF, sizeof buf);
    
          // Read the contents of the file and send the contents
          // to the target.
          while (fgets(buf, sizeof buf, stream))
          {
             asend(_target, std::string(buf));
          }   
    
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Close the file.
          fclose(stream);
       }
       catch (const std::exception& e)
       {
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Write the exception to the error buffer.
          send(_error, e);
       }
    
       // Set the status of the agent to agent_done.
       done();
    }
    
    
    

    The run method opens the file and reads data from it. The run method uses exception handling to capture any errors that occur during file processing.

    Each time this method reads data from the file, it calls the Concurrency::asend function to send that data to the target buffer. It sends the empty string to its target buffer to indicate the end of processing.

The following example shows the complete contents of file_reader.h.


#pragma once

class file_reader : public Concurrency::agent
{
public:
   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target)
      : _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target,
      Concurrency::Scheduler& scheduler)
      : agent(scheduler)
      , _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      Concurrency::ITarget<std::string>& target,
      Concurrency::ScheduleGroup& group)
      : agent(group) 
      , _file_name(file_name)
      , _target(target)
   {
   }

   // Retrieves any error that occurs during the life of the agent.
   bool get_error(std::exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   void run()
   {
      FILE* stream;
      try
      {
         // Open the file.
         if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
         {
            // Throw an exception if an error occurs.            
            throw std::exception("Failed to open input file.");
         }

         // Create a buffer to hold file data.
         char buf[1024];

         // Set the buffer size.
         setvbuf(stream, buf, _IOFBF, sizeof buf);

         // Read the contents of the file and send the contents
         // to the target.
         while (fgets(buf, sizeof buf, stream))
         {
            asend(_target, std::string(buf));
         }   

         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Close the file.
         fclose(stream);
      }
      catch (const std::exception& e)
      {
         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Write the exception to the error buffer.
         send(_error, e);
      }

      // Set the status of the agent to agent_done.
      done();
   }

private:
   std::string _file_name;
   Concurrency::ITarget<std::string>& _target;
   Concurrency::overwrite_buffer<std::exception> _error;
};


[go to top]

This section shows how to use the file_reader class to read the contents of a text file. It also shows how to create a Concurrency::call object that receives this file data and calculates its Adler-32 checksum.

To use the file_reader class in your application

  1. In BasicAgent.cpp, add the following #include statement.

    
    #include "file_reader.h"
    
    
    
  2. In BasicAgent.cpp, add the following using directives.

    
    using namespace Concurrency;
    using namespace std;
    
    
    
  3. In the _tmain function, create a Concurrency::event object that signals the end of processing.

    
    event e;
    
    
    
  4. Create a call object that updates the checksum when it receives data.

    
    // The components of the Adler-32 sum.
    unsigned int a = 1;
    unsigned int b = 0;
    
    // A call object that updates the checksum when it receives data.
    call<string> calculate_checksum([&] (string s) {
       // If the input string is empty, set the event to signal
       // the end of processing.
       if (s.size() == 0)
          e.set();
       // Perform the Adler-32 checksum algorithm.
       for_each(s.begin(), s.end(), [&] (char c) {
          a = (a + c) % 65521;
          b = (b + a) % 65521;
       });
    });
    
    
    

    This call object also sets the event object when it receives the empty string to signal the end of processing.

  5. Create a file_reader object that reads from the file test.txt and writes the contents of that file to the call object.

    
    file_reader reader("test.txt", calculate_checksum);
    
    
    
  6. Start the agent and wait for it to finish.

    
    reader.start();
    agent::wait(&reader);
    
    
    
  7. Wait for the call object to receive all data and finish.

    
    e.wait();
    
    
    
  8. Check the file reader for errors. If no error occurred, calculate the final Adler-32 sum and print the sum to the console.

    
    std::exception error;
    if (reader.get_error(error))
    {
       wcout << error.what() << endl;
    }   
    else
    {      
       unsigned int adler32_sum = (b << 16) | a;
       wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
    }
    
    
    

The following example shows the complete BasicAgent.cpp file.


// BasicAgent.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "file_reader.h"

using namespace Concurrency;
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
   // An event object that signals the end of processing.
   event e;

   // The components of the Adler-32 sum.
   unsigned int a = 1;
   unsigned int b = 0;

   // A call object that updates the checksum when it receives data.
   call<string> calculate_checksum([&] (string s) {
      // If the input string is empty, set the event to signal
      // the end of processing.
      if (s.size() == 0)
         e.set();
      // Perform the Adler-32 checksum algorithm.
      for_each(s.begin(), s.end(), [&] (char c) {
         a = (a + c) % 65521;
         b = (b + a) % 65521;
      });
   });

   // Create the agent.
   file_reader reader("test.txt", calculate_checksum);

   // Start the agent and wait for it to complete.
   reader.start();
   agent::wait(&reader);

   // Wait for the call object to receive all data and complete.
   e.wait();

   // Check the file reader for errors.
   // If no error occurred, calculate the final Adler-32 sum and print it 
   // to the console.
   std::exception error;
   if (reader.get_error(error))
   {
      wcout << error.what() << endl;
   }   
   else
   {      
      unsigned int adler32_sum = (b << 16) | a;
      wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
   }
}


[go to top]

This is the sample contents of the input file text.txt:

The quick brown fox
jumps
over the lazy dog

When used with the sample input, this program produces the following output:

Adler-32 sum is fefb0d75

To prevent concurrent access to data members, we recommend that you add methods that perform work to the protected or private section of your class. Only add methods that send or receive messages to or from the agent to the public section of your class.

Always call the Concurrency::agent::done method to move your agent to the completed state. You typically call this method before you return from the run method.

For another example of an agent-based application, see Walkthrough: Using join to Prevent Deadlock.

Was this page helpful?
(1500 characters remaining)
Thank you for your feedback

Community Additions

ADD
Show:
© 2014 Microsoft