Практическое руководство. Использование функции parallel_invoke для выполнения параллельных операций

Этот пример показывает, как использовать алгоритм Concurrency::parallel_invoke для улучшения производительности программы, выполняющей несколько операций с общим источником данных. Поскольку ни одна из операций не изменяет источник, они могут быть выполнены параллельно простым способом.

Пример

Рассмотрим следующий пример кода, который создает переменную типа MyDataType, вызывает функцию для инициализации этой переменной и выполняет с этими данными несколько продолжительных операций.

MyDataType data;
initialize_data(data);

lengthy_operation1(data);
lengthy_operation2(data);
lengthy_operation3(data);

Если функции lengthy_operation1, lengthy_operation2 и lengthy_operation3 не изменяют переменную MyDataType, их можно выполнять параллельно без дополнительных изменений.

Следующий пример отличается от предыдущего параллельным выполнением. Алгоритм parallel_invoke выполняет задачи параллельно и выполняет возврат после их завершения.

MyDataType data;
initialize_data(data);

Concurrency::parallel_invoke(
   [&data] { lengthy_operation1(data); },
   [&data] { lengthy_operation2(data); },
   [&data] { lengthy_operation3(data); }
);

В следующем примере с сайта gutenberg.org загружается Илиада Гомера, затем с файлом выполняются различные операции. Операции в примере сначала выполняются последовательно, а затем — параллельно.

// parallel-word-mining.cpp
// compile with: /EHsc /MD /DUNICODE /D_AFXDLL
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <ppl.h>
#include <string>
#include <iostream>
#include <vector>
#include <map>
#include <algorithm>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString get_http_file(CInternetSession& session, const CString& url);

// Adds each word in the provided string to the provided vector of strings.
void make_word_list(const wstring& text, vector<wstring>& words);

// Finds the most common words whose length are greater than or equal to the 
// provided minimum. 
vector<pair<wstring, size_t>> find_common_words(const vector<wstring>& words, 
   size_t min_length, size_t count);

// Finds the longest sequence of words that have the same first letter.
vector<wstring> find_longest_sequence(const vector<wstring>& words);

// Finds all pairs of palindromes that appear in the provided collection
// of words.
vector<pair<wstring, wstring>> find_palindromes(const vector<wstring>& words,
   size_t min_length);

int wmain()
{  
   // Manages the network connection.
   CInternetSession session(L"Microsoft Internet Browser");

   // Download 'The Iliad' from gutenberg.org.
   wcout << L"Downloading 'The Iliad'..." << endl;
   wstring file = get_http_file(session, L"http://www.gutenberg.org/files/6130/6130-0.txt");
   wcout << endl;

   // Convert the text to a list of individual words.
   vector<wstring> words;
   make_word_list(file, words);

   // Compare the time that it takes to perform several operations on the data
   // serially and in parallel.
   __int64 elapsed;

   vector<pair<wstring, size_t>> common_words;
   vector<wstring> longest_sequence;   
   vector<pair<wstring, wstring>> palindromes;

   wcout << L"Running serial version...";
   elapsed = time_call([&] {
      common_words = find_common_words(words, 5, 9);
      longest_sequence = find_longest_sequence(words);
      palindromes = find_palindromes(words, 5);
   });
   wcout << L" took " << elapsed << L" ms." << endl;

   wcout << L"Running parallel version...";
   elapsed = time_call([&] {
      parallel_invoke(         
         [&] { common_words = find_common_words(words, 5, 9); },
         [&] { longest_sequence = find_longest_sequence(words); },
         [&] { palindromes = find_palindromes(words, 5); }
      );
   });
   wcout << L" took " << elapsed << L" ms." << endl;
   wcout << endl;

   // Print results.

   wcout << L"The most common words that have five or more letters are:" 
         << endl;
   for_each(common_words.begin(), common_words.end(), 
      [](const pair<wstring, size_t>& p) {
         wcout << L"   " << p.first << L" (" << p.second << L")" << endl; 
      });

   wcout << L"The longest sequence of words that have the same first letter is:" 
         << endl << L"   ";
   for_each(longest_sequence.begin(), longest_sequence.end(), 
      [](const wstring& s) {
         wcout << s << L' '; 
      });
   wcout << endl;

   wcout << L"The following palindromes appear in the text:" << endl;
   for_each(palindromes.begin(), palindromes.end(), 
      [](const pair<wstring, wstring>& p) {
         wcout << L"   "  << p.first << L" " << p.second << endl;
      });
}

