Асинхронные агенты

Программирование на основе акторов с помощью Asynchronous Agents Library

Майкл Чу (Michael Chu) и Кришнан Варадарайан (Krishnan Varadarajan)

Теперь, когда многоядерные процессоры широко распространены и устанавливаются в любые компьютеры — от серверов и настольных машин до лэптопов, распараллеливание кода становится важным как никогда ранее. Ввиду этого в Visual Studio 2010 введено несколько новых способов, позволяющих разработчикам на C++ задействовать возможности этих процессоров с помощью новой исполняющей среды параллельного кода и новых моделей параллельного программирования. Однако теперь перед разработчиками возникает одна новая проблема:выбор корректной модели программирования для приложений. Корректная модель может поспособствовать активному использованию параллелизма, но может также потребовать полного переосмысления того, как структурируется и на самом деле выполняется программа.

К наиболее часто применяемым на сегодняшний день моделям параллельного программирования относятся универсальные контейнеры с поддержкой параллельной обработки (concurrency-aware containers) и такие алгоритмы, как параллельные циклы. Хотя эти традиционные модели могут оказаться эффективным способом масштабирования приложений, способных использовать преимущества многоядерных процессоров, они не устраняют один из важнейших факторов, сказывающихся на производительности параллельного кода: растущее влияние латентности (latency). Методики распараллеливания ускоряют вычисления и позволяют распределять их по нескольким ядрам процессора, но закон Амдала (wikipedia.org/wiki/Amdahl's_law) показывает нам, что повышение производительности ограничивается самой медленной частью выполняемого кода. А во многих случаях возрастает доля времени, проводимого в ожидании дискового или сетевого ввода-вывода данных.

Модели программирования на основе акторов весьма неплохо справляются с такими проблемами, как латентность, и впервые появились в начале 70-х годов для использования ресурсов компьютеров с сотнями или тысячами независимых процессоров. Фундаментальная концепция, стоящая за моделью акторов, — интерпретация компонентов приложения как индивидуальных акторов, способных взаимодействовать с остальным миром, посылая, принимая и обрабатывая сообщения.

Сравнительно недавно с появлением многоядерных процессоров об этой модели вновь вспомнили как об эффективном способе борьбы с задержками для ускорения параллельного выполнения. В Visual Studio 2010 включена библиотека Asynchronous Agents Library (AAL), построенная на новой модели акторов с интерфейсами передачи сообщений, где агенты выступают в роли акторов. AAL позволяет разработчикам проектировать свои приложения с большим акцентом на потоки данных. Как правило, это дает возможность продуктивно использовать задержки в ожидании данных.

В этой статье мы дадим обзор библиотеки AAL и продемонстрируем, как использовать ее преимущества в приложениях.

Concurrency Runtime

Фундамент поддержки параллельного выполнения в Visual Studio 2010 и AAL — новая исполняющая среда параллельного кода, Concurrency Runtime, которая поставляется как часть C Runtime (CRT) в Visual Studio 2010. Concurrency Runtime предлагает кооперативный планировщик задач и диспетчер ресурсов «с глубоким знанием нижележащих аппаратных ресурсов». Это позволяет исполняющей среде выполнять задачи с распределением нагрузки по нескольким ядрам процессора.

На рис. 1 показана схема поддержки в Visual Studio 2010 параллельного выполнения неуправляемого кода. Планировщик (scheduler) — основной компонент, определяющий, где и когда выполняются задачи. Он опирается на информацию, собранную диспетчером ресурсов (Resource Manager), для наиболее эффективного использования ресурсов выполнения. Приложения и библиотеки в основном взаимодействуют с Concurrency Runtime через две модели программирования, размещаемые поверх планировщика:AAL и Parallel Patterns Library (PPL); однако при необходимости они могут напрямую взаимодействовать с самой исполняющей средой.

image: The Concurrency Runtime

Рисунок 1 Concurrency Runtime

PPL предлагает более традиционные способы распараллеливания вроде конструкций parallel_for и parallel_for_each, блокировок и параллельно обрабатываемых структур данных, например очередей и векторов. Хотя статья посвящена другой библиотеке, все же отметим, что PPL — мощный инструмент для разработчиков, который можно использовать в сочетании со всеми новыми методами, введенными в AAL. Подробнее о PPL см. февральскую рубрику «Windows и C++» за 2009 г. (msdn.microsoft.com/magazine/dd434652).

AAL в противоположность PPL обеспечивает распараллеливание приложений на более высоком уровне и с использованием подхода, отличного от традиционных методик. Разработчикам нужно проектировать приложения с точки зрения обрабатываемых данных и думать о том, как распределить обработку данных по компонентам или этапам, которые можно выполнять параллельно.

AAL предоставляет два основных компонента: инфраструктуру передачи сообщений и асинхронные агенты.

Инфраструктура передачи сообщений включает набор блоков сообщений (message blocks), что позволяет принимать, обрабатывать и распространять сообщения. Объединяя блоки сообщений в цепочку, можно создавать конвейеры, способные работать одновременно.

Асинхронные агенты являются акторами, которые взаимодействуют с миром, принимая сообщения, выполняя локальную работу в соответствии с самостоятельно поддерживаемым состоянием и передавая сообщения.

Совместно эти два компонента позволяют задействовать параллелизм в терминах потока данных, а не потока управления, и обеспечивают более высокую толерантность к задержкам за счет более эффективного использования параллельных ресурсов.

Инфраструктура передачи сообщений

Один из двух основных компонентов AAL — инфраструктура передачи сообщений, набор конструкций, помогающих создавать сети потоков данных для конвейеризации работы. Такая конвейеризация — важнейший элемент модели потоков данных (dataflow model), так как она позволяет параллельно обрабатывать потоковые данные, разбивая работу на множество независимых стадий. Когда обработка данных на одной стадии заканчивается, данные могут быть переданы на другую стадию, а первая стадия конвейера будет ожидать поступления следующей порции данных для обработки.

В качестве примера рассмотрим программу электронной почты, которая форматирует исходящие сообщения и выступает в роли цензора, удаляя из них неподходящий контент. Код для операций такого типа показан ниже:

std::foreach(reader.begin(); reader.end(); 
  [](const string& word) { 
    auto w1 = censor(word); 
    auto w2 = format(w1); 
    writer.write_word(w2);
  });

Каждое слово в сообщении электронной почты нужно проверить на наличие в словаре, заменяя его при необходимости другим словом. Далее код форматирует каждое слово согласно набору правил.

Этому примеру изначально присущ высокий уровень параллелизма. Однако традиционные методы распараллеливания здесь неэффективны. Например, цензору для строк в тексте проще всего было бы использовать алгоритм parallel_for_each.

И первый серьезный недостаток такого решения заключается в том, что тогда придется считывать весь файл, чтобы итератор мог корректно распределить работу.
Считывание всего файла делает процесс зависимым от ввода-вывода и может свести на нет все усилия по распараллеливанию. Разумеется, вы могли бы использовать «интеллектуальный» итератор, способный перекрывать обработку слов с чтением ввода.

Второй значимый недостаток традиционного подхода к распараллеливанию состоит в упорядочении. Очевидно, что в случае сообщения электронной почты при параллельной обработке текста нужно поддерживать порядок текста, а иначе смысл сообщения будет напрочь утерян. Чтобы поддерживать этот порядок, подход с parallel_for_each потребовал бы значительных издержек, связанных с синхронизацией и буферизацией, которая автоматически обрабатывается AAL.

Обрабатывая сообщение в конвейере, вы можете избежать этих двух проблем и в полной мере использовать преимущества параллельного выполнения. Посмотрим на рис. 2, где был создан простой конвейер. В данном примере основные задачи приложения — цензура и форматирование — разделены на две стадии. На первой принимается строка и проверяется ее наличие в словаре. Если совпадение найдено, блок цензора заменяет строку другим словом из словаря. В ином случае он выдает то же сообщение, которое было получено на входе. Аналогично на второй стадии блок форматирования принимает каждое слово и форматирует его в заданном стиле.

image: E-mail Processing Pipeline

Рисунок 2 Конвейер обработки электронной почты

В этом примере можно получить выигрыш от подхода, ориентированного на потоки данных, в нескольких отношениях. Во-первых, поскольку он исключает необходимость в считывании всего сообщения перед обработкой, строки сообщения можно немедленно пропускать через стадии цензурирования и форматирования. Во-вторых, конвейер позволяет блоку форматирования обрабатывать одну строку, а блоку цензуры параллельно обрабатывать следующую строку. Наконец, строки обрабатываются в том порядке, в каком они появляются в исходном тексте, а потому никакой дополнительной синхронизации не требуется.

Блоки сообщений

Блоки сообщений (messages blocks) обеспечивают прием, обработку, хранение и распространение сообщений. Эти блоки бывают трех видов: источники (sources), мишени (targets) и распространители (propagators). Источники могут лишь распространять сообщения, тогда как мишени — принимать, хранить и обрабатывать их. Большинство блоков — распространители, т. е. одновременно являются и источниками, и мишенями. Иначе говоря, они могут принимать, хранить, обрабатывать и рассылать сообщения.

AAL содержит набор примитивов блоков сообщений, достаточный для подавляющего большинства случаев применения. На рисунке 3 дан краткий обзор всех блоков сообщений, включенных в AAL. Однако модель остается открытой, поэтому, если вашему приложению требуется блок сообщения со специфическим поведением, вы можете написать собственный блок, который будет взаимодействовать со всеми предопределенными блоками. Каждый блок обладает своими уникальными характеристиками в отношении обработки, хранения и распространения сообщений.

Риунок 3 Блоки сообщений в AAL

Блок сообщений Описание
unbounded_buffer<Type> Хранит неограниченное (unbounded) количество сообщений и рассылает их мишеням.
overwrite_buffer<Type> Хранит единственное сообщение, которое перезаписывается всякий раз, когда поступает новое сообщение, и широковещательно рассылает его своим мишеням.
single_assignment<Type> Хранит единственное сообщение, которое записывается лишь раз, и широковещательно рассылает его своим мишеням.
transformer<Input,Output> Принимает сообщение типа Input и выполняет переданную пользовательскую функцию для его преобразования в сообщение типа Output. Преобразованное сообщение рассылается своим мишеням.
call<Type> Принимает сообщение и выполняет пользовательскую функцию, переданную в этом сообщении как аргумент. Это чистая мишень сообщений.
timer<Type> Передает сообщение своей мишени через заданное пользователем время. Эта операция может быть разовой или периодичной. Данный блок является чистым источником сообщений.
choice<Type1,Type2,...> Получает сообщения от нескольких источников разных типов и будет принимать сообщение только от первого блока, который послал сообщение choice.
join<Type> Получает сообщения от нескольких источников и объединяет их в одно выходное сообщение. Асинхронно ожидает готовность сообщений от каждого источника.
multitype_join<Type1,Type2,...> Получает сообщения от нескольких источников разных типов и объединяет их. Асинхронно ожидает готовность сообщений от каждого источника.

Одно из основных преимуществ примитивов блоков сообщений, предоставляемых AAL, заключается в том, что их можно комбинировать. Поэтому вы можете добиться нужного поведения, подобрав подходящую комбинацию примитивов. Например, можно легко создать блок, который объединяет несколько входных сообщений, подключив блок transformer к концу блока join. Когда блок join успешно завершает прием сообщений от своих источников, он может передать их блоку transformer, который суммирует полезные данные сообщений.

Вы могли бы также подключить блок таймера с повторением как источник для блока join. В итоге вы получили бы блок, который регулирует поток сообщений, пропуская их, только когда блок timer генерирует свое сообщение. Мы можем заменить этот конвейер двумя блоками transformer, как иллюстрирует рис. 4.

image: Composing Adder and Message Throttling Blocks from Primitives

Рисунок 4 Блоки Adder и Message Throttling, составленные из примитивов

Создание конвейера передачи сообщений

Теперь рассмотрим код для создания показанного ранее конвейера блоков сообщений. Мы можем заменить этот конвейер двумя блоками transformer, как иллюстрирует рис. 5. Задача блока transformer — принять сообщение определенного типа и выполнить над этим сообщением пользовательскую функцию, которая может модифицировать полезные данные сообщения или даже полностью изменить тип сообщения. Например, блок censor принимает входное сообщение, содержащее строку, и должен обработать ее.

image: A Message Block Pipeline

Рисунок 5 Конвейер блоков сообщений

Код для создания и подключения блоков сообщений показан на рис. 6. Этот код начинает с создания экземпляров двух блоков transformer. Лямбда-параметр C++0x в конструкторе блока censor определяет функцию преобразования, которая ищет входную строку из сообщения в словаре, чтобы выяснить, надо ли ее заменить на другую. После этого возвращается конечная строка, в блоке censor она обертывается в сообщение, и это сообщение отсылается из блока. Похожий путь строка проделывает в блоке transformer с тем исключением, что на выходе появляется строка, измененная функцией форматирования.

Рисунок 6 Простой конвейер сообщений

dictionary dict;

transformer<string, string> 
  censor([&dict](const string& s) -> string {

  string result = s;
  auto iter = dict.find(s);

  if (iter != dict.end()) {
    result =  iter->second;
  }

  return result;
});

transformer<string, string> 
  format([](const string& s) -> string {

  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }

  return result;
});

