Асинхронные блоки сообщений

Библиотека агентов предоставляет несколько типов блоков сообщений, позволяющих распространять сообщения среди компонентов приложений потокобезопасным способом. Эти типы блоков сообщений часто используются с различными процедурами передачи сообщений, например Concurrency::send, Concurrency::asend, Concurrency::receive и Concurrency::try_receive. Дополнительные сведения о процедурах передачи сообщений, которые определяются библиотекой агентов, см. в разделе Функции передачи сообщений.

Подразделы

В этом разделе содержатся следующие подразделы.

  • Источники и целевые объекты

  • Распространение сообщений

  • Обзор типов блоков сообщений

  • Класс unbounded_buffer

  • Класс overwrite_buffer

  • Класс single_assignment

  • Класс call

  • Класс transformer

  • Класс choice

  • Классы join и multitype_join

  • Класс timer

  • Фильтрация сообщений

  • Резервирование сообщений

Источники и целевые объекты

Источники и целевые объекты — это два важных участника передачи сообщений. Под источником понимается конечная точка передачи данных, которая отправляет сообщения. Под целевым объектом понимается конечная точка передачи данных, которая принимает сообщения. Источник можно считать конечной точкой, из которой производится чтение, а целевой объект — конечной точкой, в которую производится запись. Приложения связывают между собой источники и целевые объекты, образуя сети обмена сообщениями.

В библиотеке агентов для представления источников и целевых объектов используются два абстрактных класса: Concurrency::ISource и Concurrency::ITarget. Типы блоков сообщений, выполняющие роль источников, являются производными от класса ISource; типы блоков сообщений, выполняющие роль целевых объектов, являются производными от класса ITarget. Типы блоков сообщений, выполняющие роль исходных и целевых объектов, наследуются от классов ISource и ITarget.

[в начало]

Распространение сообщений

Распространение сообщений — это отправка сообщения из одного компонента в другой. Когда блоку сообщений предлагается сообщение, он может принять, отклонить или отложить его. Различные типы блоков сообщений сохраняют и передают сообщения разными способами. Например, класс unbounded_buffer сохраняет несколько сообщений, класс overwrite_buffer сохраняет сообщения только по одному, а класс transformer сохраняет измененную версию каждого сообщения. Эти типы блоков сообщений описаны более подробно ниже в этом документе.

Когда блок сообщений принимает сообщение, он может выполнить работу, а затем, если он является исходным блоком, передать обработанное сообщение другому члену сети. Блок сообщений может использовать функцию фильтрации, чтобы отклонять сообщения, которые он не хочет принимать. Фильтры описаны более подробно далее в этом разделе в подразделе Фильтрация сообщений. Блок сообщений, откладывающий сообщение, может зарезервировать его и употребить позже. Резервирование сообщений описано более подробно далее в этом разделе в подразделе Резервирование сообщений.

Библиотека агентов позволяет блокам сообщений передавать сообщения асинхронно или синхронно. При синхронной передаче сообщения блоку сообщений, например с помощью функции send, среда выполнения блокирует текущий контекст, пока целевой блок не примет или не отклонит сообщение. При асинхронной передаче сообщения блоку сообщений, например с помощью функции asend, среда выполнения предлагает сообщение целевому блоку, и если он принимает сообщение, планирует асинхронную задачу, распространяющую сообщение для принимающей стороны. Среда выполнения использует упрощенные задачи, чтобы распространять сообщения совместно. Дополнительные сведения об упрощенных задачах см. в разделе Планировщик задач (среда выполнения с параллелизмом).

