如何:停止或中断 Parallel.For 循环

下面的示例演示如何从 For 循环中断(在 Visual Basic 中为 Exit),以及如何停止循环。 在此上下文中,“中断”表示完成当前线程上当前迭代之前的所有线程上的所有迭代,然后退出循环。“ 停止”表示在方便的情况下尽快停止所有迭代。

示例

本示例演示 For 循环;但可以按同一方式停止或中断 ForEach 循环。 在 ForEach 循环中,以内部方式为每个分区中的每个元素生成一个迭代索引。

' How to: Stop or Break from a Parallel.For Loop
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module ParallelForStop
    Sub Main()
        StopLoop()
        BreakAtThreshold()

        Console.WriteLine("Press any key to exit.")
        Console.ReadKey()
    End Sub

    Sub StopLoop()
        Console.WriteLine("Stop loop...")
        Dim source As Double() = MakeDemoSource(1000, 1)
        Dim results As New ConcurrentStack(Of Double)()

        ' i is the iteration variable. loopState is a 
        ' compiler-generated ParallelLoopState
        Parallel.For(0, source.Length, Sub(i, loopState)
                                           ' Take the first 100 values that are retrieved
                                           ' from anywhere in the source.
                                           If i < 100 Then
                                               ' Accessing shared object on each iteration
                                               ' is not efficient. See remarks.
                                               Dim d As Double = Compute(source(i))
                                               results.Push(d)
                                           Else
                                               loopState.[Stop]()
                                               Exit Sub

                                           End If
                                           ' Close lambda expression.
                                       End Sub)
        ' Close Parallel.For
        Console.WriteLine("Results contains {0} elements", results.Count())
    End Sub


    Sub BreakAtThreshold()
        Dim source As Double() = MakeDemoSource(10000, 1.0002)
        Dim results As New ConcurrentStack(Of Double)()

        ' Store all values below a specified threshold.
        Parallel.For(0, source.Length, Function(i, loopState)
                                           Dim d As Double = Compute(source(i))
                                           results.Push(d)
                                           If d > 0.2 Then
                                               ' Might be called more than once!
                                               loopState.Break()
                                               Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d)
                                               Thread.Sleep(1000)
                                           End If
                                           Return d
                                       End Function)

        Console.WriteLine("results contains {0} elements", results.Count())
    End Sub

    Function Compute(ByVal d As Double) As Double
        'Make the processor work just a little bit.
        Return Math.Sqrt(d)
    End Function


    ' Create a contrived array of monotonically increasing
    ' values for demonstration purposes. 
    Function MakeDemoSource(ByVal size As Integer, ByVal valToFind As Double) As Double()
        Dim result As Double() = New Double(size - 1) {}
        Dim initialval As Double = 0.01
        For i As Integer = 0 To size - 1
            initialval *= valToFind
            result(i) = initialval
        Next
        Return result
    End Function
End Module
namespace StopOrBreak
{
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Test
    {
        static void Main()
        {
            StopLoop();
            BreakAtThreshold();

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        private static void StopLoop()
        {
            Console.WriteLine("Stop loop...");
            double[] source = MakeDemoSource(1000, 1);
            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // i is the iteration variable. loopState is a 
            // compiler-generated ParallelLoopState
            Parallel.For(0, source.Length, (i, loopState) =>
            {
                // Take the first 100 values that are retrieved
                // from anywhere in the source.
                if (i < 100)
                {
                    // Accessing shared object on each iteration
                    // is not efficient. See remarks.
                    double d = Compute(source[i]);
                    results.Push(d);
                }
                else
                {
                    loopState.Stop();
                    return;
                }

            } // Close lambda expression.
            ); // Close Parallel.For

            Console.WriteLine("Results contains {0} elements", results.Count());
        }


        static void BreakAtThreshold()
        {
            double[] source = MakeDemoSource(10000, 1.0002);
            ConcurrentStack<double> results = new ConcurrentStack<double>();

            // Store all values below a specified threshold.
            Parallel.For(0, source.Length, (i, loopState) =>
            {
                double d = Compute(source[i]);
                results.Push(d);
                if (d > .2)
                {
                    // Might be called more than once!
                    loopState.Break();
                    Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d);
                    Thread.Sleep(1000);
                }
            });

            Console.WriteLine("results contains {0} elements", results.Count());
        }

        static double Compute(double d)
        {
            //Make the processor work just a little bit.
            return Math.Sqrt(d);
        }


        // Create a contrived array of monotonically increasing
        // values for demonstration purposes. 
        static double[] MakeDemoSource(int size, double valToFind)
        {
            double[] result = new double[size];
            double initialval = .01;
            for (int i = 0; i < size; i++)
            {
                initialval *= valToFind;
                result[i] = initialval;
            }

            return result;
        }
    }

}

在 ParallelFor() 或 [Overload:System.Threading.Tasks.Parallel.Parallel.ForEach`1] 循环中,不能使用与顺序循环中相同的 breakExit 语句,这是因为这些语言构造对于循环是有效的,而并行“循环”实际上是方法,不是循环。 相反,可以使用 StopBreak 方法。 Parallel.For 的一些重载接受 Action<int, ParallelLoopState>(在 Visual Basic 中为 Action(Of Integer, ParallelLoopState))作为输入参数。 ParallelLoopState 对象由运行时在后台创建,您可以在 lambda 表达式中为它指定您喜欢的任何名称。

在下面的示例中,该方法只需要源序列中的 100 个值,检索出哪些元素并不重要。 在此案例中,使用 Stop 方法,因为它将告知循环的所有迭代(包括那些在其他线程上的当前迭代之前开始的迭代)在方便的情况下尽快停止。

在第二个方法中,将检索所有元素,直到源序列中指定的索引。 在此案例中,调用 Break,这是因为当到达一个线程上的索引时,源中前面的元素有可能尚未处理。 中断会导致其他线程放弃对后续片段的工作(如果它们正忙于任何这样的工作),并在退出循环之前处理完所有以前的元素。

在调用 StopBreak 后,循环中的其他线程可能会继续运行一段时间(这不受应用程序开发人员的控制),理解这一点很重要。 可以使用 ParallelLoopState.IsStopped 属性检查是否已在另一个线程上停止该循环。 在下面的示例中,如果 IsStopped 为 true,则不会再有数据写入到集合中。

编译代码

  • 将该代码示例复制并粘贴到 Visual Studio 2010 项目中。

请参见

参考

System.Action<T1, T2>

概念

数据并行(任务并行库)

.NET Framework 中的并行编程

在 PLINQ 和 TPL 中的 Lambda 表达式