censor.link_target(&format);

asend(&censor, "foo");
string newStr = receive(format);
printf("%s\n", newStr);

Вслед за созданием экземпляров двух блоков код связывает их вместе, вызывая метод link_target блока censor. Каждый блок-источник и распространитель имеют такой метод, с помощью которого можно определить, каким блокам источник должен отсылать свои сообщения.

После связывания блоков censor и format любое сообщение, переданное в блок censor, будет обработано его функцией transform, а конечное сообщение будет неявно передано блоку format для последующей обработки. Если блок сообщения является источником или распространителем, но к нему не подключена ни одна мишень, этот блок может хранить сообщение специфическим для него образом, пока не будет подключена мишень или пока это сообщение не будет извлечено.

Последние три строки кода демонстрируют процесс инициации отправки сообщений в блок и извлечения сообщения из блока. В AAL есть два API инициации: send и asend. Они соответственно вводят сообщение в блок синхронно и асинхронно.

Основное различие между ними в следующем.Когда вызов send возвращает управление, он гарантирует, что сообщение уже помещено в блок, которому оно было адресовано. Вызов asend может вернуть управление немедленно и позволить Concurrency Runtime планировать его распространение. Аналогично работают и два API извлечения сообщенийin: receive и try_receive. Метод receive блокируется до получения сообщения, а try_receive немедленно возвращает управление, если ему не удается сразу же извлечь сообщение.