// Downloads the file at the given URL.
CString get_http_file(CInternetSession& session, const CString& url)
{
   CString result;

   // Reads data from an HTTP server.
   CHttpFile* http_file = NULL;

   try
   {
      // Open URL.
      http_file = reinterpret_cast<CHttpFile*>(session.OpenURL(url, 1));

      // Read the file.
      if(http_file != NULL)
      {           
         UINT bytes_read;
         do
         {
            char buffer[10000];
            bytes_read = http_file->Read(buffer, sizeof(buffer));
            result += buffer;
         }
         while (bytes_read > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete http_file;

   return result;
}

// Adds each word in the provided string to the provided vector of strings.
void make_word_list(const wstring& text, vector<wstring>& words)
{
   // Add continuous sequences of alphanumeric characters to the 
   // string vector. 
   wstring current_word;
   for_each(text.begin(), text.end(), [&](wchar_t ch) {
      if (!iswalnum(ch))
      {
         if (current_word.length() > 0)
         {
            words.push_back(current_word);
            current_word.clear();
         }
      }
      else
      {
         current_word += ch;
      }
   });
}

// Finds the most common words whose length are greater than or equal to the 
// provided minimum. 
vector<pair<wstring, size_t>> find_common_words(const vector<wstring>& words, 
   size_t min_length, size_t count)
{
   typedef pair<wstring, size_t> pair;

   // Counds the occurences of each word.
   map<wstring, size_t> counts;

   for_each(words.begin(), words.end(), [&](const wstring& word) {
      // Increment the count of words that are at least the minimum length.
      if (word.length() >= min_length)
      {
         auto find = counts.find(word);
         if (find != counts.end())
            find->second++;
         else
            counts.insert(make_pair(word, 1));
      }
   });

   // Copy the contents of the map to a vector and sort the vector by
   // the number of occurences of each word.
   vector<pair> wordvector;
   copy(counts.begin(), counts.end(), back_inserter(wordvector));

   sort(wordvector.begin(), wordvector.end(), [](const pair& x, const pair& y) {
      return x.second > y.second;
   });

   size_t size = min(wordvector.size(), count);
   wordvector.erase(wordvector.begin() + size, wordvector.end());

   return wordvector;
}

// Finds the longest sequence of words that have the same first letter.
vector<wstring> find_longest_sequence(const vector<wstring>& words)
{
   // The current sequence of words that have the same first letter.
   vector<wstring> candidate_list;
   // The longest sequence of words that have the same first letter.
   vector<wstring> longest_run;

   for_each(words.begin(), words.end(), [&](const wstring& word) {
      // Initialize the candidate list if it is empty.
      if (candidate_list.size() == 0)
      {
         candidate_list.push_back(word);
      }
      // Add the word to the candidate sequence if the first letter
      // of the word is the same as each word in the sequence.
      else if (word[0] == candidate_list[0][0])
      {
         candidate_list.push_back(word);
      }
      // The initial letter has changed; reset the candidate list.
      else 
      {
         // Update the longest sequence if needed.
         if (candidate_list.size() > longest_run.size())
            longest_run = candidate_list;

         candidate_list.clear();
         candidate_list.push_back(word);         
      }
   });

   return longest_run;
}

// Finds all pairs of palindromes that appear in the provided collection
// of words.
vector<pair<wstring, wstring>> find_palindromes(const vector<wstring>& words, 
   size_t min_length)
{
   typedef pair<wstring, wstring> pair;
   vector<pair> result;

   // Copy the words to a new vector object and sort that vector.
   vector<wstring> wordvector;
   copy(words.begin(), words.end(), back_inserter(wordvector));
   sort(wordvector.begin(), wordvector.end());

   // Add each word in the original collection to the result whose palindrome 
   // also exists in the collection. 
   for_each(words.begin(), words.end(), [&](const wstring& word) {
      if (word.length() >= min_length)
      {
         wstring rev = word;
         reverse(rev.begin(), rev.end());

         if (rev != word && binary_search(wordvector.begin(), wordvector.end(), rev))
         {
            auto candidate1 = make_pair(word, rev);
            auto candidate2 = make_pair(rev, word);
            if (find(result.begin(), result.end(), candidate1) == result.end() &&
                find(result.begin(), result.end(), candidate2) == result.end())
               result.push_back(candidate1);
         }
      }
   });

   return result;
}

В данном примере получается следующий результат.

Downloading 'The Iliad'...

Running serial version... took 953 ms.
Running parallel version... took 656 ms.

The most common words that have five or more letters are:
   their (953)
   shall (444)
   which (431)
   great (398)
   Hector (349)
   Achilles (309)
   through (301)
   these (268)
   chief (259)
The longest sequence of words that have the same first letter is:
   through the tempest to the tented
The following palindromes appear in the text:
   spots stops
   speed deeps
   keels sleek

В примере для вызова нескольких функций, работающих с одним источником данных, используется алгоритм parallel_invoke. Алгоритм parallel_invoke можно использовать для параллельного вызова любого набора функций, а не только работающих с одними данными.

Так как алгоритм parallel_invoke вызывает каждую функцию параллельно, его эффективность ограничена функцией, выполнение которой занимает самое большое время (если среда выполнения обрабатывает каждую функцию в отдельном процессоре). Если этот пример параллельно выполняет больше задач, чем число доступных процессоров, на каждом процессоре может выполняться несколько задач. В таком случае производительность ограничивается группой задач, выполнение которой занимает самое большое время.

Так как этот пример параллельно выполняет три задачи, не следует ожидать роста производительности на компьютерах с более чем тремя процессорами. Чтобы повысить производительность, можно разбить самые продолжительные задачи на меньшие задачи и выполнить их параллельно.

Алгоритм parallel_invoke можно использовать вместо классов Concurrency::task_group и Concurrency::structured_task_group, если не требуется поддержка отмены. Пример, в котором сравнивается использование алгоритма parallel_invoke и применение групп задач, см. в разделе Практическое руководство. Использование функции parallel_invoke для написания программы параллельной сортировки.

Компиляция кода

Чтобы скомпилировать код, скопируйте и вставьте его в проект Visual Studio или в файл с именем parallel-word-mining.cpp, затем выполните в окне командной строки Visual Studio следующую команду.

cl.exe /EHsc /MD /DUNICODE /D_AFXDLL parallel-word-mining.cpp

См. также

Ссылки

Функция parallel_invoke

Основные понятия

Параллельные алгоритмы