How to: Use Arrays of Blocking Collections in a Pipeline

The following example shows how to use arrays of System.Collections.Concurrent.BlockingCollection<T> objects with static methods such as TryAddToAny and TryTakeFromAny to implement fast and flexible data transfer between components.

Example

The following example demonstrates a basic pipeline implementation in which each filter object concurrently takes data from its array of input collections, transforms it, and passes it to its array of output collections. The example is shown first in Visual Basic and then in C#.

Imports System
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Namespace BlockingCollectionPipeline

    ''' <summary>
    ''' Console demo: pre-fills an array of bounded blocking collections with
    ''' integers and pushes them through three chained PipelineFilter stages
    ''' (Integer -> Decimal -> String -> console).
    ''' </summary>
    Class PipeLineDemo

        ' Number of blocking collections in the source array.
        Private Const SourceCollectionCount As Integer = 5

        ' Bounded capacity of each source collection.
        Private Const SourceCollectionCapacity As Integer = 500

        ''' <summary>
        ''' Builds the source data, wires up the three filters, runs them in
        ''' parallel and reports any exceptions collected by Parallel.Invoke.
        ''' Pressing "c" while the pipeline runs requests cancellation.
        ''' </summary>
        Public Shared Sub Main()

            Dim cts As CancellationTokenSource = New CancellationTokenSource()

            ' Start up a UI thread for cancellation.
            Task.Factory.StartNew(Sub()
                                      If (Console.ReadKey().KeyChar = "c"c) Then
                                          cts.Cancel()
                                      End If
                                  End Sub)

            ' Generate some source data.
            ' ReDim takes the UPPER BOUND of the array, not its length, so
            ' (count - 1) is required to get exactly SourceCollectionCount elements.
            Dim sourceArrays() As BlockingCollection(Of Integer)
            ReDim sourceArrays(SourceCollectionCount - 1)
            For i As Integer = 0 To sourceArrays.Length - 1
                sourceArrays(i) = New BlockingCollection(Of Integer)(SourceCollectionCapacity)
            Next

            ' Produce exactly as many items as the source collections can hold in total.
            Parallel.For(0, sourceArrays.Length * SourceCollectionCapacity,
                         Sub(j)
                             ' A non-negative result means one of the collections accepted the item.
                             Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
                             If (k >= 0) Then
                                 Console.WriteLine("added {0} to source data", j)
                             End If
                         End Sub)

            ' No more source data will be added; lets the first filter finish
            ' once it has drained every source collection.
            For Each arr In sourceArrays
                arr.CompleteAdding()
            Next

            ' First filter accepts the ints, keeps back a small percentage
            ' as a processing fee, and converts the results to decimals.
            Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
                sourceArrays,
                Function(n)
                    Return Convert.ToDecimal(n * 0.97)
                End Function,
                cts.Token,
                "filter1"
            )

            ' Second filter accepts the decimals and converts them to
            ' System.Strings.
            Dim filter2 = New PipelineFilter(Of Decimal, String)(
                filter1.m_output,
                Function(s) (String.Format("{0}", s)),
                cts.Token,
                "filter2"
            )

            ' Third filter uses the constructor with an Action<T>
            ' that renders its output to the screen,
            ' not a blocking collection.
            Dim filter3 = New PipelineFilter(Of String, String)(
                filter2.m_output,
                Sub(s) Console.WriteLine("The final result is {0}", s),
                cts.Token,
                "filter3"
            )

            ' Start up the pipeline!
            Try

                Parallel.Invoke(
                    Sub() filter1.Run(),
                    Sub() filter2.Run(),
                    Sub() filter3.Run()
                )

            Catch ae As AggregateException
                ' Parallel.Invoke bundles the exceptions thrown by the filters.
                For Each ex In ae.InnerExceptions
                    Console.WriteLine(ex.Message + ex.StackTrace)
                Next

            End Try

            ' You will need to press twice if you ran to the end:
            ' once for the cancellation thread, and once for this thread.
            Console.WriteLine("Press any key.")
            Console.ReadKey()
        End Sub
    End Class
        ''' <summary>
        ''' One stage of the pipeline. It repeatedly takes an item from any of its
        ''' input collections and either transforms it and adds the result to any of
        ''' its output collections, or (endpoint form) hands it to a renderer action.
        ''' </summary>
        ''' <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
        ''' <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
        Class PipelineFilter(Of TInput, TOutput)

            ' Number of output collections created for a transforming filter.
            Private Const OutputCollectionCount As Integer = 5

            ' Bounded capacity of each output collection.
            Private Const OutputCollectionCapacity As Integer = 500

            ' How long a single take attempt waits before reporting "no data".
            Private Const TakeTimeoutMilliseconds As Integer = 50

            Private m_processor As Func(Of TInput, TOutput) = Nothing
            Public m_input() As BlockingCollection(Of TInput) = Nothing
            ' Stays Nothing for an endpoint filter; Run uses that to pick its mode.
            Public m_output() As BlockingCollection(Of TOutput) = Nothing
            Private m_outputProcessor As Action(Of TInput) = Nothing
            Private m_token As CancellationToken
            Public Name As String

            ''' <summary>
            ''' Creates a transforming filter that owns a fresh array of bounded
            ''' output collections.
            ''' </summary>
            ''' <param name="input">Collections to take items from.</param>
            ''' <param name="processor">Transformation applied to every received item.</param>
            ''' <param name="token">Token observed while taking and adding items.</param>
            ''' <param name="_name">Display name used in console messages.</param>
            Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                           ByVal processor As Func(Of TInput, TOutput),
                           ByVal token As CancellationToken,
                           ByVal _name As String)

                m_input = input
                m_processor = processor
                m_token = token
                Name = _name

                ' ReDim takes the UPPER BOUND, so (count - 1) yields exactly
                ' OutputCollectionCount elements.
                ReDim m_output(OutputCollectionCount - 1)
                For i As Integer = 0 To m_output.Length - 1
                    m_output(i) = New BlockingCollection(Of TOutput)(OutputCollectionCapacity)
                Next
            End Sub

            ''' <summary>
            ''' Use this constructor for the final endpoint, which does
            ''' something like write to file or screen, instead of
            ''' pushing to another pipeline filter.
            ''' </summary>
            ''' <param name="input">Collections to take items from.</param>
            ''' <param name="renderer">Action invoked with every received item.</param>
            ''' <param name="token">Token observed while taking items.</param>
            ''' <param name="_name">Display name used in console messages.</param>
            Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                           ByVal renderer As Action(Of TInput),
                           ByVal token As CancellationToken,
                           ByVal _name As String)

                m_input = input
                m_outputProcessor = renderer
                m_token = token
                Name = _name
            End Sub

            ''' <summary>
            ''' Processes items until every input collection is completed or
            ''' cancellation is requested, then marks the output collections (if any)
            ''' as complete. Exceptions from the take/add calls or from the processor
            ''' propagate to the caller; the outputs are still completed in that case.
            ''' </summary>
            Public Sub Run()

                Console.WriteLine("{0} is running", Me.Name)
                Try
                    ' AndAlso short-circuits: the token is not consulted once the inputs are drained.
                    While Not m_input.All(Function(bc) bc.IsCompleted) AndAlso Not m_token.IsCancellationRequested

                        Dim receivedItem As TInput = Nothing
                        Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                                m_input, receivedItem, TakeTimeoutMilliseconds, m_token)

                        If i >= 0 Then

                            If m_output IsNot Nothing Then ' we pass data to another blocking collection

                                Dim outputItem As TOutput = m_processor(receivedItem)
                                ' Pass the token so a cancelled pipeline cannot block
                                ' forever here when every output collection is full.
                                BlockingCollection(Of TOutput).AddToAny(m_output, outputItem, m_token)
                                Console.WriteLine("{0} sent {1} to next", Me.Name, outputItem)

                            Else ' we're an endpoint

                                m_outputProcessor(receivedItem)
                            End If

                        Else
                            Console.WriteLine("Unable to retrieve data from previous filter")
                        End If
                    End While
                Finally
                    ' Always signal completion, even on failure or cancellation, so a
                    ' downstream filter never waits on collections nobody will fill.
                    If m_output IsNot Nothing Then
                        For Each bc In m_output
                            bc.CompleteAdding()
                        Next
                    End If
                End Try
            End Sub
        End Class