На рис. 6 строка «foo» посылается блоку censor асинхронно. Этот блок получает сообщение, проверяет, есть ли такая строка в словаре, а затем помещает конечную строку в сообщение. Затем оно передается блоку format, который принимает строку, делает каждую букву заглавной и, поскольку у него нет мишени, хранит это сообщение. При вызове receive извлечет это сообщение из блока format. Таким образом, если предположить, что «foo» не было в словаре, на выходе из этого примера будет получена строка «FOO». Хотя в примере через сеть пропускается лишь одна строка, вполне наглядно видно, как поток входных строк образует конвейер выполнения.

Также обратите внимание, что в этом примере отсутствуют ссылки на сами сообщения. Сообщение — это просто конверт, обертывающий данные, которые вы хотите передать по своей сети потоков данных. Сама передача сообщения обрабатывается как процесс предложения и принятия (offering and accepting). Когда блок получает сообщение, он может хранить его любым нужным ему способом. Если впоследствии это сообщение понадобиться отправить, он предложит сообщение каждой из подключенных к нему мишеней. Чтобы на деле получить сообщение, приемник должен принять предлагаемое сообщение для завершения транзакции. Этот процесс передачи сообщений между блоками планируется и обрабатывается задачами, которые в свою очередь планируются и выполняются средой Concurrency Runtime.