Приложения связывают между собой источники и целевые объекты, образуя сети обмена сообщениями. Чтобы передать данные в сеть, обычно выполняется подключение к ней и вызов send или asend. Чтобы подключить исходный блок сообщений к целевому, следует вызвать метод Concurrency::ISource::link_target. Чтобы отключить исходный блок сообщений от целевого, следует вызвать метод Concurrency::ISource::unlink_target. Чтобы отключить исходный блок от всех целевых, следует вызвать метод Concurrency::ISource::unlink_targets. Когда один из предопределенных типов блоков сообщений покидает область или уничтожается, он автоматически отключается от всех целевых блоков. Некоторые типы блоков сообщений ограничивают максимальное количество целевых объектов, в которые они могут производить запись. В следующем разделе описаны ограничения, относящиеся к предопределенным типам блоков сообщения.

[в начало]

Обзор типов блоков сообщений

В следующей таблице кратко описывается роль важных типов блоков сообщений.

  • unbounded_buffer
    Хранит очередь сообщений.

  • overwrite_buffer
    Хранит одно сообщение, которое можно многократно записывать и считывать.

  • single_assignment
    Хранит одно сообщение, которое можно один раз записать и многократно считывать.

  • call
    Выполняет работу при получении сообщения.

  • transformer
    Выполняет работу при получении данных и отправляет результат своей работы другому целевому блоку. Класс transformer может действовать на различные типы входных и выходных данных.

  • choice
    Выбирает первое доступное сообщение из набора источников.

  • join и multitype_join
    Ожидает получения всех сообщений из набора источников, затем объединяет эти сообщения в одно сообщение для другого блока сообщений.

  • timer
    Отправляет сообщение в целевой блок через постоянные интервалы.

Эти типы блоков сообщений обладают различными характеристиками, делающими их полезными в различных ситуациях. Вот некоторые из этих характеристик.

  • Тип распространения: является ли блок сообщений источником данных, приемником данных или и тем, и другим.

  • Упорядочение сообщений: сохраняет ли блок сообщений исходный порядок, в котором сообщения отправлялись или принимались. Все предопределенные типы блоков сообщений сохраняют исходный порядок, в котором отправляются или принимаются сообщения.

  • Число источников: максимальное число источников, из которых блок сообщений может производить чтение.

  • Число целевых объектов: максимальное число целевых объектов, в которые блок сообщений может производить запись.

В следующей таблице показано, как эти характеристики соотносятся с различными типами блоков сообщений.

Тип блока сообщений

Тип распространения (источник, целевой объект или оба значения)

Упорядочение сообщений (упорядоченный или неупорядоченный)

Число источников

Число целевых объектов

unbounded_buffer

Оба

Упорядоченный

Неограниченный

Неограниченный

overwrite_buffer

Оба

Упорядоченный

Неограниченный

Неограниченный

single_assignment

Оба

Упорядоченный

Неограниченный

Неограниченный

call

Целевой объект

Упорядоченный

Неограниченный

Не применяется.

transformer

Оба

Упорядоченный

Неограниченный

1

choice

Оба

Упорядоченный

10

1

join

Оба

Упорядоченный

Неограниченный

1

multitype_join

Оба

Упорядоченный

10

1

timer

Исходный код

Не применяется.

Не применяется.

1

В следующих подразделах типы блоков сообщений описаны более подробно.

[в начало]

Класс unbounded_buffer

Класс Concurrency::unbounded_buffer представляет структуру общего назначения для асинхронного обмена сообщениями. В этом классе хранится очередь сообщений типа "первым вошел — первым вышел" (FIFO), в которую могут записывать несколько источников и из которой могут читать несколько целевых объектов. Когда целевой объект получает сообщение от объекта unbounded_buffer, это сообщение удаляется из очереди сообщений. Поэтому, хотя объект unbounded_buffer допускает наличие нескольких целевых объектов, каждое сообщение может быть получено только одним из них. Класс unbounded_buffer удобен, если требуется передать несколько сообщений другому компоненту, и этот компонент должен получить каждое из этих сообщений.

Пример

В следующем примере показана основная схема работы с классом unbounded_buffer. В этом примере объекту unbounded_buffer отправляются три значения, затем они считываются с этого же объекта.

