Procedimiento: Implementación de un modelo de flujo de datos productor-consumidor

En este artículo, aprenderá a utilizar la biblioteca de flujos de datos TPL para implementar un modelo productor-consumidor. En este modelo, el productor envía mensajes a un bloque de mensajes y el consumidor lee los mensajes de este bloque.

Nota

La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.

Ejemplo

En el ejemplo siguiente se muestra un modelo productor-consumidor básico que usa el flujo de datos. El método Produce escribe matrices que contienen bytes de datos aleatorios en un objeto System.Threading.Tasks.Dataflow.ITargetBlock<TInput> y el método Consume lee los bytes de un objeto System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Al actuar en las interfaces ISourceBlock<TOutput> y ITargetBlock<TInput>, en lugar de en sus tipos derivados, puede escribir código reutilizable que puede actuar en una variedad de tipos de bloques de flujo de datos. Este ejemplo utiliza la clase BufferBlock<T>. Puesto que la clase BufferBlock<T> actúa como origen y como un bloque de origen y destino, el productor y el consumidor pueden utilizar un objeto compartido para transferir datos.

El método Produce llama al método Post en un bucle para escribir datos de forma sincrónica en el bloque de destino. Después de que el método Produce escriba todos los datos en el bloque de destino, llama al método Complete para indicar que el bloque nunca tendrá datos adicionales disponibles. El método Consume usa los operadores async y await (Async y Await en Visual Basic) para calcular de forma asincrónica el número total de bytes recibidos del objeto ISourceBlock<TOutput>. Para que actúe de forma asincrónica, el método Consume llama al método OutputAvailableAsync para recibir una notificación cuando el bloque de origen tiene datos disponibles y cuando el bloque de origen nunca va a tener datos adicionales disponibles.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Programación sólida

En el ejemplo anterior se usa un solo consumidor para procesar los datos de origen. Si tiene varios consumidores en la aplicación, use el método TryReceive para leer datos desde el bloque de origen, como se muestra en el siguiente ejemplo.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

El método TryReceive devuelve False cuando no hay datos disponibles. Cuando varios consumidores deben tener acceso simultáneamente al bloque de origen, este mecanismo garantiza que los datos están disponibles después de la llamada a OutputAvailableAsync.

Vea también