Распространение блоков сообщений

Ну а теперь кратко рассмотрим, как сообщения передаются между блоками и какую роль играет Concurrency Runtime в AAL.

Эта информация не обязательна для использования блоков сообщений или AAL, но даст более глубокое представление о том, как работают протоколы передачи сообщений и как задействовать их преимущества. В остальной части этого раздела я буду обсуждать блоки-распространители, так как они являются и источниками, и мишенями. Очевидно, что чистый источник или мишень являются просто подмножеством реализации блока-распространителя.

На внутреннем уровне каждый блок-распространитель имеет входную очередь для сообщений и специфическое для конкретного блока хранилище. Другие блоки, связываемые с данным блоком-распространителем, посылают сообщения, которые помещаются во входную очередь.

Например, на рис. 7 у блока censor в transformer имеется входная очередь, в которой на данный момент хранится сообщение со строкой str6. Сам transformer содержит два сообщения: str4 и str5. Так как это transformer, его специфическое хранилище представляет собой другую очередь. У прочих типов блоков могут быть иные контейнеры-хранилища. Так, блок overwrite_buffer хранит лишь одно сообщение, которое всегда перезаписывается при получении нового сообщения.

image: Message-Passing Protocol

Рисунок 7 Протокол передачи сообщений

Когда сообщение предоставляется блоку от одного из связанных с ним источников (или через send/asend API), этот блок сначала вызывает функцию-фильтр, чтобы определить, надо ли принять данное сообщение. Если она решает, что сообщение следует принять, оно помещается во входную очередь. Фильтр является необязательной функцией, которую можно передавать в конструктор каждой мишени или распространителя; она возвращает булево значение, указывающее, следует ли принимать предложенное источником сообщение. Если сообщение отклоняется, источник предложит свое сообщение следующей связанной с ним мишени.

