CCR Ports and PortSets

Microsoft Robotics Developer Studio

The Concurrency and Coordination Runtime (CCR) Port is the most common primitive and is used as the point of interaction between any two components. The port is implemented as the combination of two interfaces: IPortReceive and IPort. The interfaces logically separate the methods that add items to the port from the methods that retrieve items or attach code that removes items and executes asynchronously. The interfaces are not strongly typed: they work with instances of type object. The default implementation, the Port<> class, has one generic type argument.

Note: The <> notation next to a class name indicates that it is a generic class (similar to templates in C++). For example, Port<int> is a port implementation with one Common Language Runtime (CLR) type argument of System.Int32.

Posting Items

Adding an item to the port is an asynchronous operation. When there are no arbiters attached to the port, the item is added to a queue. If there are arbiters present, each one is called to evaluate the item and decide whether a task should be created and scheduled for execution. This evaluation is a fast, non-blocking operation, so the Post method returns control to the caller as soon as possible. This is the source of the asynchronous behavior in the CCR: programs post items quickly to ports, which, if certain conditions are met, can then cause tasks to be scheduled on some thread other than the current one.

Example 1

C#
// Create port that accepts instances of System.Int32
Port<int> portInt = new Port<int>();

// Add the number 10 to the port
portInt.Post(10);

// Display number of items to the console
Console.WriteLine(portInt.ItemCount);

In Example 1, a port is created that accepts only integers. One item, the number 10, is then added to the port. Finally, the port's ItemCount property is checked; it should read 1.

Retrieving Items

There are two scenarios when retrieving items from a port:

  1. The port is used as a passive queue: no tasks are scheduled when items are posted. In this scenario, items can be retrieved by calling the Test method. The method never blocks. If no item is present, it returns false and sets the out parameter to the default value of the item type (null for reference types). This scenario is useful when some other mechanism executes code in response to an event, and the user simply wants to use ports as efficient first-in, first-out queues. It also provides the flexibility of later attaching a receiver to the port, which can asynchronously execute tasks for any items already present.
  2. The port is used as an active queue: tasks are scheduled for execution when items are posted, because one or more arbiters are registered with the port. This is the most common way of using CCR ports. It is the primary source of concurrency, since the user code scheduled for each posted item can potentially execute in parallel with the code scheduled for every other item.

Example 2

C#
// Create port that accepts instances of System.Int32
var portInt = new Port<int>();

// Add the number 10 to the port
portInt.Post(10);

// Display number of items to the console
Console.WriteLine(portInt.ItemCount);

// retrieve the item using Test
int item;
var hasItem = portInt.Test(out item);
if (hasItem)
{
    Console.WriteLine("Found item in port:" + item);
}

portInt.Post(11);
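// an alternative to using Test is assigning the port to a variable of the
// item type, which invokes the implicit conversion operator; the variable
// must be declared as int, because var would infer Port<int> instead
int nextItem = portInt;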

Console.WriteLine("Found item in port:" + nextItem);

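Example 2, an extension of Example 1, uses the Test method to retrieve the posted item. The item is removed in a thread-safe manner, after which ItemCount should read zero. A second item is then posted and retrieved by assigning the port to a variable of the item type.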

Example 3

C#
// Create port that accepts instances of System.Int32
var portInt = new Port<int>();

// Add the number 10 to the port
portInt.Post(10);

// Display number of items to the console
Console.WriteLine(portInt.ItemCount);

// create dispatcher and dispatcher queue for scheduling tasks
Dispatcher dispatcher = new Dispatcher();
DispatcherQueue taskQueue = new DispatcherQueue("sample queue", dispatcher);

// retrieve the item by attaching a one time receiver
Arbiter.Activate(
    taskQueue,
    portInt.Receive(delegate (int item) // anonymous method
    {
        // this code executes in parallel with the method that
        // activated it                        
        Console.WriteLine("Received item:" + item);
    }
));
// any code below runs in parallel with delegate

Example 3 follows the same steps as Example 1, but instead of using the Test method, it uses the Arbiter.Activate method to register a simple receiver arbiter with the port. The receiver is associated with a user delegate, in this case an anonymous method defined inline. The delegate executes on one of the dispatcher threads associated with the supplied DispatcherQueue instance. The Dispatcher and DispatcherQueue classes are not the focus here and are described in more detail in the Scheduling Tasks section. Note that the delegate runs in parallel with the main body of this example. Because this is a one-time receiver, it is automatically removed from the port after the delegate runs. More information on receivers and arbiters is presented in a later section.

Inspecting Port State

The following methods and properties on the Port<> class are useful for inspecting its state at runtime, either for debugging or for regular code logic:

  • ToString - This method overrides the default ToString() implementation and will produce, in human readable form, the number of items in the port and the hierarchy of the receivers associated with the port.
  • ItemCount - Number of items currently queued. Note that any time one or more persisted arbiters (arbiters that stay attached even after consuming an item) are registered with the port, this count will be zero. Most arbiters cause items to be immediately converted to tasks and scheduled for execution, so the items never actually show up in the port's item queue.
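
As a rough illustration of how these two members might be used while debugging, the sketch below inspects a port first as a passive queue and then with a persisted receiver attached. It assumes the Microsoft.Ccr.Core assembly is referenced. The names PortStateSample, CountQueuedItems and CountWithPersistedReceiver are made up for this example, and the Thread.Sleep call is only a crude stand-in for proper coordination. The exact text produced by ToString is not described here, so no output is shown for it.

```csharp
using System;
using System.Threading;
using Microsoft.Ccr.Core;

// Hypothetical sample class; only Port<>, Dispatcher, DispatcherQueue and
// Arbiter come from the CCR.
public static class PortStateSample
{
    // Passive queue: no arbiter is attached, so posted items stay queued.
    public static int CountQueuedItems()
    {
        var port = new Port<int>();
        port.Post(10);
        port.Post(11);

        // Human-readable summary of the queued items and attached receivers
        Console.WriteLine(port.ToString());

        return port.ItemCount;
    }

    // Active queue: a persisted receiver turns posted items into tasks.
    public static int CountWithPersistedReceiver()
    {
        using (var dispatcher = new Dispatcher())
        using (var taskQueue = new DispatcherQueue("sample queue", dispatcher))
        {
            var port = new Port<int>();
            Arbiter.Activate(
                taskQueue,
                Arbiter.Receive<int>(true, port,
                    item => Console.WriteLine("Received item:" + item)));

            port.Post(10);

            // Crude wait (an assumption made for this sketch) that gives the
            // dispatcher time to attach the receiver and run the handler
            Thread.Sleep(500);

            Console.WriteLine(port.ToString());
            return port.ItemCount;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Queued, no arbiter: " + CountQueuedItems());
        Console.WriteLine("Queued, persisted receiver: " + CountWithPersistedReceiver());
    }
}
```

In the first method both items remain in the queue, so ItemCount reflects them. In the second, the description of ItemCount above suggests the count reads zero, because the persisted receiver converts each posted item into a task.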

Port Sets

Since the Port<> class takes only one generic type argument, it is often convenient to associate multiple, independent instances of the Port<> class under a single container that can itself be treated as one entity. The PortSet<> generic class allows this grouping and is the most common way to define the interface to a CCR software component: instead of having a set of public methods, CCR components expose only a strongly typed PortSet, with N independent Port<> instances underneath. The component can then coordinate how different message types execute in relation to each other. Further, the code always runs concurrently with, and independently from, the code that posts the messages. This programming model is very familiar to authors of web services, traditional server processes, or kernel mode I/O processors.

Constructing a PortSet instance

There are two ways to create a new instance of PortSet:

  1. Use the generic type arguments to define, at compile time, the number and type of Port<> instances the PortSet will support. The CCR provides implementations of the PortSet with up to twenty generic type arguments on the desktop CLR, and up to eight generic type arguments on the .NET Compact Framework, due to a JIT limitation. This approach gives you the best type safety and the best runtime performance when posting. However, it limits the number of types the port set can support.
  2. Use the initialization constructor that takes a parameter list of types. The user can supply an arbitrary number of types, and the Port<> instances are created at runtime. Some methods, such as the strongly typed Post for each message type, are not available (PostUnknownType or TryPostUnknownType must be used instead), and there is a minor performance hit because the item type has to be checked at runtime against a table of valid types.

Example 4

C#
// Create a PortSet using generic type arguments
var genericPortSet = new PortSet<int, string, double>();
genericPortSet.Post(10);
genericPortSet.Post("hello");
genericPortSet.Post(3.14159);

// Create a runtime PortSet, using the initialization 
// constructor to supply an array of types
PortSet runtimePortSet = new PortSet(
    typeof(int),
    typeof(string),
    typeof(double)
    );

runtimePortSet.PostUnknownType(10);
runtimePortSet.PostUnknownType("hello");
runtimePortSet.PostUnknownType(3.14159);

Example 4 shows how to create a PortSet with three different types, first using generic type arguments and then using an array of types supplied at runtime. Strongly typed Post methods can be added to a class that derives from the non-generic PortSet, providing the same compile-time safety as the generic PortSet<>.
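
The idea of adding strongly typed Post methods to a class derived from the non-generic PortSet could look roughly like the sketch below. It assumes the Microsoft.Ccr.Core assembly is referenced and relies only on the type-list constructor and the PostUnknownType method shown in Example 4. The names TypedRuntimePortSet and TypedRuntimePortSetSample are hypothetical.

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical class: a runtime PortSet wrapped with typed Post overloads
public class TypedRuntimePortSet : PortSet
{
    public TypedRuntimePortSet()
        : base(typeof(int), typeof(string), typeof(double))
    {
    }

    // Each overload forwards to PostUnknownType, so callers get a
    // compile-time check while the base class still validates at runtime
    public void Post(int item) { PostUnknownType(item); }
    public void Post(string item) { PostUnknownType(item); }
    public void Post(double item) { PostUnknownType(item); }
}

public static class TypedRuntimePortSetSample
{
    public static void Main()
    {
        var portSet = new TypedRuntimePortSet();
        portSet.Post(10);
        portSet.Post("hello");
        portSet.Post(3.14159);

        // portSet.Post(DateTime.Now); // would not compile: no matching overload
    }
}
```

The overloads only narrow what callers can pass at compile time. The runtime type check described for the initialization constructor still takes place inside PostUnknownType.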

Example 5

C#
/// <summary>
/// PortSet that accepts items of int, string, double
/// </summary>
public class CcrConsolePort : PortSet<int, string, double>
{
}
/// <summary>
/// Simple example of a CCR component that uses a PortSet to abstract
/// its API for message passing
/// </summary>
public class CcrConsoleService
{
    CcrConsolePort _mainPort;
    DispatcherQueue _taskQueue;

    /// <summary>
    /// Creates an instance of the service class, returning only a PortSet
    /// instance for communication with the service
    /// </summary>
    /// <param name="taskQueue"></param>
    /// <returns></returns>
    public static CcrConsolePort Create(DispatcherQueue taskQueue)
    {
        var console = new CcrConsoleService(taskQueue);
        console.Initialize();
        return console._mainPort;
    }

    /// <summary>
    /// Initialization constructor
    /// </summary>
    /// <param name="taskQueue">DispatcherQueue instance used for scheduling</param>
    private CcrConsoleService(DispatcherQueue taskQueue)
    {
        // create PortSet instance used by external callers to post items
        _mainPort = new CcrConsolePort();
        // cache dispatcher queue used to schedule tasks
        _taskQueue = taskQueue;
    }

    private void Initialize()
    {
        // Activate three persisted receivers (single item arbiters)
        // that will run concurrently to each other,
        // one for each item/message type
        Arbiter.Activate(_taskQueue,
            Arbiter.Receive<int>(true, _mainPort, IntWriteLineHandler),
            Arbiter.Receive<string>(true, _mainPort, StringWriteLineHandler),
            Arbiter.Receive<double>(true, _mainPort, DoubleWriteLineHandler)
        );
    }

    void IntWriteLineHandler(int item)
    {
        Console.WriteLine("Received integer:" + item);
    }
    void StringWriteLineHandler(string item)
    {
        Console.WriteLine("Received string:" + item);
    }
    void DoubleWriteLineHandler(double item)
    {
        Console.WriteLine("Received double:" + item);
    }
}

Example 5 defines a simple class, CcrConsolePort, that derives from a PortSet with three type arguments. This makes any subsequent use of the port set more readable, since the generic type definitions do not have to be repeated. The class CcrConsoleService implements a common pattern for CCR components. It has a static routine, Create, that creates the instance object and returns the private PortSet instance for communicating with it. Before returning, the routine calls Initialize, which activates a handler for each message type in the PortSet. The handlers can run concurrently with each other, assuming multiple threads are available to the dispatcher associated with the dispatcher queue.

Type Safety

The PortSet class allows for enumeration of all the port instances and item types it supports. The generic PortSet also provides convenient methods that automatically invoke the correct Port<> instance, plus implicit conversion operators that automatically cast a PortSet<> instance to a Port<> instance, provided the type argument is valid for that PortSet. This is useful when registering receivers with one of the ports in the PortSet, using just the type argument.

Example 6

C#
var portSet = new PortSet<int, string, double>();

// the following statement compiles because of the implicit conversion operators
// that "extract" the instance of Port<int> from the PortSet; the variable is
// declared as Port<int>, because var would infer the PortSet type instead
Port<int> portInt = portSet;

// the implicit assignment operator is used below to "extract" the Port<int>
// instance so the int receiver can be registered
Arbiter.Activate(_taskQueue,
    Arbiter.Receive<int>(true, portSet, item => Console.WriteLine(item))
);

Example 6 demonstrates the use of the implicit operator in two use cases:

  1. Assigning the correct instance of a Port<> within a PortSet, to another Port<> variable
  2. Extracting the correct instance of a Port<> so it can be used to register an arbiter

© 2012 Microsoft Corporation. All Rights Reserved.
