How to: Use Arrays of Blocking Collections in a Pipeline
The following example shows how to use arrays of System.Collections.Concurrent.BlockingCollection<T> objects with static methods such as TryAddToAny and TryTakeFromAny to implement fast and flexible data transfer between components.
The following example demonstrates a basic pipeline implementation in which each object is concurrently taking data from the input collection, transforming it, and passing it to the output collection.
namespace BlockingCollectionPipeline { using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; class PipeLineDemo { public static void Main() { CancellationTokenSource cts = new CancellationTokenSource(); // Start up a UI thread for cancellation. Task.Factory.StartNew(() => { if(Console.ReadKey().KeyChar == 'c') cts.Cancel(); }); //Generate some source data. BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5]; for(int i = 0; i < sourceArrays.Length; i++) sourceArrays[i] = new BlockingCollection<int>(500); Parallel.For(0, sourceArrays.Length * 500, (j) => { int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j); if(k >=0) Console.WriteLine("added {0} to source data", j); }); foreach (var arr in sourceArrays) arr.CompleteAdding(); // First filter accepts the ints, keeps back a small percentage // as a processing fee, and converts the results to decimals. var filter1 = new PipelineFilter<int, decimal> ( sourceArrays, (n) => Convert.ToDecimal(n * 0.97), cts.Token, "filter1" ); // Second filter accepts the decimals and converts them to // System.Strings. var filter2 = new PipelineFilter<decimal, string> ( filter1.m_output, (s) => String.Format("{0}", s), cts.Token, "filter2" ); // Third filter uses the constructor with an Action<T> // that renders its output to the screen, // not a blocking collection. var filter3 = new PipelineFilter<string, string> ( filter2.m_output, (s) => Console.WriteLine("The final result is {0}", s), cts.Token, "filter3" ); // Start up the pipeline! try { Parallel.Invoke( () => filter1.Run(), () => filter2.Run(), () => filter3.Run() ); } catch (AggregateException ae) { foreach(var ex in ae.InnerExceptions) Console.WriteLine(ex.Message + ex.StackTrace); } // You will need to press twice if you ran to the end: // once for the cancellation thread, and once for this thread. Console.WriteLine("Press any key."); Console.ReadKey(); } class PipelineFilter<TInput, TOutput> { Func<TInput, TOutput> m_processor = null; public BlockingCollection<TInput>[] m_input; public BlockingCollection<TOutput>[] m_output = null; Action<TInput> m_outputProcessor = null; CancellationToken m_token; public string Name { get; private set; } public PipelineFilter( BlockingCollection<TInput>[] input, Func<TInput, TOutput> processor, CancellationToken token, string name) { m_input = input; m_output = new BlockingCollection<TOutput>[5]; for (int i = 0; i < m_output.Length; i++) m_output[i] = new BlockingCollection<TOutput>(500); m_processor = processor; m_token = token; Name = name; } // Use this constructor for the final endpoint, which does // something like write to file or screen, instead of // pushing to another pipeline filter. public PipelineFilter( BlockingCollection<TInput>[] input, Action<TInput> renderer, CancellationToken token, string name) { m_input = input; m_outputProcessor = renderer; m_token = token; Name = name; } public void Run() { Console.WriteLine("{0} is running", this.Name); while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested) { TInput receivedItem; int i = BlockingCollection<TInput>.TryTakeFromAny( m_input, out receivedItem, 50, m_token); if ( i >= 0) { if (m_output != null) // we pass data to another blocking collection { TOutput outputItem = m_processor(receivedItem); BlockingCollection<TOutput>.AddToAny(m_output, outputItem); Console.WriteLine("{0} sent {1} to next", this.Name, outputItem); } else // we're an endpoint { m_outputProcessor(receivedItem); } } else Console.WriteLine("Unable to retrieve data from previous filter"); } if (m_output != null) { foreach (var bc in m_output) bc.CompleteAdding(); } } } } }