Как только сообщение оказывается во входной очереди, источник, от которого поступило это сообщение, прекращает его хранение. Однако у принимающего блока пока не готово выходное сообщение. Таким образом, сообщения могут накапливаться во входной очереди в ожидании обработки.

Когда сообщение появляется во входной очереди блока сообщений, планировщик Concurrency Runtime планирует к выполнению облегченную задачу (lightweight task, LWT). Цель этой LWT двоякая. Во-первых, она должна перемещать сообщения из входной очереди во внутреннее хранилище блока (что мы называем обработкой сообщения). А во-вторых, она должна также пытаться отсылать сообщения любым присоединенным мишеням (что мы называем распространением сообщений).

Например, на рис. 7 во входной очереди были сообщения, что вызвало планирование LWT к выполнению. После этого LWT обработала сообщение, сначала выполнив предоставленную пользовательскую функцию в transformer, проверила его строку по словарю, а потом переместила сообщение в буфер данного блока.

После переноса сообщения в буфер хранения LWT начинает операцию распространения, в ходе которой сообщения посылаются мишени — блоку format. В данном случае, поскольку сообщение str4 было в начале очереди transformer, оно посылается блоку format первым, потом передается следующее сообщение, str5. Тот же процесс повторяется в блоке format.

Обработка сообщений может быть разной в зависимости от типа блока сообщений. Так, в unbounded_buffer имеется простая операция обработки для перемещения сообщения в буфер хранения этого блока. Блок transformer обрабатывает сообщения, вызывая пользовательскую функцию перед перемещением сообщения в свой буфер. Другие блоки могут выполнять еще более сложные операции, например join, который может комбинировать несколько сообщений от разных источников и сохранять их в буфере для подготовки к последующему распространению сообщений.

Для большей эффективности в работе AAL ведет себя «интеллектуально» при создании LWT, чтобы для каждого блока сообщений одновременно планировалась лишь одна LWT. Если при выполнении LWT в очередь прибывают следующие сообщения, она продолжит извлекать сообщения и обрабатывать их. Таким образом, на рис. 6, если LWT блока transformer все еще работает, когда во входную очередь поступает сообщение str7, эта LWT извлечет и его, а затем обработает.То есть новая задача обработки и распространения не запускается.