// unbounded_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that works with
   // int data.
   unbounded_buffer<int> items;

   // Send a few items to the unbounded_buffer object.
   send(items, 33);
   send(items, 44);
   send(items, 55);

   // Read the items from the unbounded_buffer object and print
   // them to the console.
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
}

После выполнения примера получается следующий результат.

33
44
55

Полный пример, демонстрирующий использование класса unbounded_buffer, см. в разделе Практическое руководство. Реализация различных шаблонов "источник-приемник".

[в начало]

Класс overwrite_buffer

Класс Concurrency::overwrite_buffer похож на класс unbounded_buffer, но объект overwrite_buffer сохраняет только одно сообщение. Кроме того, когда целевой объект получает сообщение от объекта overwrite_buffer, это сообщение не удаляется из буфера. Поэтому копию этого сообщения могут получить несколько целевых объектов.

Класс overwrite_buffer удобен, если требуется передать несколько сообщений другому компоненту, но этому компоненту нужно только самое последнее значение. Этот класс также может оказаться полезным при необходимости широковещательной передачи сообщения нескольким компонентам.

Пример

В следующем примере показана основная схема работы с классом overwrite_buffer. В этом примере объекту overwrite _buffer отправляются три значения, затем текущее значение считывается с этого же объекта три раза. Этот пример похож на пример для класса unbounded_buffer. При этом класс overwrite_buffer сохраняет только одно сообщение. Кроме того, среда выполнения не удаляет сообщение из объекта overwrite_buffer после чтения.

// overwrite_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create an overwrite_buffer object that works with
   // int data.
   overwrite_buffer<int> item;

   // Send a few items to the overwrite_buffer object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the overwrite_buffer object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

После выполнения примера получается следующий результат.

55
55
55

Полный пример, демонстрирующий использование класса overwrite_buffer, см. в разделе Практическое руководство. Реализация различных шаблонов "источник-приемник".

[в начало]

Класс single_assignment

Класс Concurrency::single_assignment похож на класс overwrite_buffer, но объект single_assignment допускает только однократную запись. Как и в случае класса overwrite_buffer, когда целевой объект получает сообщение от объекта single_assignment, это сообщение не удаляется. Поэтому копию этого сообщения могут получить несколько целевых объектов. Класс single_assignment удобен при необходимости широковещательной передачи одного сообщения нескольким компонентам.

Пример

В следующем примере показана основная схема работы с классом single_assignment. В этом примере объекту single_assignment отправляются три значения, затем текущее значение считывается с этого же объекта три раза. Этот пример похож на пример для класса overwrite_buffer. Хотя как класс overwrite_buffer, так и класс single_assignment сохраняет только одно сообщение, класс single_assignment допускает только однократную запись.

// single_assignment-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create an single_assignment object that works with
   // int data.
   single_assignment<int> item;

   // Send a few items to the single_assignment object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the single_assignment object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

После выполнения примера получается следующий результат.

33
33
33

Полный пример, демонстрирующий использование класса single_assignment, см. в разделе Пошаговое руководство. Реализация фьючерсов.

[в начало]

Класс call

Класс Concurrency::call служит приемником сообщения, выполняющим рабочую функцию при получении данных. Эта рабочая функция может быть лямбда-выражением, объектом функции или указателем на функцию. Поведение объекта call отличается от обычного вызова функции, так как этот объект работает параллельно с другими компонентами, отправляющими ему сообщения. Если при получении сообщения объект call занят, он добавляет это сообщение в очередь. Каждый объект call обрабатывает сообщения из очереди в порядке их получения.

Пример

В следующем примере показана основная схема работы с классом call. В этом примере создается объект call, выводящий все получаемые значения на консоль. Затем пример отправляет три значения объекту call. Так как объект call обрабатывает сообщения в отдельном потоке, в этом примере также используются переменная счетчика и объект event, чтобы объект call обрабатывал все сообщения до выполнения возврата функцией wmain.

