Procédure : Spécifier le degré de parallélisme dans un bloc de flux de données

Ce document décrit comment définir la propriété ExecutionDataflowBlockOptions.MaxDegreeOfParallelism pour permettre à un bloc de flux de données d'exécution de traiter plusieurs messages à la fois. Cela est utile quand vous avez un bloc de flux de données qui effectue un calcul de longue durée et qui peut tirer parti du traitement des messages en parallèle. Cet exemple utilise la classe System.Threading.Tasks.Dataflow.ActionBlock<TInput> pour effectuer plusieurs opérations de flux de données simultanément ; toutefois, vous pouvez spécifier le degré maximum de parallélisme dans les types de bloc d'exécution prédéfinis fournis par la bibliothèque de flux de données TPL, ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> et System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

Notes

La bibliothèque de flux de données TPL (espace de noms System.Threading.Tasks.Dataflow) n'est pas distribuée avec .NET. Pour installer l’espace de noms System.Threading.Tasks.Dataflow dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet, puis recherchez en ligne le package System.Threading.Tasks.Dataflow. Vous pouvez également l’installer à l’aide de l’interface CLI .NET Core en exécutant dotnet add package System.Threading.Tasks.Dataflow.

Exemple

L'exemple suivant effectue deux calculs de flux de données et affiche le temps écoulé nécessaire pour chaque calcul. Le premier calcul spécifie un degré maximum de parallélisme de 1, qui est la valeur par défaut. Un degré maximum de parallélisme égal à 1 amène le bloc de flux de données à traiter les messages en série. Le deuxième calcul ressemble à la première, à ceci près qu'il spécifie un degré maximum de parallélisme égal au nombre de processeurs disponibles. Cela permet au bloc de flux de données d'effectuer plusieurs opérations en parallèle.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
   // Performs several computations by using dataflow and returns the elapsed
   // time required to perform the computations.
   static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
      int messageCount)
   {
      // Create an ActionBlock<int> that performs some work.
      var workerBlock = new ActionBlock<int>(
         // Simulate work by suspending the current thread.
         millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
         // Specify a maximum degree of parallelism.
         new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
         });

      // Compute the time that it takes for several messages to
      // flow through the dataflow block.

      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      for (int i = 0; i < messageCount; i++)
      {
         workerBlock.Post(1000);
      }
      workerBlock.Complete();

      // Wait for all messages to propagate through the network.
      workerBlock.Completion.Wait();

      // Stop the timer and return the elapsed number of milliseconds.
      stopwatch.Stop();
      return stopwatch.Elapsed;
   }
   static void Main(string[] args)
   {
      int processorCount = Environment.ProcessorCount;
      int messageCount = processorCount;

      // Print the number of processors on this computer.
      Console.WriteLine("Processor count = {0}.", processorCount);

      TimeSpan elapsed;

      // Perform two dataflow computations and print the elapsed
      // time required for each.

      // This call specifies a maximum degree of parallelism of 1.
      // This causes the dataflow block to process messages serially.
      elapsed = TimeDataflowComputations(1, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);

      // Perform the computations again. This time, specify the number of
      // processors as the maximum degree of parallelism. This causes
      // multiple messages to be processed in parallel.
      elapsed = TimeDataflowComputations(processorCount, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
   }
}

/* Sample output:
Processor count = 4.
Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
*/
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to specify the maximum degree of parallelism 
' when using dataflow.
Friend Class Program
    ' Performs several computations by using dataflow and returns the elapsed
    ' time required to perform the computations.
    Private Shared Function TimeDataflowComputations(ByVal maxDegreeOfParallelism As Integer, ByVal messageCount As Integer) As TimeSpan
        ' Create an ActionBlock<int> that performs some work.
        Dim workerBlock = New ActionBlock(Of Integer)(Function(millisecondsTimeout) Pause(millisecondsTimeout), New ExecutionDataflowBlockOptions() With {.MaxDegreeOfParallelism = maxDegreeOfParallelism})
        ' Simulate work by suspending the current thread.
        ' Specify a maximum degree of parallelism.

        ' Compute the time that it takes for several messages to 
        ' flow through the dataflow block.

        Dim stopwatch As New Stopwatch()
        stopwatch.Start()

        For i As Integer = 0 To messageCount - 1
            workerBlock.Post(1000)
        Next i
        workerBlock.Complete()

        ' Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait()

        ' Stop the timer and return the elapsed number of milliseconds.
        stopwatch.Stop()
        Return stopwatch.Elapsed
    End Function

    Private Shared Function Pause(ByVal obj As Object)
        Thread.Sleep(obj)
        Return Nothing
    End Function
    Shared Sub Main(ByVal args() As String)
        Dim processorCount As Integer = Environment.ProcessorCount
        Dim messageCount As Integer = processorCount

        ' Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount)

        Dim elapsed As TimeSpan

        ' Perform two dataflow computations and print the elapsed
        ' time required for each.

        ' This call specifies a maximum degree of parallelism of 1.
        ' This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", 1, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))

        ' Perform the computations again. This time, specify the number of 
        ' processors as the maximum degree of parallelism. This causes
        ' multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", processorCount, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))
    End Sub
End Class

' Sample output:
'Processor count = 4.
'Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
'Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
'

Programmation fiable

Par défaut, chaque bloc de flux de données prédéfini propage les messages dans l'ordre dans lequel ils sont reçus. Bien que plusieurs messages soient traités simultanément quand vous spécifiez un degré maximum de parallélisme supérieur 1, ils sont toujours propagés dans l'ordre dans lequel ils sont reçus.

Étant donné que la propriété MaxDegreeOfParallelism représente le degré maximal de parallélisme, le bloc de flux de données peut s'exécuter avec un degré de parallélisme moindre spécifié par vos soins. Le bloc de flux de données peut utiliser un degré de parallélisme moindre pour satisfaire ses exigences fonctionnelles ou pour prendre en compte un manque de ressources système disponibles. Un bloc de flux de données ne choisit jamais un degré de parallélisme supérieur à celui que vous spécifiez.

Voir aussi