Наличие у каждого блока сообщений своей LWT, выполняющей обработку и распространение, является ключевым элементом всей архитектуры AAL; это позволяет инфраструктуре передачи сообщений конвейеризовать работу в стиле потоков данных. Благодаря этому AAL может отделять блоки друг от друга и обеспечивать параллельное выполнение множества блоков. Каждая LWT должна просто посылать свои сообщения во входные очереди зарегистрированных блоков-мишеней, и каждая мишень будет всего-навсего планировать выполнение LWT для обработки содержимого в своей входной очереди. Применение единственной LWT для обработки и распространения сообщений гарантирует требуемое упорядочение сообщений для блоков.

Асинхронные агенты

Вторая основная часть AAL — асинхронные агенты. Это компоненты приложения, которые асинхронно взаимодействуют с более крупными вычислительными задачами и подсистемой ввода-вывода.При этом агенты могут взаимодействовать друг с другом и обеспечивают параллелизм на более низком уровне. Они изолированы, потому что их представление о мире целиком заключено в их классах, и они способны взаимодействовать с другими компонентами приложения, используя передачу сообщений. Сами агенты планируются к выполнению как задачи в Concurrency Runtime. Это позволяет им блокировать и разрешать совместное выполнение работы.

У асинхронного агента свой жизненный цикл, как показано на рис. 8. Этот жизненный цикл можно отслеживать и ожидать перехода в определенные состояния. Состояния, отмеченные зеленым – состояния выполнения, а состояния, отмеченные красным, — терминальные. Разработчики могут создавать собственные агенты, наследуя их от базового класса agent.

image: The Asynchronous Agent Lifecycle

Рисунок 8 Жизненный цикл асинхронного агента

Три функции базового класса — start, cancel и done — переводят агент между разными состояниями. После конструирования агенты находятся в состоянии created (создан). Запуск агента аналогичен запуску потока. Они ничего не исполняют, пока для них не вызывается метод start. В этот момент агент планируется к выполнению и переходит в состояние runnable (готов к выполнению).

Выбрав определенный агент, Concurrency Runtime переводит его в состояние started (запущен) и продолжает выполнять его, пока пользователь не вызовет метод done, указывая тем самым, что работа закончена. В промежуток, когда агент запланирован к выполнению, но еще не запущен, вызов cancel переведет агент в состояние canceled (отменен), и он никогда не будет выполняться.

Давайте вернемся к примеру с фильтрацией электронной почты, где конвейеризованные блоки сообщений вводили поток данных в приложении и увеличивали его возможности в параллельной обработке слов. Однако в этом примере не было показано, как обрабатывать ввод-вывод для работы с самой электронной почтой и разбивать ее сообщения на потоки строк для последующей обработки на конвейере. Кроме того, как только строки проходят конвейер, они должны быть собраны, чтобы их текст можно было перезаписывать после цензуры и форматирования. Вот здесь в игру и могут вступить агенты, чтобы повысить толерантность к различиям в задержках ввода-вывода.

Для примера рассмотрим конец нашего конвейера электронной почты. На этом этапе строки выведены блоком format, и их нужно записать в файлы в почтовом ящике. На рис. 9 демонстрируется, как агент вывода (output agent) может захватывать строки и создавать выходные сообщения электронной почты. Функция run агента WriterAgent принимает сообщения от блока format в цикле.

image: An Agent Capturing the Output of the Format Block

Рисунок 9 Агент, захватывающий вывод от блока format

Хотя большая часть обработки в этом приложении выполняется с использование потока данных, WriterAgent показывает, как в программу можно ввести и некую долю потока управления. Например, WriterAgent должен по-разному вести себя в зависимости от принимаемой входной строки, и, когда прибывает сообщение eof (end-of-file) (конец файла), он должен прекращать операцию. Код WriterAgent приведен на рис. 10.

Рисунок 10 WriterAgent

class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }

  ~WriterAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ... );

    string s;
    string eof("EOF");

    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }

    fclose(stream);
    done();
  }

private:

  ISource<string> * m_source;
};

В этом коде есть несколько интересных фрагментов, которые стоит обсудить. Во-первых, из деструктора вызывается статическая функция agent::wait. Ее можно вызвать с передачей указателя на любой агент, и она заблокируется, пока агент не перейдет в одно из конечных состояний: done или canceled. Хотя ожидание в деструкторе не обязательно для всех агентов, в большинстве случаев делать нужно именно так, поскольку это гарантирует, что агент на момент уничтожения не выполняет никакого кода.