// call-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // An event that is set when the call object receives all values.
   event received_all;

   // Counts the 
   long receive_count = 0L;
   long max_receive_count = 3L;

   // Create an call object that works with int data.
   call<int> target([&received_all,&receive_count,max_receive_count](int n) {
      // Print the value that the call object receives to the console.
      wcout << n << endl;

      // Set the event when all messages have been processed.
      if (++receive_count == max_receive_count)
         received_all.set();
   });

   // Send a few items to the call object.
   send(target, 33);
   send(target, 44);
   send(target, 55);

   // Wait for the call object to process all items.
   received_all.wait();
}

После выполнения примера получается следующий результат.

33
44
55

Полный пример, демонстрирующий использование класса call, см. в разделе Практическое руководство. Предоставление рабочих функций классам call и transformer.

[в начало]

Класс transformer

Класс Concurrency::transformer действует и как приемник, и как отправитель сообщений. Класс transformer похож на класс call, так как он выполняет определенную пользователем рабочую функцию при получении данных. Однако класс transformer также отправляет результат выполнения рабочей функции объектам-получателям. Как и объект call, объект transformer действует параллельно с другими компонентами, передающими ему сообщения. Если при получении сообщения объект transformer занят, он добавляет это сообщение в очередь. Каждый объект transformer обрабатывает сообщения из своей очереди в порядке их получения.

Класс transformer отправляет свои сообщения одному целевому объекту. Если в конструкторе для параметра _PTarget задано значение NULL, впоследствии целевой объект можно задать, вызвав метод Concurrency::link_target.

В отличие от других асинхронных типов блоков сообщений, предоставляемых библиотекой агентов, класс transformer может действовать на различные типы входных и выходных данных. Эта способность преобразовывать данные из одного типа в другой превращает класс transformer в ключевой компонент многих параллельных сетей. Кроме того, в рабочую функцию объекта transformer можно добавить возможность параллельной обработки более мелких фрагментов данных.

Пример

В следующем примере показана основная схема работы с классом transformer. В этом примере создается объект transformer, умножающий все введенные значения int на 0,33, чтобы получить на выходе значение double. Затем пример получает преобразованные значения из того же объекта transformer и выводит их на консоль.

// transformer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create an transformer object that receives int data and 
   // sends double data.
   transformer<int, double> third([](int n) {
      // Return one-third of the input value.
      return n * 0.33;
   });

   // Send a few items to the transformer object.
   send(third, 33);
   send(third, 44);
   send(third, 55);

   // Read the processed items from the transformer object and print
   // them to the console.
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
}

После выполнения примера получается следующий результат.

10.89
14.52
18.15

Полный пример, демонстрирующий использование класса transformer, см. в разделе Практическое руководство. Использование преобразователя в конвейере данных.

[в начало]

Класс choice

Класс Concurrency::choice выбирает первое доступное сообщение из набора исходных объектов. Класс choice представляет механизм потока управления вместо механизма потока данных (различия между потоком данных и потоком управления рассматриваются в разделе Библиотека асинхронных агентов).

Чтение из объекта choice похоже на вызов функции Windows API WaitForMultipleObjects, когда для ее параметра bWaitAll задано значение FALSE. Однако класс choice привязывает данные к самому событию, а не ко внешнему объекту синхронизации.

Обычно класс choice используется вместе с функцией Concurrency::receive для использования потока управления в приложении. Используйте класс choice, если требуется выбирать между буферами сообщений, имеющих различные типы. Используйте класс single_assignment, если требуется выбирать между буферами сообщений, имеющих одинаковый тип.

Порядок связывания источников с объектом choice имеет важное значение, так как он определяет выбираемое сообщение. Например, рассмотрим случай, когда с объектом choice связываются несколько буферов сообщений, уже содержащих сообщение. Объект choice выбирает сообщение из первого связанного с ним источника. После того как все источники будут связаны, объект choice сохраняет порядок, в котором каждый из источников получает сообщение.

