Comment : utiliser le surabonnement pour compenser la latence

Le surabonnement peut améliorer l'efficacité globale de certaines applications qui contiennent des tâches qui ont une quantité élevée de latence.Cette rubrique illustre comment utiliser le surabonnement afin de compenser la latence provoquée par la lecture de données à partir d'une connexion réseau.

Exemple

Cet exemple utilise la Bibliothèque d'agents asynchrones pour télécharger des fichiers à partir de serveurs HTTP.Le http_reader classe dérive de concurrency::agent et utilisations de message passant en mode asynchrone lire les noms d'URL à télécharger.

Le http_reader classe utilise le concurrency::task_group classe simultanément lire chaque fichier.Chaque tâche appelle la concurrency::Context::Oversubscribe méthode avec la _BeginOversubscription paramètre la valeur true pour activer l'over-subscription dans le contexte actuel.Chaque tâche utilise ensuite les classes MFC (Microsoft Foundation Classes) CInternetSession et CHttpFile pour télécharger le fichier.Pour finir, chaque tâche appelle Context::Oversubscribe avec le paramètre _BeginOversubscription défini à false pour désactiver le surabonnement.

Lorsque le surabonnement est activé, le runtime crée un thread supplémentaire dans lequel exécuter des tâches.Chacun de ces threads peut également surabonner le contexte actuel et ainsi créer des threads supplémentaires.Le http_reader classe utilise une concurrency::unbounded_buffer objet de limiter le nombre de threads que l'application utilise.L'agent initialise la mémoire tampon avec un nombre fixe de valeurs de jeton.Pour chaque opération de téléchargement, l'agent lit une valeur de jeton à partir de la mémoire tampon avant que l'opération démarre, puis réécrit cette valeur dans la mémoire tampon une fois l'opération terminée.Lorsque la mémoire tampon est vide, l'agent attend que l'une des opérations de téléchargement réécrive une valeur dans la mémoire tampon.

L'exemple suivant limite le nombre de tâches simultanées à deux fois le nombre de threads matériels disponibles.Cette valeur est un bon point de départ à utiliser en cas d'expérimentation avec le surabonnement.Vous pouvez utiliser une valeur adaptée à un environnement de traitement particulier ou modifier cette valeur de manière dynamique afin de répondre à la charge de travail réelle.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;

protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();

      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());

      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "https://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "https://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "https://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "https://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };

   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {

      // Start the agent.
      reader.start();

      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

Cet exemple produit la sortie suivante sur un ordinateur qui a quatre processeurs :

Downloading http://www.adatum.com/...
Downloading https://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading https://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading https://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading https://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

L'exemple peut s'exécuter plus rapidement lorsque le surabonnement est activé car des tâches supplémentaires s'exécutent pendant que d'autres tâches attendent qu'une opération latente se termine.

Compilation du code

Copiez l'exemple de code, collez-le dans un projet Visual Studio et collez-le dans un fichier nommé téléchargement oversubscription.cpp et puis exécution d'une des valeurs suivantes des commandes dans une fenêtre d'invite de commande Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp

cl.exe /EHsc /MT download-oversubscription.cpp

Programmation fiable

Désactivez toujours le surabonnement une fois que vous n'en avez plus besoin.Considérez une fonction qui ne gère pas une exception levée par une autre fonction.Si vous ne désactivez pas le surabonnement avant que la fonction ne retourne, tout travail parallèle supplémentaire surabonnera également le contexte actuel.

Vous pouvez utiliser le modèle RAII (Resource Acquisition Is Initialization) pour limiter le surabonnement à une portée donnée.Selon le modèle RAII, une structure de données est allouée sur la pile.Cette structure de données initialise ou acquiert une ressource lorsqu'elle est créée et détruit ou libère cette ressource lorsque la structure de données est détruite.Le modèle RAII garantit que le destructeur est appelé avant que la portée englobante ne quitte.Par conséquent, la ressource est gérée correctement lorsqu'une exception est levée ou lorsqu'une fonction contient plusieurs instructions return.

L'exemple suivant définit une structure nommée scoped_blocking_signal.Le constructeur de la structure scoped_blocking_signal active le surabonnement et le destructeur désactive le surabonnement.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);  
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

L'exemple suivant modifie le corps de la méthode download de façon à utiliser RAII pour s'assurer que le surabonnement est désactivé avant que la fonction ne soit retournée.Cette technique garantit que la méthode download est sécurisée du point de vue des exceptions.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Voir aussi

Référence

Context::Oversubscribe, méthode

Concepts

Contextes