Procedura: implementare un modello di flusso di dati producer-consumer

In questo articolo viene descritto come usare la libreria del flusso di dati TPL per implementare un modello producer-consumer. In questo modello, il producer invia messaggi a un blocco di messaggi e il consumer legge i messaggi dal blocco.

Nota

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Esempio

Nell'esempio seguente viene illustrato un modello di base producer-consumer in cui viene utilizzato il flusso di dati. Tramite il metodo Produce vengono scritte matrici contenenti byte casuali di dati in un oggetto System.Threading.Tasks.Dataflow.ITargetBlock<TInput> e tramite il metodo Consume vengono letti i byte da un oggetto System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Agendo sulle interfacce ISourceBlock<TOutput> e ITargetBlock<TInput>, anziché sui relativi tipi derivati, è possibile scrivere codice riutilizzabile che può agire su diversi tipi di blocchi di flussi di dati. Nell'esempio viene utilizzata la classe BufferBlock<T>. Poiché la classe BufferBlock<T> viene utilizzata sia come blocco di origine sia come blocco di destinazione, il producer e il consumer possono utilizzare un oggetto condiviso per il trasferimento dei dati.

Tramite il metodo Produce viene chiamato il metodo Post in un ciclo per scrivere i dati in modo sincrono nel blocco di destinazione. Dopo che tramite il metodo Produce vengono scritti tutti i dati nel blocco di destinazione, viene chiamato il metodo Complete per indicare che nel blocco non saranno mai presenti dati aggiuntivi disponibili. Il metodo Consume usa gli operatori async e await (Async e Await in Visual Basic) per calcolare in modo asincrono il numero totale di byte ricevuti dall'oggetto ISourceBlock<TOutput>. Per agire in modo asincrono, tramite il metodo Consume viene chiamato il metodo OutputAvailableAsync per ricevere una notifica quando nel blocco di origine vi sono dati disponibili e quando non vi saranno mai dati aggiuntivi disponibili.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Programmazione efficiente

L'esempio precedente usa solo un consumer per elaborare i dati di origine. Se si dispone di più consumer nell'applicazione, utilizzare il metodo TryReceive per leggere i dati dal blocco di origine, come illustrato nell'esempio riportato di seguito.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Tramite il metodo TryReceive viene restituito False quando non vi sono dati disponibili. Quando più consumer devono accedere al blocco di origine contemporaneamente, questo meccanismo garantisce che i dati sono sempre disponibili dopo la chiamata a OutputAvailableAsync.

Vedi anche