Пример

В следующем примере показана основная схема работы с классом choice. В этом примере функция Concurrency::make_choice используется для создания объекта choice, выбирающего один из трех блоков сообщений. Затем в примере вычисляются различные числа Фибоначчи, и каждый результат сохраняется в отдельном блоке сообщений. Затем пример выводит на консоль сообщение, основанное на первой завершившейся операции.

// choice-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Although the following thee message blocks are written to one time only, 
   // this example illustrates the fact that the choice class works with 
   // different message block types.

   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   overwrite_buffer<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   unbounded_buffer<double> half_of_fib42;   

   // Create a choice object that selects the first single_assignment 
   // object that receives a value.
   auto select_one = make_choice(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   // Print a message that is based on the operation that finished first.
   switch (receive(select_one))
   {
   case 0:
      wcout << L"fib35 received its value first. Result = " 
            << receive(fib35) << endl;
      break;
   case 1:
      wcout << L"fib37 received its value first. Result = " 
            << receive(fib37) << endl;
      break;
   case 2:
      wcout << L"half_of_fib42 received its value first. Result = " 
            << receive(half_of_fib42) << endl;
      break;
   default:
      wcout << L"Unexpected." << endl;
      break;
   }
}

В данном примере получается следующий результат.

fib35 received its value first. Result = 9227465

Так как задача, вычисляющая 35е число Фибоначчи, не обязательно завершится первой, результат примера может быть разным.

В этом примере для параллельного вычисления чисел Фибоначчи используется алгоритм Concurrency::parallel_invoke. Дополнительные сведения о parallel_invoke см. в разделе Параллельные алгоритмы.

Полный пример, демонстрирующий использование класса choice, см. в разделе Практическое руководство. Выбор среди завершенных задач.

[в начало]

Классы join и multitype_join

Классы Concurrency::join и Concurrency::multitype_join позволяют ожидать, пока все члены набора исходных объектов не получат сообщение. Класс join предназначен для объектов-источников, имеющих общий тип сообщений. Класс multitype_join предназначен для объектов-источников, имеющих разные типы сообщений.

Чтение из объекта join или multitype_join похоже на вызов функции Windows API WaitForMultipleObjects, когда для ее параметра bWaitAll задано значение TRUE. Однако, как и объект choice, объекты join и multitype_join используют механизм событий, который привязывает данные к самому событию, а не ко внешнему объекту синхронизации.

При чтении из объекта join создается объект std::vector. При чтении из объекта multitype_join создается объект std::tuple. Элементы располагаются в этих объектах в том порядке, в каком соответствующие буферы источника подсоединяются к объекту join или multitype_join. Так как порядок, в котором исходные буферы присоединяются к объекту join или multitype_join, связан с порядком элементов в итоговом объекте vector или tuple, рекомендуется не отсоединять существующий исходный буфер от соединения. Это может привести к неопределенному поведению.

"Жадные" и "нежадные" объединения

Классы join и multitype_join поддерживают концепцию жадных и нежадных объединений. При жадном объединении сообщения от каждого из источников принимаются по мере их поступления до тех пор, пока не будут доступны все сообщения. При нежадном объединении получение сообщений производится в два этапа. Сначала нежадное объединение ожидает, пока ему не предложат по сообщению от каждого из исходных объектов. Затем, когда все исходные сообщения доступны, нежадное соединение пытается зарезервировать все эти сообщения. Если оно может зарезервировать все сообщения, оно употребляет их и распространяет для целевых объектов. В противном случае оно высвобождает, или отменяет, резервирование сообщений и снова ожидает, пока все исходные объекты не получат сообщение.

