Procedura: scrivere e leggere messaggi da un blocco flusso di dati

Questo articolo descrive come usare la libreria di flussi di dati TPL (Task Parallel Library) per scrivere e leggere messaggi da un blocco di flussi di dati. La libreria del flusso di dati TPL fornisce sia metodi sincroni sia asincroni per la lettura e scrittura di messaggi da e in un blocco di flussi di dati. Questo articolo illustra come usare la classe System.Threading.Tasks.Dataflow.BufferBlock<T>. Mediante la classe BufferBlock<T> i messaggi vengono inseriti nel buffer; inoltre, essa viene utilizzata sia come origine sia come destinazione dei messaggi.

Nota

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Scrittura e lettura in modo sincrono

Nell'esempio seguente vengono utilizzati il metodo Post per scrivere in un blocco di flussi di dati BufferBlock<T> e il metodo Receive per leggere dallo stesso oggetto.

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

È inoltre possibile utilizzare il metodo TryReceive per leggere da un blocco di flussi di dati, come mostrato nell'esempio riportato di seguito. Tramite il metodo TryReceive non viene bloccato il thread corrente ed esso risulta utile quando si esegue occasionalmente il polling dei dati.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Poiché il metodo Post viene utilizzato in modo sincrono, da parte dell'oggetto BufferBlock<T> negli esempi precedenti vengono ricevuti tutti i dati prima che vengono letti dal secondo ciclo. Nell'esempio seguente viene esteso il primo esempio utilizzando il metodo Task.WhenAll(Task[]) per leggere dal blocco di messaggi o per scrivere in quest'ultimo contemporaneamente. Poiché WhenAll attende tutte le operazioni asincrone in esecuzione contemporaneamente, i valori non vengono scritti nell'oggetto BufferBlock<T> in un ordine specifico.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

Scrittura e lettura in modo asincrono

Nell'esempio seguente viene utilizzato il metodo SendAsync per scrivere in modo asincrono in un oggetto BufferBlock<T> e il metodo ReceiveAsync per leggere in modo asincrono dallo stesso oggetto. Nell'esempio vengono usati gli operatori async e await (Async e Await in Visual Basic) per inviare i dati al blocco di destinazione e per leggerli da quest'ultimo in modo asincrono. Il metodo SendAsync è utile quando è necessario consentire a un blocco di flussi di dati di posticipare i messaggi. Il metodo ReceiveAsync è utile se si desidera agire sui dati quando questi diventano disponibili. Per altre informazioni su come propagare i messaggi tra i relativi blocchi, vedere la sezione Passaggio di messaggi in Flusso di dati.

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

Esempio completo

Nell'esempio seguente viene illustrato tutto il codice di questo articolo.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

Passaggi successivi

In questo esempio viene mostrato come leggere da un blocco di messaggi e come scrivere in quest'ultimo direttamente. È anche possibile connettere i blocchi di flussi di dati per creare pipeline, che sono sequenze lineari di blocchi di flussi di dati, o reti, che sono grafici di blocchi di flussi di dati. I dati di origini in una pipeline o in una rete vengono propagati nelle destinazioni in modo asincrono quando i dati in questione diventano disponibili. Per un esempio che crea una pipeline di base del flusso di dati, vedere Procedura dettagliata: creazione di una pipeline del flusso di dati. Per un esempio che crea una rete del flusso di dati più complessa, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Forms.

Vedi anche