How to: Use ForEach to Remove Items in a BlockingCollection

In addition to taking items from a BlockingCollection<T> by using the Take and TryTake methods, you can use a foreach (For Each in Visual Basic) loop to remove items until adding is completed and the collection is empty. This is called a mutating enumeration or consuming enumeration because, unlike a typical foreach (For Each) loop, this enumerator modifies the source collection by removing items.
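For comparison, here is a minimal sketch of the manual alternative: draining a collection with TryTake in a loop. The class and helper names (TakeLoopSketch, DrainWithTryTake) and the 100 ms timeout are assumptions made for illustration only. A consuming foreach performs the same removal without the explicit IsCompleted check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class TakeLoopSketch
{
    // Hypothetical helper: removes items with TryTake until the collection is completed.
    public static List<int> DrainWithTryTake(BlockingCollection<int> collection)
    {
        var taken = new List<int>();

        // IsCompleted is true once CompleteAdding has been called
        // and the collection is empty.
        while (!collection.IsCompleted)
        {
            int item;
            // Wait up to 100 ms for an item. A false result means the wait
            // timed out or no more items will arrive.
            if (collection.TryTake(out item, 100))
                taken.Add(item);
        }
        return taken;
    }

    static void Main()
    {
        using (var collection = new BlockingCollection<int>())
        {
            collection.Add(10);
            collection.Add(20);
            collection.CompleteAdding();

            Console.WriteLine(string.Join(", ", DrainWithTryTake(collection))); // 10, 20
        }
    }
}
```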

The following example shows how to remove all the items in a BlockingCollection<T> by using a foreach (For Each) loop.

namespace EnumerateBlockingCollection
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {

        // Limit the collection size to 1000 items 
        // at any given time. Set itemsToProduce to >500 
        // to hit the limit. 
        const int upperLimit = 1000;

        // Adjust this number to see how it impacts 
        // the producing-consuming pattern. 
        const int itemsToProduce = 100;

        static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

        // Variables for diagnostic output only. 
        static Stopwatch sw = new Stopwatch();
        static int totalAdditions = 0;

        // Counter for synchronizing producers. 
        static int producersStillRunning = 2;            

        static void Main(string[] args)
        {

            // Start the stopwatch.
            sw.Start();

            // Queue the Producer threads. Store in an array 
            // for use with ContinueWhenAll
            Task[] producers = new Task[2];
            producers[0] = Task.Run(() => RunProducer("A", 0));
            producers[1] = Task.Run(() => RunProducer("B", itemsToProduce));

            // Create a cleanup task that will call CompleteAdding after 
            // all producers are done adding items.
            Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

            // Queue the Consumer thread. It begins consuming 
            // as soon as the producers add items.
            Task.Run(() => RunConsumer());

            // Keep the console window open while the 
            // consumer thread completes its output.
            Console.ReadKey();

        }

        static void RunProducer(string ID, int start)
        {

            int additions = 0;
            for (int i = start; i < start + itemsToProduce; i++)
            {
                // The data that is added to the collection. 
                long ticks = sw.ElapsedTicks;

                // Display each addition.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

                if(!collection.IsAddingCompleted)
                    collection.Add(ticks);

                // Counter for demonstration purposes only.
                additions++;

                // Spin briefly to slow down the producer threads.
                // Comment out this line to run them at full speed.
                Thread.SpinWait(100000); 
            }


            Interlocked.Add(ref totalAdditions, additions);
            Console.WriteLine("{0} is done adding: {1} items", ID, additions);
        }


        static void RunConsumer()
        {
            // GetConsumingEnumerable returns an enumerable that removes  
            // each item from the collection as it is yielded. 
            int subtractions = 0;
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                        item.ToString("D18"), subtractions++, collection.Count);
            }

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2}",
                                totalAdditions, subtractions, collection.Count);
            sw.Stop();

            Console.WriteLine("Press any key to exit");
        }            
    }
}

This example uses a foreach loop with the BlockingCollection<T>.GetConsumingEnumerable method in the consuming thread, which causes each item to be removed from the collection as it is enumerated. System.Collections.Concurrent.BlockingCollection<T> limits the maximum number of items that are in the collection at any time when a bounded capacity is passed to its constructor, as upperLimit is here. Enumerating the collection in this way blocks the consumer thread whenever the collection is empty, and the loop ends once adding is completed and the remaining items have been consumed. In this example blocking is not a concern because the producer threads add items faster than they can be consumed.
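The interaction between a bounded capacity and a consuming enumeration can be reduced to a much smaller sketch than the full example. The names below (BoundedSketch, ProduceAndDrain) and the values 10 and 2 are hypothetical and chosen only for illustration. The sketch assumes a single producer and uses the calling thread as the consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BoundedSketch
{
    // Hypothetical helper: one producer adds 0..count-1 to a bounded collection
    // while the calling thread drains it with a consuming enumeration.
    public static List<int> ProduceAndDrain(int count, int capacity)
    {
        var consumed = new List<int>();
        using (var collection = new BlockingCollection<int>(capacity))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    for (int i = 0; i < count; i++)
                        collection.Add(i); // blocks while the collection holds 'capacity' items
                }
                finally
                {
                    collection.CompleteAdding(); // lets the foreach below terminate
                }
            });

            // Blocks while the collection is empty; ends after CompleteAdding
            // once the remaining items have been removed.
            foreach (int item in collection.GetConsumingEnumerable())
                consumed.Add(item);

            producer.Wait();
        }
        return consumed;
    }

    static void Main()
    {
        List<int> items = ProduceAndDrain(10, 2);
        Console.WriteLine("Consumed {0} items", items.Count); // Consumed 10 items
    }
}
```

Calling CompleteAdding in a finally block is a design choice in this sketch: if the producer fails, the consumer's loop still ends instead of blocking indefinitely.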

There is no guarantee that the items are enumerated in the same order in which they are added by the producer threads.
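Beyond the interleaving of multiple producers, the order also depends on the underlying store. As a small sketch (OrderingSketch and FillThenDrain are hypothetical names), the following fills a collection on one thread and then drains it. With a ConcurrentQueue<T>, which is the default store, items come back first-in, first-out; with a ConcurrentStack<T> passed to the constructor, they come back last-in, first-out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class OrderingSketch
{
    // Hypothetical helper: adds 1, 2, 3 on one thread, then drains the collection.
    public static int[] FillThenDrain(IProducerConsumerCollection<int> backingStore)
    {
        using (var collection = new BlockingCollection<int>(backingStore))
        {
            for (int i = 1; i <= 3; i++)
                collection.Add(i);
            collection.CompleteAdding();

            // The consuming enumeration removes items in the order
            // that the backing store hands them out.
            return collection.GetConsumingEnumerable().ToArray();
        }
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", FillThenDrain(new ConcurrentQueue<int>()))); // 1, 2, 3
        Console.WriteLine(string.Join(", ", FillThenDrain(new ConcurrentStack<int>()))); // 3, 2, 1
    }
}
```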

To enumerate the collection without modifying it, use foreach (For Each) directly on the collection, without the GetConsumingEnumerable method. This kind of enumeration represents a snapshot of the collection at a precise point in time. If other threads add or remove items concurrently while the loop executes, the loop might not represent the actual state of the collection.
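The difference between the two kinds of enumeration can be checked with a short single-threaded sketch. The class and method names are hypothetical, and no other thread touches the collection here, so the counts are deterministic.

```csharp
using System;
using System.Collections.Concurrent;

static class SnapshotSketch
{
    // Hypothetical helper: returns the item count left after a plain foreach.
    public static int CountAfterSnapshotEnumeration()
    {
        using (var collection = new BlockingCollection<int>())
        {
            collection.Add(1);
            collection.Add(2);
            collection.Add(3);

            int seen = 0;
            foreach (int item in collection) // enumerates a snapshot, removes nothing
                seen++;

            return collection.Count;
        }
    }

    // Hypothetical helper: returns the item count left after a consuming foreach.
    public static int CountAfterConsumingEnumeration()
    {
        using (var collection = new BlockingCollection<int>())
        {
            collection.Add(1);
            collection.Add(2);
            collection.Add(3);
            collection.CompleteAdding(); // without this, the loop below would block

            int seen = 0;
            foreach (int item in collection.GetConsumingEnumerable()) // removes each item
                seen++;

            return collection.Count;
        }
    }

    static void Main()
    {
        Console.WriteLine(CountAfterSnapshotEnumeration());  // 3
        Console.WriteLine(CountAfterConsumingEnumeration()); // 0
    }
}
```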

© 2014 Microsoft