Freigeben über


Empfohlene Vorgehensweisen in der Asynchronous Agents Library

In diesem Dokument wird die effektive Verwendung der Asynchronous Agents Library beschrieben. Die Agents Library begünstigt ein akteurbasiertes Programmiermodell und die prozessinterne Nachrichtenübergabe für simple Datenfluss- und Pipelinesaufgaben.

Weitere Informationen zur Agents Library finden Sie unter Asynchronous Agents Library.

Abschnitte

Dieses Dokument enthält folgende Abschnitte:

  • Isolieren von Status mit Agents

  • Begrenzen der Anzahl von Nachrichten in einer Datenpipeline mit einem Einschränkungsmechanismus

  • Keine detaillierte Arbeit in einer Datenpipeline

  • Keine Übergabe von großer Nutzlasten für Nachrichten anhand des Werts

  • Verwenden von shared_ptr in einem Datennetzwekr bei unklarem Besitzer

Isolieren von Status mit Agents

Die Agents Library stellt Alternativen zum Freigabezustand bereit, indem sie das Verbinden isolierter Komponenten durch einen asynchronen Nachrichtenübergabemechanismum ermöglicht. Asynchrone Agents sind am effektivsten, wenn der interne Zustand von anderen Komponenten isoliert werden kann. Aufgrund der Isolierung des Status wirken mehrere Komponenten i. d. R. nicht auf freigegebene Daten. Die Isolierung des Status ermöglicht eine Skalierung der Anwendung, da Konflikte im freigegebenen Speicher reduziert werden. Ferner wird dadurch die Wahrscheinlichkeit von Deadlocks und Racebedingungen verringert, da der Zugriff auf die freigegebenen Daten nicht zwischen den Komponenten synchronisiert werden muss.

Status in einem Agent werden i. d. R. isoliert, indem Datenmember im Abschnitt private oder im Abschnitt protected der Agent-Klasse platziert und Statusänderungen mit Nachrichtenpuffern kommuniziert werden. Im folgenden Beispiel wird die basic_agent-Klasse veranschaulicht die von Concurrency::agent abgeleitet wird. Die basic_agent-Klasse verwendet zwei Meldungspuffer zur Kommunikation mit externen Komponenten. Ein Nachrichtenpuffer enthält eingehende Meldungen, der andere Nachrichtenpuffer enthält ausgehende Nachrichten.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

Vollständige Beispiele zum Definieren und Verwenden von Agents finden Sie unter Exemplarische Vorgehensweise: Erstellen einer agentbasierten Anwendung und Exemplarische Vorgehensweise: Erstellen eines Datenfluss-Agent.

[Nach oben]

Begrenzen der Anzahl von Nachrichten in einer Datenpipeline mit einem Einschränkungsmechanismus

Viele Nachrichtenpuffertypen wie Concurrency::unbounded_buffer können eine unbegrenzte Anzahl an Nachrichten enthalten. Wenn Nachrichten vom Nachrichtenproducer schneller an eine Datenpipeline gesendet werden, als diese vom Consumer verarbeitet werden können, kann der Speicherstatus der Anwendung auf Speichermangel hinweisen. Mit einem Einschränkungsmechanismus wie einem Semaphor können Sie die Anzahl der Nachrichten begrenzen, die zu einem gegebenen Zeitpunkt gleichzeitg in einer Datenpipepline aktiv sind.

Im Folgenden einfachen Beispiel wird veranschaulicht, wie die Anzahl der Nachrichten in einer Datenpipeline mit einem Semaphor eingeschränkt wird. Die Datenpipeline simuliert mit der Concurrency::wait-Funktion einen Vorgang, der mindestens 100 Milliseconds dauert. Da die Nachrichten vom Absender schneller erzeugt werden, als diese vom Consumer verarbeitet werden können, wird in diesem Beispiel die semaphore-Klasse definiert, sodass die Anzahl der aktiven Nachrichten von der Anwendung eingeschränkt werden kann.

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity);

   // Acquires access to the semaphore.
   void acquire();

   // Releases access to the semaphore.
   void release();

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L);

   // Decrements the event counter.
   void signal();

   // Increments the event counter.
   void add_count();

   // Blocks the current context until the event is set.
   void wait();

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};


