
OrderablePartitioner<TSource> Class

.NET Framework 4.6 and 4.5

Represents a particular manner of splitting an orderable data source into multiple partitions.

System.Object
  System.Collections.Concurrent.Partitioner<TSource>
    System.Collections.Concurrent.OrderablePartitioner<TSource>

Namespace:  System.Collections.Concurrent
Assemblies:   mscorlib (in mscorlib.dll)
  System.Collections.Concurrent (in System.Collections.Concurrent.dll)

[HostProtectionAttribute(SecurityAction.LinkDemand, Synchronization = true, 
	ExternalThreading = true)]
public abstract class OrderablePartitioner<TSource> : Partitioner<TSource>

Type Parameters

TSource

Type of the elements in the collection.

The OrderablePartitioner<TSource> type exposes the following members.

Constructors

  Name                           Access     Description
  OrderablePartitioner<TSource>  Protected  Called from constructors in derived classes to initialize the OrderablePartitioner<TSource> class with the specified constraints on the index keys.

Properties

  Name                         Access  Description
  KeysNormalized               Public  Gets whether order keys are normalized.
  KeysOrderedAcrossPartitions  Public  Gets whether elements in an earlier partition always come before elements in a later partition.
  KeysOrderedInEachPartition   Public  Gets whether elements in each partition are yielded in the order of increasing keys.
  SupportsDynamicPartitions    Public  Gets whether additional partitions can be created dynamically. (Inherited from Partitioner<TSource>.)
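
The following is a minimal sketch, not part of the original sample, of how the ordering properties listed above can be inspected at run time. It assumes the built-in partitioners returned by Partitioner.Create; the class name KeyPropertiesDemo and the helper Describe are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Concurrent;

static class KeyPropertiesDemo
{
    // Hypothetical helper: formats the ordering guarantees that a
    // partitioner advertises through its public properties.
    public static string Describe<T>(OrderablePartitioner<T> partitioner)
    {
        return string.Format(
            "KeysNormalized={0}, KeysOrderedInEachPartition={1}, " +
            "KeysOrderedAcrossPartitions={2}, SupportsDynamicPartitions={3}",
            partitioner.KeysNormalized,
            partitioner.KeysOrderedInEachPartition,
            partitioner.KeysOrderedAcrossPartitions,
            partitioner.SupportsDynamicPartitions);
    }

    static void Main()
    {
        int[] data = { 10, 20, 30, 40, 50, 60 };

        // Partitioner.Create on an array returns an OrderablePartitioner<int>.
        // The second argument selects load-balanced (true) or static (false)
        // partitioning; the two may advertise different guarantees.
        Console.WriteLine(Describe(Partitioner.Create(data, true)));
        Console.WriteLine(Describe(Partitioner.Create(data, false)));
    }
}
```

A consumer that depends on a particular guarantee, for example keys ordered across partitions, can check the corresponding property before relying on it rather than assuming it holds for every partitioner.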

Methods

  Name                           Access     Description
  Equals(Object)                 Public     Determines whether the specified object is equal to the current object. (Inherited from Object.)
  Finalize                       Protected  Allows an object to try to free resources and perform other cleanup operations before it is reclaimed by garbage collection. (Inherited from Object.)
  GetDynamicPartitions           Public     Creates an object that can partition the underlying collection into a variable number of partitions. (Overrides Partitioner<TSource>.GetDynamicPartitions().)
  GetHashCode                    Public     Serves as the default hash function. (Inherited from Object.)
  GetOrderableDynamicPartitions  Public     Creates an object that can partition the underlying collection into a variable number of partitions.
  GetOrderablePartitions         Public     Partitions the underlying collection into the specified number of orderable partitions.
  GetPartitions                  Public     Partitions the underlying collection into the given number of ordered partitions. (Overrides Partitioner<TSource>.GetPartitions(Int32).)
  GetType                        Public     Gets the Type of the current instance. (Inherited from Object.)
  MemberwiseClone                Protected  Creates a shallow copy of the current Object. (Inherited from Object.)
  ToString                       Public     Returns a string that represents the current object. (Inherited from Object.)

Extension Methods

  Name                   Access  Description
  AsParallel<TSource>()  Public  Overloaded. Enables parallelization of a query, as sourced by a custom partitioner that is responsible for splitting the input sequence into partitions. (Defined by ParallelEnumerable.)
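
As an illustration of the AsParallel extension method, the sketch below runs a PLINQ query whose input is supplied by a partitioner rather than by an IEnumerable<T>. It uses a built-in partitioner from Partitioner.Create for brevity; a custom OrderablePartitioner<TSource> could be passed the same way. The names AsParallelDemo and SumOfSquares are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class AsParallelDemo
{
    // Hypothetical helper: squares every element in parallel and sums
    // the results, letting the partitioner decide how the input is split.
    public static int SumOfSquares(int[] values)
    {
        OrderablePartitioner<int> partitioner = Partitioner.Create(values, true);

        // AsParallel(Partitioner<TSource>) is defined by ParallelEnumerable.
        return partitioner.AsParallel().Select(v => v * v).Sum();
    }

    static void Main()
    {
        // 1 + 4 + 9 + 16 = 30
        Console.WriteLine(SumOfSquares(new[] { 1, 2, 3, 4 }));
    }
}
```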

The implementation of the derived class is responsible for ordering the elements into key-value pairs in whatever manner is appropriate. For more information, see Custom Partitioners for PLINQ and TPL.
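
To make the key-value pairs concrete, here is a minimal sketch of reading them directly from GetOrderablePartitions. It assumes a built-in static partitioner obtained from Partitioner.Create (not a custom derived class), and the names OrderKeysDemo and Drain are hypothetical. Each partition is an enumerator of KeyValuePair<long, TSource>, where the key is the order key the partitioner assigned to the element.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class OrderKeysDemo
{
    // Hypothetical helper: requests a fixed number of partitions and reads
    // every (key, value) pair from each one, sequentially for simplicity.
    public static List<KeyValuePair<long, T>> Drain<T>(
        OrderablePartitioner<T> partitioner, int partitionCount)
    {
        var pairs = new List<KeyValuePair<long, T>>();
        IList<IEnumerator<KeyValuePair<long, T>>> partitions =
            partitioner.GetOrderablePartitions(partitionCount);

        foreach (IEnumerator<KeyValuePair<long, T>> partition in partitions)
        {
            try
            {
                while (partition.MoveNext())
                    pairs.Add(partition.Current);
            }
            finally
            {
                // Every enumerator handed out should be disposed.
                partition.Dispose();
            }
        }
        return pairs;
    }

    static void Main()
    {
        string[] words = { "four", "score", "and", "twenty", "years", "ago" };
        OrderablePartitioner<string> partitioner = Partitioner.Create(words, false);

        foreach (KeyValuePair<long, string> pair in Drain(partitioner, 2))
            Console.WriteLine("key = {0}, value = {1}", pair.Key, pair.Value);
    }
}
```

In real use each partition would normally be consumed on its own thread or task; the sequential loop here only keeps the sketch short.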

Note

The HostProtectionAttribute attribute applied to this type or member has the following Resources property value: Synchronization | ExternalThreading. The HostProtectionAttribute does not affect desktop applications (which are typically started by double-clicking an icon, typing a command, or entering a URL in a browser). For more information, see the HostProtectionAttribute class or SQL Server Programming and Host Protection Attributes.

The following example shows how to implement an orderable partitioner that returns one element at a time:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OrderablePartitionerDemo
{


    // Simple partitioner that will extract one (index,item) pair at a time,  
    // in a thread-safe fashion, from the underlying collection. 
    class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
    {
        // The collection being wrapped by this Partitioner
        IEnumerable<T> m_referenceEnumerable;

        // Class used to wrap m_index for the purpose of sharing access to it 
        // between an InternalEnumerable and multiple InternalEnumerators 
        private class Shared<U>
        {
            internal U Value;

            public Shared(U item)
            {
                Value = item;
            }
        }

        // Internal class that serves as a shared enumerable for the 
        // underlying collection. 
        private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
        {
            IEnumerator<T> m_reader;
            bool m_disposed = false;
            Shared<long> m_index = null;

            // These two are used to implement Dispose() when static partitioning is being performed 
            int m_activeEnumerators;
            bool m_downcountEnumerators;

            // "downcountEnumerators" will be true for static partitioning, false for 
            // dynamic partitioning.   
            public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
            {
                m_reader = reader;
                m_index = new Shared<long>(0);
                m_activeEnumerators = 0;
                m_downcountEnumerators = downcountEnumerators;
            }

            public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
            {
                if (m_disposed)
                    throw new ObjectDisposedException("InternalEnumerable", "Can't call GetEnumerator() after disposing");

                // For static partitioning, keep track of the number of active enumerators. 
                if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);

                return new InternalEnumerator(m_reader, this, m_index);
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return ((IEnumerable<KeyValuePair<long, T>>)this).GetEnumerator();
            }

            public void Dispose()
            {
                if (!m_disposed)
                {
                    // Only dispose the source enumerator if you are doing dynamic partitioning 
                    if (!m_downcountEnumerators)
                    {
                        m_reader.Dispose();
                    }
                    m_disposed = true;
                }
            }

            // Called from Dispose() method of spawned InternalEnumerator.  During 
            // static partitioning, the source enumerator will be automatically 
            // disposed once all requested InternalEnumerators have been disposed. 
            public void DisposeEnumerator()
            {
                if (m_downcountEnumerators)
                {
                    if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
                    {
                        m_reader.Dispose();
                    }
                }
            }
        }

        // Internal class that serves as a shared enumerator for  
        // the underlying collection. 
        private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
        {
            KeyValuePair<long, T> m_current;
            IEnumerator<T> m_source;
            InternalEnumerable m_controllingEnumerable;
            Shared<long> m_index = null;
            bool m_disposed = false;


            public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
            {
                m_source = source;
                m_current = default(KeyValuePair<long, T>);
                m_controllingEnumerable = controllingEnumerable;
                m_index = index;
            }

            object IEnumerator.Current
            {
                get { return m_current; }
            }

            KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
            {
                get { return m_current; }
            }

            void IEnumerator.Reset()
            {
                throw new NotSupportedException("Reset() not supported");
            }

            // This method is the crux of this class.  Under lock, it calls 
            // MoveNext() on the underlying enumerator, grabs Current and index,  
            // and increments the index. 
            bool IEnumerator.MoveNext()
            {
                bool rval = false;
                lock (m_source)
                {
                    rval = m_source.MoveNext();
                    if (rval)
                    {
                        m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
                        m_index.Value = m_index.Value + 1;
                    }
                    else m_current = default(KeyValuePair<long, T>);
                }
                return rval;
            }

            void IDisposable.Dispose()
            {
                if (!m_disposed)
                {
                    // Delegate to parent enumerable's DisposeEnumerator() method
                    m_controllingEnumerable.DisposeEnumerator();
                    m_disposed = true;
                }
            }

        }

        // Constructor just grabs the collection to wrap 
        public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
            : base(true, true, true)
        {
            // Verify that the source IEnumerable is not null 
            if (enumerable == null)
                throw new ArgumentNullException("enumerable");

            m_referenceEnumerable = enumerable;
        }

        // Produces a list of "numPartitions" IEnumerators that can each be
        // used to traverse the underlying collection in a thread-safe manner. 
        // This will return a static number of enumerators, as opposed to 
        // GetOrderableDynamicPartitions(), the result of which can be used to produce 
        // any number of enumerators. 
        public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
        {
            if (numPartitions < 1)
                throw new ArgumentOutOfRangeException("numPartitions");

            List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

            // Since we are doing static partitioning, create an InternalEnumerable with reference 
            // counting of spawned InternalEnumerators turned on.  Once all of the spawned enumerators 
            // are disposed, the underlying source enumerator will be disposed. 
            var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
            for (int i = 0; i < numPartitions; i++)
                list.Add(dynamicPartitions.GetEnumerator());

            return list;
        }

        // Returns an instance of our internal Enumerable class.  GetEnumerator() 
        // can then be called on that (multiple times) to produce shared enumerators. 
        public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
        {
            // Since we are doing dynamic partitioning, create an InternalEnumerable with reference 
            // counting of spawned InternalEnumerators turned off.  This returned InternalEnumerable 
            // will need to be explicitly disposed. 
            return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
        }

        // Must be set to true if GetDynamicPartitions() is supported. 
        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }
    }

    class Program
    {
        static void Main()
        {
            // 
            // First a fairly simple visual test 
            // 
            var someCollection = new string[] { "four", "score", "and", "twenty", "years", "ago" };
            var someOrderablePartitioner = new SingleElementOrderablePartitioner<string>(someCollection);
            Parallel.ForEach(someOrderablePartitioner, (item, state, index) =>
            {
                Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", item, index, Thread.CurrentThread.ManagedThreadId);
            });

            // 
            // Now a test of static partitioning, using 2 partitions and 2 tasks 
            // 
            var staticPartitioner = someOrderablePartitioner.GetOrderablePartitions(2);

            // Each invocation of staticAction claims one of the static partitions and consumes it 
            int partitionerListIndex = 0;
            Action staticAction = () =>
            {
                int myIndex = Interlocked.Increment(ref partitionerListIndex) - 1;
                var enumerator = staticPartitioner[myIndex];
                while (enumerator.MoveNext())
                    Console.WriteLine("Static partitioning: item = {0}, index = {1}, thread id = {2}",
                        enumerator.Current.Value, enumerator.Current.Key, Thread.CurrentThread.ManagedThreadId);
                enumerator.Dispose();
            };

            // Now launch two of them
            Parallel.Invoke(staticAction, staticAction);

            // 
            // Now a more rigorous test of dynamic partitioning (used by Parallel.ForEach) 
            //
            Console.WriteLine("OrderablePartitioner test: testing for index mismatches");
            List<int> src = Enumerable.Range(0, 100000).ToList();
            SingleElementOrderablePartitioner<int> myOP = new SingleElementOrderablePartitioner<int>(src);

            int counter = 0;
            bool mismatch = false;
            Parallel.ForEach(myOP, (item, state, index) =>
            {
                if (item != index) mismatch = true;
                Interlocked.Increment(ref counter);
            });

            if (mismatch) Console.WriteLine("OrderablePartitioner Test: index mismatch detected");

            Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", counter);


        }
    }


}

.NET Framework

Supported in: 4.6, 4.5, 4

.NET Framework Client Profile

Supported in: 4

Portable Class Library

Supported in: Portable Class Library

Supported in: Windows Phone 8.1

All public members of OrderablePartitioner<TSource> are thread-safe and may be called from multiple threads concurrently.

© 2015 Microsoft