Во-вторых, интерес представляет сам метод run. Он определяет основную работу, выполняемую агентом. В этом коде агент записывает строки, которые он считывает из источника (в нашем примере — из блока format).

Наконец, обратите внимание на последнюю строку метода run, где вызывается функция done агента. Вызов done переводит агент из состояния running (выполняется) в состояние done (завершен). В большинстве случаев эту функцию нужно вызывать в конце метода run. Однако в некоторых обстоятельствах приложениям может понадобиться использование агентов для установки определенных состояний, как, например, в сети потоков данных, а значит, агент должен оставаться активным и после завершения метода run.

Связываем все воедино

Теперь, когда мы создали конвейер передачи сообщений для фильтрации и форматирования строк, а также агент вывода для их обработки, мы можем добавить агент ввода (input agent), поведение которого очень схоже с таковым у агента вывода. Пример сборки воедино этого приложения приведен на рис. 11.

image: Agents Used to Process E-mail Messages

Рисунок 11 Агенты, используемые для обработки сообщений электронной почты

Одно из преимуществ использования агентов для обработки — возможность применения асинхронных акторов в приложении. Когда появляются данные, подлежащие обработке, агент ввода асинхронно запустит передачу строк через конвейер, а агент вывода может аналогичным образом считывать их и записывать в файлы. Эти акторы могут начинать и прекращать обработку совершенно независимо, и они полностью управляются данными. Такое поведение позволяет элегантно решать многие задачи, особенно при наличии задержек и асинхронном вводе-выводе, как в нашем примере с сообщениями электронной почты.

В данном примере я добавил второй агент, ReaderAgent, который действует аналогично WriterAgent с тем исключением, что обрабатывает ввод-вывод для чтения сообщений электронной почты и передачи строк в сеть. Код ReaderAgent показан на рис. 12.

Рисунок 12 ReaderAgent

class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }

  ~ReaderAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;       
    fopen_s( &stream, ...);

    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }

    fclose( stream );

    asend(m_target, string("eof"));
    done();
  }

private:

  ITarget<string> * m_target;
};

Располагая ReaderAgent и WriterAgent для асинхронной обработки ввода-вывода программы, мы должны лишь связать их с блоками transformer в сети, чтобы начать работу. Это несложно сделать после связывания воедино двух блоков:

censor.link_target(&format);

ReaderAgent r(&censor);
r.start();

WriterAgent w(&format);
w.start();

ReaderAgent создается со ссылкой на censor, чтобы он мог корректно отправлять сообщения этому блоку, а WriterAgent — со ссылкой на format, чтобы он мог извлекать сообщения. Каждый агент запускается своей функцией start, которая планирует агент к выполнению в рамках Concurrency Runtime. Так как каждый агент вызывает agent::wait(this) в своем деструкторе, выполнение не завершается, пока оба агента не перейдут в состояние done.

Заключение

В этой статье вы получили представление о некоторых новых возможностях в программировании на основе акторов и конвейеризации потоков данных, заложенных в Visual Studio 2010. Мы предлагаем опробовать эти возможности.

Если вы хотите копнуть поглубже, то помните, что в этой статье мы даже не затронули множество прочих возможностей: создание собственных блоков сообщений, фильтрацию сообщений и многое другое. В Parallel Computing Developer Center на сайте MSDN (msdn.microsoft.com/concurrency) вы найдете массу подробной информации и руководств по этой новой модели программирования и тому, как с ее помощью можно распараллеливать программы совершенно новыми способами.

Майкл Чу (Michael Chu) — инженер по разработке программного обеспечения в группе Concurrency Runtime, которая входит в группу Microsoft Parallel Computing Platform.

Кришнан Варадарайан (Krishnn Varadarajan) — инженер по разработке программного обеспечения в группе Concurrency Runtime, которая входит в группу Microsoft Parallel Computing Platform.

Выражаю благодарность за рецензирование статьи группе Concurrency Runtime