int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();   
}

//
// semaphore class implementation.
//

semaphore::semaphore(LONG capacity)
   : _semaphore_count(capacity)
{
}

// Acquires access to the semaphore.
void semaphore::acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (InterlockedDecrement(&_semaphore_count) < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}

// Releases access to the semaphore.
void semaphore::release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (InterlockedIncrement(&_semaphore_count) <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      if (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

//
// countdown_event class implementation.
//

countdown_event::countdown_event(unsigned int count)
   : _current(static_cast<long>(count)) 
{
   // Set the event if the initial count is zero.
   if (_current == 0L)
      _event.set();
}

// Decrements the event counter.
void countdown_event::signal() {
   if(InterlockedDecrement(&_current) == 0L) {
      _event.set();
   }
}

// Increments the event counter.
void countdown_event::add_count() {
   if(InterlockedIncrement(&_current) == 1L) {
      _event.reset();
   }
}

// Blocks the current context until the event is set.
void countdown_event::wait() {
   _event.wait();
}

Dieses Beispiel erzeugt die folgende Beispielausgabe:

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

Die Pipeline wird vom semaphore-Objekt eingeschränkt, sodass maximal zwei Nachrichten gleichzeitig verarbeitet werden können.

Der Producer in diesem Beispiel sendet verhältnismäßig wenig Nachrichten an den Consumer. Daher ist dieses Beispiel ungeeignet, um eine Situation mit einem Mangel an Arbeitsspeicher zu veranschaulichen. Dieser Mechanismus ist jedoch hilfreich, wenn eine Datenpipeline eine relativ hohe Anzahl von Meldungen enthält.

Weitere Informationen zum Erstellen der Semaphorenklasse für dieses Beispiel finden Sie unter Gewusst wie: Implementieren einer kooperativen Semaphore mithilfe der Context-Klasse.

[Nach oben]

Keine detaillierte Arbeit in einer Datenpipeline

Die Agents Library ist besonders hilfreich, wenn die Arbeit, die von einer Datenpipeline ausgeführt wird, eher simpel ist. Beispielsweise könnte eine Anwendungskomponente Daten aus einer Datei oder einer Netzwerkverbindung lesen und ab und zu Daten an eine andere Komponente senden. Aufgrund des von der Agents Library verwendeten Protokolls zur Weitergabe von Nachrichten steigt der Aufwand beim Nachrichtenübergabemechanismus gegenüber den parallelen Konstrukten der Aufgabe, die von der Parallel Patterns Library (PPL) bereitgestellt werden. Deshalb müssen Sie sicherstellen, dass die Arbeit, die von einer Datenpipeline ausgeführt wird, lang genug dauert, um diesen Mehraufwand auszugleichen.

Obwohl eine Datenpipeline am effektivsten ist, wenn die Aufgaben simpel sind, können in jeder Phase der Datenpipeline PPL-Konstrukte wie Aufgabengruppen und parallele Algorithmen verwendet werden, um differenziertere Aufgaben auszuführen. Ein Beispiel für ein simple Datennetzwerk mit detaillierter Parallelität in den einzelnen Verarbeitungsstufen finden Sie unter Exemplarische Vorgehensweise: Erstellen eines Bildverarbeitungsnetzwerks.

[Nach oben]

Keine Übergabe von großer Nutzlasten für Nachrichten anhand des Werts

In einigen Fällen wird von der Laufzeit eine Kopie einer Nachricht erstellt und von einem Nachrichtenpuffer an einen anderen Nachrichtenpuffer übergeben. Beispielsweise wird von der Concurrency::overwrite_buffer-Klasse eine Kopie aller empfangenen Nachrichten für die einzelnen Ziele bereitgestellt. Darüber hinaus wird von der Laufzeit eine Kopie der Nachrichtendaten erstellt, wenn Sie Nachrichtenübergabefunktionen wie Concurrency::send und Concurrency::receive verwenden, um Nachrichten in einen Nachrichtenpuffer zu schreiben oder aus einem Nachrichtenpuffer zu lesen. Auch wenn dieser Mechanismus dazu beiträgt, das Risiko zu verringern, dass freigegebene Daten gleichzeitig geschrieben werden, kann die Arbeitsspeicherleistung darunter leiden, dass die Nachrichten relativ hoch ist.

Sie können die Arbeitsspeicherleistung beim Übergeben von Nachrichten mit einer großen Nutzlast verbessern, indem Sie Zeiger und Verweise verwenden. Im folgenden Beispiel wird die Übergabe umfangreicher Nachrichten anhand des Werts mit der Übergabe von Zeigern auf den gleichen Nachrichtentyp verglichen. Im Beispiel werden zwei Agent-Typen definiert, producer und consumer, die auf message_data-Objekte angewendet werden. In dem Beispiel wird die Zeit, die der Producer benötigt, um mehrere message_data-Objekte an den Consumer zu senden, mit der Zeit verglichen, die der Producer-Agent benötigt, um mehrere Zeiger auf message_data-Objekte an den Consumer zu senden.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Dieses Beispiel erzeugt die folgende Beispielausgabe:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Die Version mit Zeiger weist eine bessere Leistungauf, da von der Laufzeit während der Übergabe vom Producer an den Consumer nicht eine vollständige Kopie jedes message_data-Objekts erstellt werden muss.

[Nach oben]

Verwenden von shared_ptr in einem Datennetzwekr bei unklarem Besitzer

Beim Senden von Nachrichten anhand des Zeigers über eine Pipeline oder ein Netzwerk zur Übergabe von Nachrichten wird der Arbeitsspeicher für jede Nachricht i. d. R. am Anfang des Netzwerks zugewiesen und am Ende des Netzwerks freigegeben. Obwohl dieser Mechanismus normalerweise gut funktioniert, kann er jedoch in einigen Fällen kaum oder nicht verwendet werden. Betrachten Sie beispielsweise den Fall eines Datennetzwerkes mit mehreren Endknoten. In diesem Fall gibt es keine klare Position, um den Arbeitsspeicher für die Nachrichten freizugeben.

Um dieses Problem zu beheben, können Sie einen Mechanismus wie std::shared_ptr verwenden; dieser ermöglicht, dass mehrere Komponenten einen Zeiger besitzen. Wenn das endgültige shared_ptr-Objekt zerstört wird, das eine Ressource besitzt, wird auch die Ressource freigegeben.

Im folgenden Beispiel wird veranschaulicht, wie mit shared_ptr Zeigerwerte für mehrere Nachrichtenpuffer gemeinsam verwendet werden können. In diesem Beispiel wird ein Concurrency::overwrite_buffer-Objekt mit drei Concurrency::call-Objekten verbunden. Die overwrite_buffer-Klasse stellt Nachrichten für die einzelnen Ziele bereit. Da das Datennetzwerk am Ende mehrere Besitzer der Daten aufweist, werden die einzelnen call-Objekte mit shared_ptr in die Lage versetzt, den Besitz der Nachrichten zu teilen.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Dieses Beispiel erzeugt die folgende Beispielausgabe:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Siehe auch

Aufgaben

Exemplarische Vorgehensweise: Erstellen einer agentbasierten Anwendung

Konzepte

Empfohlene Vorgehensweisen im Zusammenhang mit der Concurrency Runtime

Asynchronous Agents Library

Weitere Ressourcen

Exemplarische Vorgehensweise: Erstellen eines Datenfluss-Agent

Exemplarische Vorgehensweise: Erstellen eines Bildverarbeitungsnetzwerks

Empfohlene Vorgehensweisen in der Parallel Patterns Library

Allgemein empfohlene Vorgehensweisen in der Concurrency Runtime