Жадное объединение работает лучше нежадного, поскольку сообщения принимаются немедленно. В редких случаях жадные соединения могут приводить к взаимоблокировкам. Используйте нежадное объединение при наличии нескольких объединений, содержащих один или несколько общих объектов-источников.

Пример

В следующем примере показана основная схема работы с классом join. В этом примере функция Concurrency::make_join используется для создания объекта join, получающего данные от трех объектов single_assignment. В этом примере вычисляются различные числа Фибоначчи, затем каждый результат сохраняется в отдельный объект single_assignment, затем на консоль выводятся все результаты, содержащиеся в объекте join. Этот пример похож на пример для класса choice, но класс join ожидает, пока все исходные блоки сообщений не получат сообщение.

// join-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   single_assignment<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   single_assignment<double> half_of_fib42;   

   // Create a join object that selects the values from each of the
   // single_assignment objects.
   auto join_all = make_join(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   auto result = receive(join_all);
   wcout << L"fib35 = " << get<0>(result) << endl;
   wcout << L"fib37 = " << get<1>(result) << endl;
   wcout << L"half_of_fib42 = " << get<2>(result) << endl;
}

После выполнения примера получается следующий результат.

fib35 = 9227465
fib37 = 24157817
half_of_fib42 = 1.33957e+008

В этом примере для параллельного вычисления чисел Фибоначчи используется алгоритм Concurrency::parallel_invoke. Дополнительные сведения о parallel_invoke см. в разделе Параллельные алгоритмы.

Полные примеры, демонстрирующие использование класса join, см. в разделах Практическое руководство. Выбор среди завершенных задач и Пошаговое руководство. Использование класса join для предотвращения взаимоблокировки.

[в начало]

Класс timer

Класс Concurrency::timer выполняет роль источника сообщений. Объект timer отправляет сообщение целевому объекту по прошествии указанного периода времени. Класс timer удобен, если требуется задержать передачу сообщения или требуется передавать сообщения через постоянные интервалы времени.

Класс timer отправляет свое сообщение только одному целевому объекту. Если в конструкторе для параметра _PTarget задано значение NULL, впоследствии целевой объект можно задать, вызвав метод Concurrency::ISource::link_target.

Объект timer может быть повторяющимся или неповторяющимся. Чтобы создать повторяющийся таймер, при вызове конструктора передайте значение true для параметра _Repeating. В противном случае для создания неповторяющегося таймера при вызове конструктора передайте значение false для параметра _Repeating. Если таймер повторяющийся, он передает своему целевому объекту одно и то же сообщение через постоянный интервал.

Библиотека агентов создает объекты timer в незапущенном состоянии. Чтобы запустить объект таймера, вызовите метод Concurrency::timer::start. Чтобы остановить объект timer, уничтожьте его или вызовите метод Concurrency::timer::stop. Чтобы приостановить повторяющийся таймер, вызовите метод Concurrency::timer::pause.

Пример

В следующем примере показана основная схема работы с классом timer. В следующем примере объекты timer и call используются для сообщения о ходе выполнения длительной операции.

// timer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Create a call object that prints characters that it receives 
   // to the console.
   call<wchar_t> print_character([](wchar_t c) {
      wcout << c;
   });

   // Create a timer object that sends the period (.) character to 
   // the call object every 100 milliseconds.
   timer<wchar_t> progress_timer(100u, L'.', &print_character, true);

   // Start the timer.
   wcout << L"Computing fib(42)";
   progress_timer.start();

   // Compute the 42nd Fibonacci number.
   int fib42 = fibonacci(42);

   // Stop the timer and print the result.
   progress_timer.stop();
   wcout << endl << L"result is " << fib42 << endl;
}

В данном примере получается следующий результат.

Computing fib(42)..................................................
result is 267914296

Полный пример, демонстрирующий использование класса timer, см. в разделе Практическое руководство. Отправка сообщений через определенные интервалы.

[в начало]

Фильтрация сообщений