End Namespace

namespace BlockingCollectionPipeline
{
    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;


    class PipeLineDemo
    {

        /// <summary>
        /// Entry point of the demo: fills five bounded source collections with
        /// integers, chains three <c>PipelineFilter</c> stages
        /// (int -> decimal -> string -> console) and runs them in parallel.
        /// Pressing 'c' while the pipeline runs requests cancellation.
        /// </summary>
        public static void Main()
        {
            var cancellation = new CancellationTokenSource();

            // Background key listener: a 'c' key press cancels the pipeline.
            Task.Factory.StartNew(() =>
            {
                char pressed = Console.ReadKey().KeyChar;
                if (pressed == 'c')
                {
                    cancellation.Cancel();
                }
            });

            // Build the source: five collections, each bounded to 500 items.
            BlockingCollection<int>[] sourceArrays = Enumerable.Range(0, 5)
                .Select(_ => new BlockingCollection<int>(500))
                .ToArray();

            // Offer exactly as many values as the sources can hold in total.
            int totalItems = sourceArrays.Length * 500;
            Parallel.For(0, totalItems, value =>
            {
                // A non-negative index means one of the collections took the value.
                bool accepted = BlockingCollection<int>.TryAddToAny(sourceArrays, value) >= 0;
                if (accepted)
                {
                    Console.WriteLine("added {0} to source data", value);
                }
            });

            // Nothing more will be produced for the first stage.
            Array.ForEach(sourceArrays, source => source.CompleteAdding());

            // Stage 1: takes the ints, keeps back a small percentage
            // as a processing fee, and emits decimals.
            var feeStage = new PipelineFilter<int, decimal>(
                input: sourceArrays,
                processor: n => Convert.ToDecimal(n * 0.97),
                token: cancellation.Token,
                name: "filter1");

            // Stage 2: takes the decimals and emits their string form.
            var textStage = new PipelineFilter<decimal, string>(
                input: feeStage.m_output,
                processor: s => String.Format("{0}", s),
                token: cancellation.Token,
                name: "filter2");

            // Stage 3: endpoint built with the Action<T> constructor; it prints
            // each item instead of forwarding it to another blocking collection.
            var printStage = new PipelineFilter<string, string>(
                input: textStage.m_output,
                renderer: s => Console.WriteLine("The final result is {0}", s),
                token: cancellation.Token,
                name: "filter3");

            // Run all three stages side by side and report whatever they threw.
            try
            {
                Parallel.Invoke(
                    () => feeStage.Run(),
                    () => textStage.Run(),
                    () => printStage.Run());
            }
            catch (AggregateException aggregate)
            {
                foreach (Exception inner in aggregate.InnerExceptions)
                {
                    Console.WriteLine(inner.Message + inner.StackTrace);
                }
            }

            // Two key presses are needed after a full run: one is consumed by
            // the cancellation listener, the other by this thread.
            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }

        /// <summary>
        /// One stage of the pipeline. It repeatedly takes an item from any of its
        /// input collections and either transforms it and adds the result to any of
        /// its output collections, or (endpoint form) hands it to a renderer action.
        /// </summary>
        /// <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
        /// <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
        class PipelineFilter<TInput, TOutput>
        {
            // Number of output collections created for a transforming filter.
            const int OutputCollectionCount = 5;

            // Bounded capacity of each output collection.
            const int OutputCollectionCapacity = 500;

            // How long a single take attempt waits before reporting "no data".
            const int TakeTimeoutMilliseconds = 50;

            readonly Func<TInput, TOutput> m_processor = null;
            public BlockingCollection<TInput>[] m_input;
            // Stays null for an endpoint filter; Run uses that to pick its mode.
            public BlockingCollection<TOutput>[] m_output = null;
            readonly Action<TInput> m_outputProcessor = null;
            readonly CancellationToken m_token;
            public string Name { get; private set; }

            /// <summary>
            /// Creates a transforming filter that owns a fresh array of bounded
            /// output collections.
            /// </summary>
            /// <param name="input">Collections to take items from.</param>
            /// <param name="processor">Transformation applied to every received item.</param>
            /// <param name="token">Token observed while taking and adding items.</param>
            /// <param name="name">Display name used in console messages.</param>
            public PipelineFilter(
                BlockingCollection<TInput>[] input,
                Func<TInput, TOutput> processor,
                CancellationToken token,
                string name)
            {
                m_input = input;
                m_output = new BlockingCollection<TOutput>[OutputCollectionCount];
                for (int i = 0; i < m_output.Length; i++)
                {
                    m_output[i] = new BlockingCollection<TOutput>(OutputCollectionCapacity);
                }

                m_processor = processor;
                m_token = token;
                Name = name;
            }

            /// <summary>
            /// Use this constructor for the final endpoint, which does
            /// something like write to file or screen, instead of
            /// pushing to another pipeline filter.
            /// </summary>
            /// <param name="input">Collections to take items from.</param>
            /// <param name="renderer">Action invoked with every received item.</param>
            /// <param name="token">Token observed while taking items.</param>
            /// <param name="name">Display name used in console messages.</param>
            public PipelineFilter(
                BlockingCollection<TInput>[] input,
                Action<TInput> renderer,
                CancellationToken token,
                string name)
            {
                m_input = input;
                m_outputProcessor = renderer;
                m_token = token;
                Name = name;
            }

            /// <summary>
            /// Processes items until every input collection is completed or
            /// cancellation is requested, then marks the output collections (if any)
            /// as complete. Exceptions from the take/add calls or from the processor
            /// propagate to the caller; the outputs are still completed in that case.
            /// </summary>
            public void Run()
            {
                Console.WriteLine("{0} is running", this.Name);
                try
                {
                    while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
                    {
                        TInput receivedItem;
                        int i = BlockingCollection<TInput>.TryTakeFromAny(
                            m_input, out receivedItem, TakeTimeoutMilliseconds, m_token);
                        if (i >= 0)
                        {
                            if (m_output != null) // we pass data to another blocking collection
                            {
                                TOutput outputItem = m_processor(receivedItem);
                                // Pass the token so a cancelled pipeline cannot block
                                // forever here when every output collection is full.
                                BlockingCollection<TOutput>.AddToAny(m_output, outputItem, m_token);
                                Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                            }
                            else // we're an endpoint
                            {
                                m_outputProcessor(receivedItem);
                            }
                        }
                        else
                        {
                            Console.WriteLine("Unable to retrieve data from previous filter");
                        }
                    }
                }
                finally
                {
                    // Always signal completion, even on failure or cancellation, so a
                    // downstream filter never waits on collections nobody will fill.
                    if (m_output != null)
                    {
                        foreach (var bc in m_output)
                        {
                            bc.CompleteAdding();
                        }
                    }
                }
            }
        }
    }    
}

See Also

Reference

System.Collections.Concurrent

Other Resources

Thread-Safe Collections