Guide pratique pour implémenter un modèle de dataflow producteur-consommateur

Ce document décrit comment utiliser la bibliothèque de dataflow TPL pour implémenter un modèle producteur-consommateur. Dans ce modèle, le producteur envoie des messages à un bloc de message et le consommateur lit les messages de ce bloc.

Notes

La bibliothèque de flux de données TPL (espace de noms System.Threading.Tasks.Dataflow) n'est pas distribuée avec .NET. Pour installer l’espace de noms System.Threading.Tasks.Dataflow dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet, puis recherchez en ligne le package System.Threading.Tasks.Dataflow. Vous pouvez également l’installer à l’aide de l’interface CLI .NET Core en exécutant dotnet add package System.Threading.Tasks.Dataflow.

Exemple

L’exemple suivant montre un modèle producteur-consommateur de base qui utilise un dataflow. La méthode Produce écrit des tableaux contenant les octets aléatoires de données dans un objet System.Threading.Tasks.Dataflow.ITargetBlock<TInput>, et la méthode Consume lit les octets à partir d’un objet System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. En utilisant les interfaces ISourceBlock<TOutput> et ITargetBlock<TInput>, et non leurs types dérivés, vous pouvez écrire du code réutilisable sur une variété de types de blocs de flux de données. Cet exemple utilise la classe BufferBlock<T>. Dans la mesure où la classe BufferBlock<T> agit à la fois en tant que source et cible, le producteur et le consommateur peuvent utiliser un objet partagé pour transférer les données.

La méthode Produce appelle la méthode Post dans une boucle pour écrire de façon synchrone des données vers le bloc cible. Dès que la méthode Produce a écrit toutes les données dans le bloc cible, celle-ci appelle la méthode Complete pour indiquer que le bloc n’aura jamais aucune donnée supplémentaire disponible. La méthode Consume utilise les opérateurs async et await (Async et Await en Visual Basic) pour calculer de façon asynchrone le nombre total d’octets provenant de l’objet ISourceBlock<TOutput>. Pour agir de façon asynchrone, la méthode Consume appelle la méthode OutputAvailableAsync pour recevoir une notification lorsque le bloc source possède des données disponibles et lorsque le bloc source n’aura jamais aucune donnée supplémentaire disponible.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Programmation fiable

L’exemple précédent utilise un seul consommateur pour traiter les données sources. Si votre application dispose de plusieurs consommateurs, utilisez la méthode TryReceive pour lire des données à partir du bloc source, comme indiqué dans l’exemple suivant.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

La méthode TryReceive retourne False si donnée n’est disponible. Lorsque plusieurs consommateurs doivent accéder simultanément au bloc source, ce mécanisme garantit que les données sont toujours disponibles après l’appel à OutputAvailableAsync.

Voir aussi