Partitioner<TSource> Class
Represents a particular manner of splitting a data source into multiple partitions.
Assembly: mscorlib (in mscorlib.dll)
System.Collections.Concurrent.Partitioner<TSource>
System.Collections.Concurrent.OrderablePartitioner<TSource>
| Name | Description | |
|---|---|---|
![]() | Partitioner<TSource>() | Creates a new partitioner instance. |
| Name | Description | |
|---|---|---|
![]() | SupportsDynamicPartitions | Gets whether additional partitions can be created dynamically. |
| Name | Description | |
|---|---|---|
![]() | Equals(Object) | Determines whether the specified object is equal to the current object.(Inherited from Object.) |
![]() | Finalize() | Allows an object to try to free resources and perform other cleanup operations before it is reclaimed by garbage collection.(Inherited from Object.) |
![]() | GetDynamicPartitions() | Creates an object that can partition the underlying collection into a variable number of partitions. |
![]() | GetHashCode() | Serves as the default hash function. (Inherited from Object.) |
![]() | GetPartitions(Int32) | Partitions the underlying collection into the given number of partitions. |
![]() | GetType() | |
![]() | MemberwiseClone() | |
![]() | ToString() | Returns a string that represents the current object.(Inherited from Object.) |
| Name | Description | |
|---|---|---|
![]() | AsParallel<TSource>() | Overloaded. Enables parallelization of a query, as sourced by a custom partitioner that is responsible for splitting the input sequence into partitions.(Defined by ParallelEnumerable.) |
For more information, see Custom Partitioners for PLINQ and TPL.
The following example shows how to implement a partitioner that returns a single element at a time:
using System; using System.Collections; using System.Collections.Generic; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace PartitionerDemo { // Simple partitioner that will extract one item at a time, in a thread-safe fashion, // from the underlying collection. class SingleElementPartitioner<T> : Partitioner<T> { // The collection being wrapped by this Partitioner IEnumerable<T> m_referenceEnumerable; // Internal class that serves as a shared enumerable for the // underlying collection. private class InternalEnumerable : IEnumerable<T>, IDisposable { IEnumerator<T> m_reader; bool m_disposed = false; // These two are used to implement Dispose() when static partitioning is being performed int m_activeEnumerators; bool m_downcountEnumerators; // "downcountEnumerators" will be true for static partitioning, false for // dynamic partitioning. public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators) { m_reader = reader; m_activeEnumerators = 0; m_downcountEnumerators = downcountEnumerators; } public IEnumerator<T> GetEnumerator() { if (m_disposed) throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing"); // For static partitioning, keep track of the number of active enumerators. if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators); return new InternalEnumerator(m_reader, this); } IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable<T>)this).GetEnumerator(); } public void Dispose() { if (!m_disposed) { // Only dispose the source enumerator if you are doing dynamic partitioning if (!m_downcountEnumerators) { m_reader.Dispose(); } m_disposed = true; } } // Called from Dispose() method of spawned InternalEnumerator. During // static partitioning, the source enumerator will be automatically // disposed once all requested InternalEnumerators have been disposed. public void DisposeEnumerator() { if (m_downcountEnumerators) { if (Interlocked.Decrement(ref m_activeEnumerators) == 0) { m_reader.Dispose(); } } } } // Internal class that serves as a shared enumerator for // the underlying collection. private class InternalEnumerator : IEnumerator<T> { T m_current; IEnumerator<T> m_source; InternalEnumerable m_controllingEnumerable; bool m_disposed = false; public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable) { m_source = source; m_current = default(T); m_controllingEnumerable = controllingEnumerable; } object IEnumerator.Current { get { return m_current; } } T IEnumerator<T>.Current { get { return m_current; } } void IEnumerator.Reset() { throw new NotSupportedException("Reset() not supported"); } // This method is the crux of this class. Under lock, it calls // MoveNext() on the underlying enumerator and grabs Current. bool IEnumerator.MoveNext() { bool rval = false; lock (m_source) { rval = m_source.MoveNext(); m_current = rval ? m_source.Current : default(T); } return rval; } void IDisposable.Dispose() { if (!m_disposed) { // Delegate to parent enumerable's DisposeEnumerator() method m_controllingEnumerable.DisposeEnumerator(); m_disposed = true; } } } // Constructor just grabs the collection to wrap public SingleElementPartitioner(IEnumerable<T> enumerable) { // Verify that the source IEnumerable is not null if (enumerable == null) throw new ArgumentNullException("enumerable"); m_referenceEnumerable = enumerable; } // Produces a list of "numPartitions" IEnumerators that can each be // used to traverse the underlying collection in a thread-safe manner. // This will return a static number of enumerators, as opposed to // GetDynamicPartitions(), the result of which can be used to produce // any number of enumerators. public override IList<IEnumerator<T>> GetPartitions(int numPartitions) { if (numPartitions < 1) throw new ArgumentOutOfRangeException("NumPartitions"); List<IEnumerator<T>> list = new List<IEnumerator<T>>(numPartitions); // Since we are doing static partitioning, create an InternalEnumerable with reference // counting of spawned InternalEnumerators turned on. Once all of the spawned enumerators // are disposed, dynamicPartitions will be disposed. var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true); for (int i = 0; i < numPartitions; i++) list.Add(dynamicPartitions.GetEnumerator()); return list; } // Returns an instance of our internal Enumerable class. GetEnumerator() // can then be called on that (multiple times) to produce shared enumerators. public override IEnumerable<T> GetDynamicPartitions() { // Since we are doing dynamic partitioning, create an InternalEnumerable with reference // counting of spawned InternalEnumerators turned off. This returned InternalEnumerable // will need to be explicitly disposed. return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false); } // Must be set to true if GetDynamicPartitions() is supported. public override bool SupportsDynamicPartitions { get { return true; } } } class Program { // Test our SingleElementPartitioner(T) class static void Main() { // Our sample collection string[] collection = new string[] {"red", "orange", "yellow", "green", "blue", "indigo", "violet", "black", "white", "grey"}; // Instantiate a partitioner for our collection SingleElementPartitioner<string> myPart = new SingleElementPartitioner<string>(collection); // // Simple test with ForEach // Console.WriteLine("Testing with Parallel.ForEach"); Parallel.ForEach(myPart, item => { Console.WriteLine(" item = {0}, thread id = {1}", item, Thread.CurrentThread.ManagedThreadId); }); // // // Demonstrate the use of static partitioning, which really means // "using a static number of partitioners". The partitioners themselves // may still be "dynamic" in the sense that their outputs may not be // deterministic. // // // Perform static partitioning of collection var staticPartitions = myPart.GetPartitions(2); int index = 0; Console.WriteLine("Static Partitioning, 2 partitions, 2 tasks:"); // Action will consume from static partitions Action staticAction = () => { int myIndex = Interlocked.Increment(ref index) - 1; // compute your index var myItems = staticPartitions[myIndex]; // grab your static partition int id = Thread.CurrentThread.ManagedThreadId; // cache your thread id // Enumerate through your static partition while (myItems.MoveNext()) { Thread.Sleep(50); // guarantees that multiple threads have a chance to run Console.WriteLine(" item = {0}, thread id = {1}", myItems.Current, Thread.CurrentThread.ManagedThreadId); } myItems.Dispose(); }; // Spawn off 2 actions to consume 2 static partitions Parallel.Invoke(staticAction, staticAction); // // // Demonstrate the use of dynamic partitioning // // // Grab an IEnumerable which can then be used to generate multiple // shared IEnumerables. var dynamicPartitions = myPart.GetDynamicPartitions(); Console.WriteLine("Dynamic Partitioning, 3 tasks:"); // Action will consume from dynamic partitions Action dynamicAction = () => { // Grab an enumerator from the dynamic partitions var enumerator = dynamicPartitions.GetEnumerator(); int id = Thread.CurrentThread.ManagedThreadId; // cache our thread id // Enumerate through your dynamic enumerator while (enumerator.MoveNext()) { Thread.Sleep(50); // guarantees that multiple threads will have a chance to run Console.WriteLine(" item = {0}, thread id = {1}", enumerator.Current, id); } enumerator.Dispose(); }; // Spawn 3 concurrent actions to consume the dynamic partitions Parallel.Invoke(dynamicAction, dynamicAction, dynamicAction); // Clean up if (dynamicPartitions is IDisposable) ((IDisposable)dynamicPartitions).Dispose(); } } }
Available since 8
.NET Framework
Available since 4.0
Portable Class Library
Supported in: portable .NET platforms
Windows Phone
Available since 8.1
The static methods on Partitioner<TSource> are all thread-safe and may be used concurrently from multiple threads. However, while a created partitioner is in use, the underlying data source should not be modified, whether from the same thread that is using a partitioner or from a separate thread.


