How to: Unlink Dataflow Blocks

.NET Framework (current version)

This document describes how to unlink a target dataflow block from its source.


The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the library, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.


The following example creates three TransformBlock(Of TInput, TOutput) objects, each of which calls the TrySolution method to compute a value. This example requires only the result of the first call to TrySolution that finishes.

Imports System
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
   ' Receives the value from the first provided source that has 
   ' a message.
   Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
      ' Create a WriteOnceBlock(Of T) object and link it to each source block.
      Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
      For Each source In sources
         ' Setting MaxMessages to one instructs
         ' the source block to unlink from the WriteOnceBlock(Of T) object
         ' after offering the WriteOnceBlock(Of T) object one message.
         source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
      Next source
      ' Return the first value that is offered to the WriteOnceBlock(Of T) object.
      Return writeOnceBlock.Receive()
   End Function

   ' Demonstrates a function that takes several seconds to produce a result.
   Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
      ' Simulate a lengthy operation that completes within three seconds
      ' or when the provided CancellationToken object is canceled.
      SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))

      ' Return a value.
      Return n + 42
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a shared CancellationTokenSource object to enable the 
      ' TrySolution method to be canceled.
      Dim cts = New CancellationTokenSource()

      ' Create three TransformBlock(Of Integer, Integer) objects. 
      ' Each TransformBlock(Of Integer, Integer) object calls the TrySolution method.
      Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
      Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
      Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
      Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)

      ' Post data to each TransformBlock(Of Integer, Integer) object.
      trySolution1.Post(11)
      trySolution2.Post(21)
      trySolution3.Post(31)

      ' Call the ReceiveFromAny(Of T) method to receive the result from the 
      ' first TransformBlock(Of Integer, Integer) object to finish.
      Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)

      ' Cancel all calls to TrySolution that are still active.
      cts.Cancel()

      ' Print the result to the console.
      Console.WriteLine("The solution is {0}.", result)

   End Sub
End Class

' Sample output (the value depends on which block finishes first):
'The solution is 53.

To receive the value from the first TransformBlock(Of TInput, TOutput) object that finishes, this example defines the ReceiveFromAny(T) method. The ReceiveFromAny(T) method accepts an array of ISourceBlock(Of TOutput) objects and links each of these objects to a WriteOnceBlock(Of T) object. When you use the LinkTo method to link a source dataflow block to a target block, the source propagates messages to the target as data becomes available. Because the WriteOnceBlock(Of T) class accepts only the first message that it is offered, the ReceiveFromAny(T) method produces its result by calling the Receive(Of TOutput) method, which returns the first message that is offered to the WriteOnceBlock(Of T) object.

The LinkTo method has an overloaded version that takes a DataflowLinkOptions object. Setting the MaxMessages property of that object to 1 instructs the source block to unlink from the target after the target receives one message from the source. It is important for the WriteOnceBlock(Of T) object to unlink from its sources because the relationship between the array of sources and the WriteOnceBlock(Of T) object is no longer required after the WriteOnceBlock(Of T) object receives a message.
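Unlinking can also be done by hand. The following is a minimal sketch, not part of the original example, that relies on the IDisposable object that LinkTo returns: disposing that object removes the link. The module name ManualUnlinkSketch, the printer block, and the received event are hypothetical names chosen for illustration.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Hypothetical sketch: unlinks a target by disposing the link.
Friend Module ManualUnlinkSketch
   Sub Main()
      Dim source As New BufferBlock(Of Integer)()
      Dim received As New ManualResetEventSlim()

      ' The target prints each message and signals that it ran.
      Dim printer As New ActionBlock(Of Integer)(
         Sub(n)
            Console.WriteLine("Printer received {0}", n)
            received.Set()
         End Sub)

      ' LinkTo returns an IDisposable object that represents the link.
      Dim link As IDisposable = source.LinkTo(printer)

      ' This message travels through the link to the printer.
      source.Post(1)
      received.Wait()

      ' Disposing the link unlinks the printer from the source.
      link.Dispose()

      ' With no linked target, this message stays buffered in the source,
      ' so it can be received directly.
      source.Post(2)
      Console.WriteLine("Still buffered in source: {0}", source.Receive())
   End Sub
End Module
```

Setting MaxMessages, as the example does, is usually simpler when the number of messages is known in advance. Disposing the link is a possible alternative when the decision to unlink is made at run time.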

To enable the remaining calls to TrySolution to end after one of them computes a value, the TrySolution method takes a CancellationToken object that is canceled after the call to ReceiveFromAny(T) returns. The SpinUntil method returns when this CancellationToken object is canceled.
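The effect of cancellation on SpinUntil can be seen in isolation. The following is a small sketch, assuming nothing beyond the System.Threading types that the example already uses. The module name SpinUntilCancellationSketch and the endedEarly variable are hypothetical. SpinUntil returns True when its condition is met before the timeout, and False when the timeout elapses first.

```vb
Imports System
Imports System.Threading

' Hypothetical sketch: shows how cancellation ends a SpinUntil wait.
Friend Module SpinUntilCancellationSketch
   Sub Main()
      Dim cts As New CancellationTokenSource()
      Dim token As CancellationToken = cts.Token

      ' The token is not canceled yet, so SpinUntil waits for the
      ' full 100-millisecond timeout and then returns False.
      Dim endedEarly As Boolean =
         SpinWait.SpinUntil(Function() token.IsCancellationRequested, 100)
      Console.WriteLine("Ended early before Cancel: {0}", endedEarly)

      cts.Cancel()

      ' The token is now canceled, so the condition is already true
      ' and SpinUntil returns True without waiting for the timeout.
      endedEarly = SpinWait.SpinUntil(Function() token.IsCancellationRequested, 3000)
      Console.WriteLine("Ended early after Cancel: {0}", endedEarly)
   End Sub
End Module
```

In the example, this is what lets the two slower calls to TrySolution finish promptly once the first result has been received.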

Compiling the Code

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowReceiveAny.cs (DataflowReceiveAny.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReceiveAny.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReceiveAny.vb

Robust Programming