
CCR Task Scheduling

Microsoft Robotics


The third major component of the Concurrency and Coordination Runtime (CCR) is task scheduling: how tasks, generated when messages arrive on ports with active receivers, are load-balanced across the execution resources of the machine. Three important classes implement or abstract scheduling in the CCR:

  • The task classes, including the ITask interface and the Task and IterativeTask implementations. Only classes that implement ITask can be scheduled. Arbiters also implement ITask so they can be scheduled and properly activated.
  • The DispatcherQueue class. A DispatcherQueue is a first-in, first-out (FIFO) queue of tasks. Dispatcher queues can use either the Common Language Runtime (CLR) thread pool for scheduling tasks (very uncommon) or an instance of a CCR Dispatcher.
  • The Dispatcher class. The dispatcher manages OS threads and load-balances tasks dequeued from one or more DispatcherQueue instances.

Example 16

C#
var dispatcher = new Dispatcher(
    0, // zero means use one thread per CPU, or 2 if only one CPU present
    "sample dispatcher" // friendly name assigned to OS threads
    );

var taskQueue = new DispatcherQueue(
    "sample queue", // friendly name
    dispatcher // dispatcher instance
    );

var port = new Port<int>();
Arbiter.Activate(taskQueue,
   Arbiter.Receive(
       true,
       port,
       item => Console.WriteLine(item) 
   )
);

// post item, so delegate executes
port.Post(5);

Example 16 shows how a dispatcher and dispatcher queue are created and then used to schedule tasks. The following is a step-by-step description of the example:

  1. An instance of a Dispatcher is created, using 0 as the number of threads. This makes the CCR choose a number of threads based on the number of CPU cores reported by the OS. The number of threads per CPU, used for the default, is controlled by the static property ThreadsPerCpu on the Dispatcher class.
  2. An instance of a DispatcherQueue is created, supplying the Dispatcher instance created in step 1. This attaches the DispatcherQueue to the dispatcher.
  3. An instance of Port<int> is created and used both to post items and to attach a receiver with a delegate.
  4. The Arbiter.Activate method is called, passing the instance of the DispatcherQueue created earlier, plus a new receiver arbiter with the port it needs to listen on and the delegate to execute when an item is posted on the port.
  5. An item of type int is posted on the port.

When an item is posted on a port with a receiver attached, the following happens within the port implementation:

  1. A container is created for the value being posted. The container class, IPortElement, allows the CCR to queue items and also assign them to task instances, without caring about the item type.
  2. The container instance is queued.
  3. If the list of receivers is not null and there is at least one receiver, the port will call the ReceiverTask.Evaluate method so the receiver and its arbiter hierarchy can determine whether the item can be consumed. In this example, the receiver will return true from Evaluate and also create a new instance of Task<int> using the item and the user delegate as parameters.
  4. The port logic calls taskQueue.Enqueue with the task returned from the Evaluate method on the receiver. Note that when a receiver is first activated, it is associated with the dispatcher queue instance supplied in the Arbiter.Activate method.

After step 4 above, the generated task instance is handled by the scheduling logic.
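The Post path described in the steps above can be sketched as follows. This is a hypothetical, simplified illustration (the field names, locking, and the Evaluate signature are assumptions), not the actual CCR source:

```csharp
// Hypothetical sketch of Port<T>.Post; names and structure are illustrative only.
public void Post(T item)
{
    IPortElement element = new PortElement<T>(item);   // step 1: wrap the value in a container
    lock (_portLock)
    {
        _elements.Enqueue(element);                    // step 2: queue the container
        foreach (ReceiverTask receiver in _receivers)  // step 3: let each receiver evaluate
        {
            ITask task = receiver.Evaluate(element);   // assumed signature, for illustration
            if (task != null)
            {
                _elements.Dequeue();                   // item was consumed by the receiver
                _taskQueue.Enqueue(task);              // step 4: schedule the generated task
                break;
            }
        }
    }
}
```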

Example 17

C#
// directly enqueue a task with an inlined method plus a parameter
taskQueue.Enqueue(
    new Task<int>(5, item => Console.WriteLine(item))
    );

Example 17 shows the equivalent code for scheduling the same delegate as in example 16 but without posting anything on a port. Creating a task instance explicitly is useful when data is available and code can be immediately executed to process it. The CCR does a similar task creation when a receiver is invoked in the context of a Port.Post call.
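The same Enqueue call also accepts an IterativeTask, mentioned earlier. The sketch below assumes the IterativeTask constructor that takes an iterator method yielding ITask instances; treat it as illustrative rather than definitive:

```csharp
// Enqueue an iterative task; the scheduler resumes the iterator each time
// a yielded task (such as a receiver) completes.
taskQueue.Enqueue(new IterativeTask(SampleIterator));

IEnumerator<ITask> SampleIterator()
{
    var donePort = new Port<int>();
    donePort.Post(42);

    // Yielding a receiver suspends the iterator until an item arrives on the port.
    yield return Arbiter.Receive(false, donePort,
        item => Console.WriteLine(item));
}
```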

Once an item is queued in the dispatcher queue, the following happens:

  1. The dispatcher queue signals the dispatcher instance it's associated with, that a new task is available for execution.
  2. The dispatcher notifies one or more instances of its TaskExecutionWorker class. Each task execution worker manages one OS thread. When no items are available for scheduling, it puts its thread in an efficient sleep state, waiting for a signal from the dispatcher.
  3. An instance of TaskExecutionWorker calls into the DispatcherQueue to retrieve a task from the queue. If a task is available, and not already picked up by another worker, the worker calls ITask.Execute.
  4. ITask.Execute invokes the delegate associated with the task, passing it one or more parameters associated with the task. In the example, a single parameter with the value 5 is passed to the delegate that writes to the console.
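The dequeue-and-execute loop in steps 2 through 4 can be modeled with a simplified, stand-alone sketch. The BlockingCollection-based queue is an assumption used here for illustration; the real TaskExecutionWorker uses the CCR's own signaling:

```csharp
// Simplified model of a task execution worker: sleep efficiently until a task
// is available, then execute it on this worker's thread.
void WorkerLoop(BlockingCollection<ITask> queue, CancellationToken cancel)
{
    try
    {
        while (true)
        {
            ITask task = queue.Take(cancel); // blocks until a task is enqueued
            task.Execute(); // invokes the user delegate with the task's stored parameters
        }
    }
    catch (OperationCanceledException)
    {
        // dispatcher is shutting down
    }
}
```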

Throttling

The CCR DispatcherQueue implementation allows task execution to be throttled, based on a few predefined policies. Task throttling is a key feature of the CCR: it enables programs to gracefully handle large message loads and to push the complexity of managing large queues to the CCR scheduler. The throttling policy is specified when a dispatcher queue is created. Since a different dispatcher queue can be used per activation of a coordination primitive, you can apply different policies for different handlers.

Task execution policy enumeration.
C#
    /// <summary>
    /// Specifies dispatcher queue task scheduling behavior
    /// </summary>
    public enum TaskExecutionPolicy
    {
        /// <summary>
        /// Default behavior, all tasks are queued with no constraints
        /// </summary>
        Unconstrained = 0,
        
        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation) 
        /// and discards tasks enqueued after the limit is reached
        /// </summary>
        ConstrainQueueDepthDiscardTasks,
        
        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation)
        /// but does not discard any tasks. It forces the thread posting any tasks after the limit is reached to
        /// sleep until the queue depth falls below the limit
        /// </summary>
        ConstrainQueueDepthThrottleExecution,
        
        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and discards tasks enqueued after the current scheduling rate is above the specified rate
        /// </summary>
        ConstrainSchedulingRateDiscardTasks,
        
        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and forces the thread posting tasks to sleep until the current rate of task scheduling falls below
        /// the specified average rate
        /// </summary>
        ConstrainSchedulingRateThrottleExecution
    }

Example 26

C#
void ThrottlingExample()
{
    int maximumDepth = 10;
    Dispatcher dispatcher = new Dispatcher(0, "throttling example");
    DispatcherQueue depthThrottledQueue = new DispatcherQueue("ConstrainQueueDepthDiscard",
          dispatcher,
          TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
          maximumDepth);

    Port<int> intPort = new Port<int>();
    Arbiter.Activate(depthThrottledQueue,
       Arbiter.Receive(true, intPort,
       delegate(int i)
       {
           // only some items will be received since throttling will discard most of them
           Console.WriteLine(i);
       })
    );

    // post items as fast as possible so that the depth policy is activated and discards
    // all the oldest items in the dispatcher queue
    for (int i = 0; i < maximumDepth * 100000; i++)
    {
        intPort.Post(i);
    }
}

Example 26 shows how a Dispatcher instance and a DispatcherQueue instance are created, this time specifying a task execution policy of ConstrainQueueDepthDiscardTasks. A persisted receiver is attached, and one million items are posted as fast as possible. If no policy were specified, one million tasks would be scheduled across all CPU cores on the machine, and the dispatcher queue depth would grow large, depending on how fast each task got scheduled. With the policy specified, the CCR discards the oldest tasks and keeps only the last 10 tasks for execution. This is very useful in situations where only the most recent N messages are useful, such as notifications, timers, etc.

Note that the depthThrottledQueue instance was supplied to the Arbiter.Activate method, associating this queue, and its policy, with the single-item receiver that will generate the task instances for each item posted.

Policy scenarios

  • ConstrainSchedulingRateDiscardTasks: Appropriate when a CCR handler wants to process messages that arrive at some regular rate, in terms of messages per second. Keeping all messages is not important, but it's important to retain the most recent ones. This policy guarantees the code will execute at a fixed rate, even when messages arrive in bursts. Appropriate for sensor notifications and timer events.
  • ConstrainQueueDepthDiscardTasks: Appropriate when messages can be discarded but the last N messages should be preserved. If the CPU falls behind processing tasks generated from messages, this policy guarantees that the oldest tasks get thrown away and the most recent N tasks get scheduled. The most useful depth threshold is 1, since that keeps only the most recent message/task. This is useful for processing critical messages, sensor data, and expiration timers where there is no well-known average rate, i.e., when the messages are not periodic.
  • ConstrainSchedulingRateThrottleExecution: Appropriate only when the source of the periodic messages is another thread within the same OS process. Throttling introduces a Thread.Sleep on the caller of the Post method on the port associated with a receiver and the dispatcher queue, slowing down the originator of the messages. No tasks are ever discarded.
  • ConstrainQueueDepthThrottleExecution: Same behavior as above, except it's appropriate when the messages have no periodicity and instead arrive at random intervals, in bursts. This is common for ports receiving messages from multiple other machines across the network. Again, no tasks are dropped, but the thread executing the inbound network operation is throttled, reducing the rate at which messages are delivered on the CCR port.
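For the rate-based policies, the queue is created with a target scheduling rate rather than a maximum depth. A minimal sketch, assuming a DispatcherQueue constructor overload that takes the rate as a double (tasks per second) and a dispatcher created as in Example 26:

```csharp
// Discard tasks whenever scheduling would exceed 100 tasks per second.
double tasksPerSecond = 100;
DispatcherQueue rateThrottledQueue = new DispatcherQueue(
    "ConstrainSchedulingRateDiscard",
    dispatcher, // Dispatcher instance, created as in Example 26
    TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks,
    tasksPerSecond);
```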

© 2012 Microsoft Corporation. All Rights Reserved.