При создании объекта блока сообщений можно предоставить функцию фильтрации, которая определяет для блока сообщений, принимает он или отклоняет сообщение. Функция фильтрации позволяет блоку сообщений получать только определенные значения.

В следующем примере показано, как создать объект unbounded_buffer, который использует функцию фильтрации, чтобы принимать только четные числа. Объект unbounded_buffer отклоняет нечетные числа и не распространяет их для целевых блоков.

// filter-function.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that uses a filter
   // function to accept only even numbers.
   unbounded_buffer<int> accept_evens(
      [](int n) {
         return (n%2) == 0;
      });

   // Send a few values to the unbounded_buffer object.
   unsigned int accept_count = 0;
   for (int i = 0; i < 10; ++i)
   {
      // The asend function returns true only if the target
      // accepts the message. This enables us to determine
      // how many elements are stored in the unbounded_buffer
      // object.
      if (asend(accept_evens, i))
      {
         ++accept_count;
      }
   }

   // Print to the console each value that is stored in the 
   // unbounded_buffer object. The unbounded_buffer object should
   // contain only even numbers.
   while (accept_count > 0)
   {
      wcout << receive(accept_evens) << L' ';
      --accept_count;
   }
}

После выполнения примера получается следующий результат.

0 2 4 6 8

Функция фильтрации может быть лямбда-функцией, указателем функции или объектом функции. Любая функция фильтрации принимает одну из следующих форм.

bool (_Type)
bool (_Type const &)

Чтобы исключить лишнее копирование данных, при работе с составным типом, распространяемым по значению, следует использовать вторую форму.

Фильтрация сообщений поддерживает модель программирования поток данных, в которой компоненты выполняют вычисления при получении данных. Примеры использования функций фильтрации для управления потоком данных в сети передачи сообщений см. в разделах Практическое руководство. Использование фильтра блока сообщений, Пошаговое руководство. Создание агента потоков данных и Пошаговое руководство. Создание сети обработки изображений.

[в начало]

Резервирование сообщений

Резервирование сообщений позволяет блоку сообщений зарезервировать сообщение и использовать его позже. Обычно резервирование сообщений не используется напрямую. Тем не менее, если вы понимаете, что такое резервирование сообщений, вам будет проще разобраться с поведением некоторых предопределенных типов блоков сообщений.

Рассмотрим жадные и нежадные соединения. Они используют резервирование сообщений, чтобы использовать их позже. Как описано ранее, нежадное соединение получает сообщения в два этапа. На первом этапе нежадный объект join ожидает, пока всего его исходные объекты не получат сообщение. Затем нежадное соединение пытается зарезервировать все эти сообщения. Если оно может зарезервировать все сообщения, оно употребляет их и распространяет для целевых объектов. В противном случае оно высвобождает, или отменяет, резервирование сообщений и снова ожидает, пока все исходные объекты не получат сообщение.

Жадное соединение, также считывающее входящие сообщения из нескольких исходных объектов, использует резервирование сообщений, чтобы считывать дополнительные сообщения, пока не получены сообщения от всех исходных объектов. Например, рассмотрим жадное соединение, получающее сообщения от блоков сообщений A и B. Если жадное соединение получает два сообщения от B, но еще не получило сообщение от A, жадное соединение сохраняет уникальный идентификатор сообщения для второго сообщения от B. После того, как жадное соединение получает сообщение от A и распространяет эти сообщения, оно использует сохраненный идентификатор сообщения, чтобы проверить, доступно ли еще второе сообщение от B.

Резервирование сообщений можно использовать при реализации пользовательских типов блоков сообщений. Пример создания пользовательского типа блоков сообщений см. в разделе Пошаговое руководство. Создание пользовательского блока сообщений.

[в начало]

См. также

Основные понятия

Библиотека асинхронных агентов

Журнал изменений

Дата

Журнал

Причина

Август 2010

Добавлены общие примеры.

Улучшение информации.