7: Pipelines


The Pipeline pattern allows you to achieve parallelism in cases where there are data dependencies that would prevent you from using a parallel loop. A pipeline is composed of a linear series of producer/consumer stages, where each stage depends on the output of its predecessor. The pipeline is an example of a more general category known as a dataflow network. A dataflow network decomposes computation into cooperating components that communicate by sending and receiving messages.

There are a variety of techniques for implementing pipelines. Those described in this chapter use in-process messaging blocks and asynchronous agents, both of which are provided by the Asynchronous Agents Library.

A dataflow network is a set of asynchronous components that use messages to communicate with one another.

A data pipeline is a sequence of asynchronous components that are connected by message buffers. Each stage of the pipeline receives input from its predecessor. Use a pipeline when data dependencies prevent you from using a parallel loop.

A pipeline’s data flows from its first stage, through intermediate stages, and then to a final stage. The stages of a pipeline execute concurrently, but in contrast to a parallel loop, the overall effect is to process the input data in a fixed order. You can think of software pipelines as analogous to assembly lines in a factory, where each item in the assembly line is constructed in stages. The partially assembled item is passed from one assembly stage to another. The outputs of the assembly line occur in the same order as the inputs, but more than one item is assembled at a time.

Note

In this chapter, the Asynchronous Agents Library's function is to improve performance when there are multiple cores available, but it has other uses. More generally, agents with asynchronous communication can be used to implement concurrency and as a way to organize applications such as simulations.

Pipelines occur in many applications. You can use a pipeline when data elements are received from a real-time event stream, such as values on stock ticker tapes, user-generated mouse click events, or packets that arrive over the network. You can also use pipelines to process elements from a data stream, as is done with compression and encryption, or to apply transformation operations to streams of video frames. In all of these cases, it's important that the data elements are processed in sequential order. You can’t use a parallel loop for these cases because a parallel loop doesn't preserve the processing order.

Note

Don't confuse pipelines and parallel loops. Pipelines are used when parallel loops can't be. With the Pipeline pattern, the data is processed in sequential order. The first input is transformed into the first output, the second input into the second output, and so on.

Types of Messaging Blocks

The Asynchronous Agents Library includes the following types of messaging blocks, which are useful in a variety of situations:

  • unbounded_buffer: a concurrent queue of unbounded size.
  • overwrite_buffer: a single value that can be updated many times.
  • single_assignment: a single value that can be set just once.
  • call: a function that is invoked whenever a value is added to the messaging block.
  • transformer: a function that is invoked whenever a value is added to the messaging block; the function's return value is added to an output messaging block.
  • choice: selects a message from a set of sources.
  • join: waits for more than one source before proceeding.
  • multitype_join: same as join, except that inputs may have multiple message types.
  • timer: produces messages based on time intervals.

In this chapter you’ll see examples of four types of messaging blocks. They are the unbounded_buffer<T>, overwrite_buffer<T>, transformer<T, S> and call<T> messaging blocks. It's also possible to implement your own messaging blocks.

The Basics

In the Asynchronous Agents Library, the buffers that connect stages of a software pipeline are usually messaging blocks, such as instances of the unbounded_buffer<T> class. Although the buffer itself is unbounded, the pipeline includes a feedback mechanism that limits the number of pending items. The stages of the pipeline can themselves be implemented with instances of the agent class.

The Asynchronous Agents Library provides messaging blocks, agents, and functions that send and receive messages.

Note

This section describes an agent-based approach to pipelines that requires a dedicated thread for each pipeline stage. Each stage uses an instance of the agent class. See "Asynchronous Pipelines" in this chapter for an important variation of this pattern that does not dedicate a thread to each pipeline stage.

Figure 1 illustrates an example of a pipeline that has four stages. It reads words and sentence fragments from a data source, it corrects the punctuation and capitalization, it combines the words and phrases into complete sentences, and then it writes the sentences to a disk file.

Figure 1

Sample pipeline

Stages of the pipeline read from a dedicated input, or source, and write to a particular output, or target. For example, the "Correct Case" stage uses buffer 1 as its source and writes to buffer 2 as its target. All the stages of the pipeline can execute at the same time because the three messaging blocks buffer any shared inputs and outputs. If there are four available cores, the four stages can run in parallel.

Stages in the pipeline block (that is, wait) on inputs. An input wait is familiar from other programming contexts: if an enumeration or a stream doesn't have a value, the consumer of that enumeration or stream waits until a value is available or an end-of-file condition occurs. Using buffers that hold more than one value at a time compensates for variability in the time it takes to process each value. Buffers allow stages of the pipeline to be asynchronous.
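The blocking behavior described above can be sketched with standard C++ threads. This is a portable illustration, not the Asynchronous Agents Library: the BlockingQueue class, its Send and Receive methods, and the RunTwoStagePipeline function are hypothetical names introduced here, and a negative integer is assumed to be a safe end-of-sequence marker because the inputs are assumed to be non-negative.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a message buffer. Receive blocks until a value
// is available, which is the "input wait" described in the text.
template <typename T>
class BlockingQueue {
public:
  void Send(T value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(value));
    }
    ready_.notify_one();
  }

  T Receive() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !items_.empty(); });
    T value = std::move(items_.front());
    items_.pop_front();
    return value;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<T> items_;
};

// Runs a two-stage pipeline: stage 1 doubles each input, stage 2 adds one.
// Assumption: inputs are non-negative, so -1 can mark the end of the sequence.
std::vector<int> RunTwoStagePipeline(const std::vector<int>& inputs) {
  const int kDone = -1;
  BlockingQueue<int> buffer1, buffer2;
  std::vector<int> outputs;

  std::thread stage1([&] {
    for (;;) {
      int v = buffer1.Receive();  // waits for the producer
      if (v == kDone) { buffer2.Send(kDone); break; }
      buffer2.Send(v * 2);
    }
  });
  std::thread stage2([&] {
    for (;;) {
      int v = buffer2.Receive();  // waits for stage 1
      if (v == kDone) break;
      outputs.push_back(v + 1);
    }
  });

  for (int v : inputs) buffer1.Send(v);
  buffer1.Send(kDone);
  stage1.join();
  stage2.join();
  return outputs;  // same order as the inputs
}
```

Because each stage is a sequential loop and each buffer is first-in, first-out, the outputs appear in input order even though both stages run at the same time.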

Note

When using the unbounded_buffer<T> class you should define a special value as your end-of-file token. This special value is sometimes called the sentinel value. When using sentinel values you must be careful that the end-of-file signal can never occur as one of the regular messages. This example uses the value given by the PhraseSource::FinishedSentinel() static method to signal the end of the sequence of values.
Choosing a sentinel value can be harder than it seems at first. It's often the case that all values of the type T have meaning as valid payloads of an unbounded_buffer<T> instance. For example, if your payload type is a string, you might be tempted to use the empty string as the sentinel value, but this would only be safe if you can guarantee that the empty string is never used as a normal value to be processed by the pipeline. In practice, the null pointer is often used as the sentinel value.
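As a minimal sketch of the null-pointer approach, the following code treats messages as shared pointers to strings, so that every string value, including the empty string, remains a valid payload. The PhrasePtr typedef and the two functions are hypothetical names used only for illustration, and a std::queue stands in for the messaging block.

```cpp
#include <memory>
#include <queue>
#include <string>
#include <vector>

typedef std::shared_ptr<std::wstring> PhrasePtr;

// Consumes messages until the null-pointer sentinel is seen.
// Every std::wstring value, including L"", is treated as a normal payload.
std::vector<std::wstring> DrainUntilSentinel(std::queue<PhrasePtr>& messages) {
  std::vector<std::wstring> received;
  while (!messages.empty()) {
    PhrasePtr message = messages.front();
    messages.pop();
    if (message == nullptr)  // sentinel: end of the sequence
      break;
    received.push_back(*message);
  }
  return received;
}

// Hypothetical helper that builds a small message sequence for illustration.
std::queue<PhrasePtr> MakeSampleMessages() {
  std::queue<PhrasePtr> messages;
  messages.push(std::make_shared<std::wstring>(L"hello"));
  messages.push(std::make_shared<std::wstring>(L""));  // valid, not a sentinel
  messages.push(nullptr);                              // sentinel
  messages.push(std::make_shared<std::wstring>(L"ignored"));
  return messages;
}
```

The trade-off is one heap allocation per message; in exchange, the sentinel can never collide with a regular value.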

The following code demonstrates how to implement a pipeline that uses the unbounded_buffer<T> class for the buffers and the agent class for the stages of the pipeline.

unbounded_buffer<wstring> buffer1;
unbounded_buffer<wstring> buffer2;
unbounded_buffer<wstring> buffer3;
PipelineGovernor governor(g_sentencePipelineLimit);

ReadStringsAgent agent1(seed, g_sentenceMax, governor, buffer1);
CorrectCaseAgent agent2(buffer1, buffer2);
CreateSentencesAgent agent3(buffer2, buffer3);
WriteSentencesAgent agent4(g_targetSentence, g_pipelineResults,
                           governor, buffer3);

agent1.start(); 
agent2.start(); 
agent3.start(); 
agent4.start();

agent* agents[4] = { &agent1, &agent2, &agent3, &agent4 };
agent::wait_for_all(4, agents);

The first stage generates the input strings and places them in buffer1. The second stage transforms the strings. The third stage combines the strings into sentences. The final stage writes the corrected sentences to a file. Note that in this example, the number of input elements (words) is not the same as the number of output elements (sentences); part of the pipeline's functionality is to combine words into sentences.

Use an instance of the unbounded_buffer<T> class to connect stages of a pipeline.

References to input and output buffers are passed to each agent’s constructor. For example, the second stage, agent2, which is an instance of the CorrectCaseAgent class, uses buffer1 as its input and buffer2 as its output. Figure 1 illustrates the resulting connections among the agents.

Stages of the pipeline may not take exactly the same amount of time to process each element. To prevent an excess number of buffered elements, the pipeline uses a mechanism for limiting the number of data elements that may be pending at any given time. This mechanism is provided by the PipelineGovernor class, which is defined in the Utilities folder of the Microsoft® Visual Studio® solution for the example code. Only the first and the last stages of the pipeline need to interact with the pipeline governor. When an item exits the pipeline, the last stage of the pipeline asks the governor instance to decrement a counter of in-flight elements. Before placing a new element into the pipeline, the first stage of the pipeline checks to see that the maximum number of in-flight elements hasn't been exceeded. If this is the case, the pipeline stage asks the governor instance to increment the count of in-flight elements and then places the element into the first buffer. If the pipeline is full, the first stage waits for the governor to signal when space in the pipeline becomes available. Internally, the governor uses a messaging block to synchronize between the last stage of the pipeline and the first.

Use a governor to limit the number of in-flight elements in the pipeline.

Note

If you don’t use a governor to limit the in-flight elements in a pipeline, their numbers can grow without bound. Adding too many elements at once can result in performance problems or out-of-memory errors. The governor provided in this example is just one approach; you may want to consider others, depending on the needs of your application.
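To make the governor idea concrete, here is a minimal sketch built on a mutex and a condition variable. It is not the PipelineGovernor class from the sample, which the text says uses a messaging block internally; the class name SketchGovernor and the InFlight accessor are assumptions made for this illustration. The three method names that mirror the sample's usage are kept so that the calling pattern looks familiar.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical governor: limits how many elements may be in flight at once.
class SketchGovernor {
public:
  explicit SketchGovernor(int capacity) : capacity_(capacity), inFlight_(0) {}

  // Called by the first stage before it adds an element.
  // Blocks while the pipeline is full, then claims a slot.
  void WaitForAvailablePipelineSlot() {
    std::unique_lock<std::mutex> lock(mutex_);
    changed_.wait(lock, [this] { return inFlight_ < capacity_; });
    ++inFlight_;
  }

  // Called by the last stage when an element leaves the pipeline.
  void FreePipelineSlot() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      --inFlight_;
    }
    changed_.notify_all();
  }

  // Blocks until no elements remain in flight.
  void WaitForEmptyPipeline() {
    std::unique_lock<std::mutex> lock(mutex_);
    changed_.wait(lock, [this] { return inFlight_ == 0; });
  }

  // Current number of in-flight elements (for diagnostics).
  int InFlight() {
    std::lock_guard<std::mutex> lock(mutex_);
    return inFlight_;
  }

private:
  const int capacity_;
  int inFlight_;
  std::mutex mutex_;
  std::condition_variable changed_;
};
```

Only the first and last stages touch the governor, so the intermediate stages need no changes when you add or tune the limit.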

After all of the agents have been created and connected to their respective sources and targets, the code invokes the start method of each agent.

Call an agent’s start method to begin execution.

The code calls the wait_for_all static method of the agent class to defer cleanup until after all stages have completed processing their inputs. In this code example, the memory for the agents is allocated on the stack in the current context, so you can’t exit the current context until the agents have finished their work.

Use the wait_for_all method to allow agents to finish processing before proceeding in the current context.

The first stage of the pipeline is implemented by the ReadStringsAgent class. This agent includes a sequential loop that writes to its output buffer. Here is the code for the agent’s run method, which specifies what the agent should do after it has started.

An agent’s run method is invoked when the agent is started. The agent terminates when the run method invokes the agent’s done method and exits.

class ReadStringsAgent : public agent
{
  // ...
  void run()
  {
    PhraseSource source(m_seed, m_numberOfSentences);
    wstring inputPhrase;
    do
    {
      // ...
      inputPhrase = source.Next();
  
      // Limit whole sentences in the pipeline, not phrases.
      if (inputPhrase == L".")
        m_governor.WaitForAvailablePipelineSlot();
      asend(m_phraseOutput, inputPhrase);
    } while (inputPhrase != PhraseSource::FinishedSentinel());
    done();
  }
};

The sequential do loop populates the output buffer with values. The loop is sequential in order to preserve the order of elements that are processed, which is one of the requirements of applications that use the Pipeline pattern. The values come from an external data source that's represented by the PhraseSource class. Successive values are retrieved by calling the phrase source object’s Next method.

Note

Applications that use the Pipeline pattern require that elements be processed in order. If the processing order is not important, you may consider using another pattern, such as the Parallel Loop pattern, to process your data.

The asend function, named for "asynchronous send," is provided by the Asynchronous Agents Library in the agents.h header file. It schedules a task to propagate the data to the target messaging block.

A producer can use either the send or asend function to relay values, which are also referred to as messages, to the target messaging block. The send function blocks the current context until the target messaging block accepts or rejects the message. The send function returns true if the message was accepted and false otherwise. The asend function does not wait for the target to accept or decline the message before it returns. Use send when you must ensure that the value reaches its destination before proceeding in the current context. Use asend when you want a "fire and forget" style of message passing.

Use the send or asend function to place data into the next stage’s input buffer.

It's possible in some cases that a message won't be accepted by the target messaging block. This can occur, for example, if you attempt to send more than one message to a single assignment buffer. The second message won’t be accepted by the buffer. Additionally, messaging block constructors allow you to provide a custom filter function that determines whether an incoming message will be accepted by that messaging block.
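The two reasons for rejection mentioned above (a write-once block that already holds a value, and a filter that declines the message) can be modeled with a small stand-in class. WriteOnceCell and its Offer method are hypothetical names for this sketch; they are not part of the Asynchronous Agents Library, and the class is deliberately single-threaded to keep the logic visible.

```cpp
#include <functional>
#include <utility>

// Hypothetical stand-in for a write-once messaging block with an optional
// filter. Not thread-safe; it only models the accept/decline decision.
template <typename T>
class WriteOnceCell {
public:
  typedef std::function<bool(const T&)> Filter;

  WriteOnceCell() : hasValue_(false), value_() {}
  explicit WriteOnceCell(Filter filter)
      : filter_(std::move(filter)), hasValue_(false), value_() {}

  // Returns true if the message was accepted and false if it was declined.
  bool Offer(const T& message) {
    if (filter_ && !filter_(message)) return false;  // declined by the filter
    if (hasValue_) return false;                     // already assigned once
    value_ = message;
    hasValue_ = true;
    return true;
  }

  bool HasValue() const { return hasValue_; }
  const T& Value() const { return value_; }

private:
  Filter filter_;
  bool hasValue_;
  T value_;
};
```

A producer that needs to know whether its message was taken would check the Boolean result, much as the text describes for the return value of send.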

The first stage of the pipeline invokes the WaitForAvailablePipelineSlot method of the governor before adding a new element. If the pipeline is full, the governor will block until space becomes available.

The second stage of the pipeline capitalizes words if necessary. It consumes values from its source buffer, transforms them, and places the transformed values into its target buffer. The following code shows the run method of the CorrectCaseAgent class.

class CorrectCaseAgent : public agent
{
  // ...

  void run()
  {
    wstring inputPhrase;
    while(true) 
    {
      inputPhrase = receive(m_phraseInput);
      if (inputPhrase == PhraseSource::FinishedSentinel())
      {
        asend(m_phraseOutput, inputPhrase);
        break;
      }     
      // ... transform phrase by possibly capitalizing it
      asend(m_phraseOutput, outputPhrase);
    }
    done();
  }
};

The important point of this code is that the receive function is called on the messaging block that acts as the source. This allows the consuming agent to wait for values to become available from the producer of those values. The code iterates until the special end-of-file sentinel value is seen.

Use the receive function to wait for input from a messaging block. Use a sentinel value to indicate shutdown.

Some messaging blocks allow multiple consumers and producers.

Some messaging blocks, including the unbounded_buffer<T> class, support multiple producers and consumers.

The third stage of the pipeline uses the run method of the CreateSentencesAgent class to read a sequence of phrases and combine them into sentences. When it encounters a phrase that ends with the period character, it knows that the end of the sentence has been reached and writes the sentence to the target messaging buffer. The CreateSentencesAgent class shows that it's not always necessary for pipeline stages to consume and produce an identical number of values.

Pipeline stages can summarize or combine values. There’s not always a one-to-one correspondence of inputs and transformed outputs in each stage of a pipeline.
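The combining logic of such a stage might look like the following sketch. It is written as a plain function over a vector so that the many-to-one relationship is easy to see; the function name and the joining rules (a space between phrases, a lone period attached directly) are assumptions for illustration and are not taken from the CreateSentencesAgent source.

```cpp
#include <string>
#include <vector>

// Combines phrases into sentences. A sentence is emitted whenever a phrase
// ends with a period. The joining rules are illustrative assumptions.
std::vector<std::wstring> CombineIntoSentences(
    const std::vector<std::wstring>& phrases) {
  std::vector<std::wstring> sentences;
  std::wstring current;
  for (const std::wstring& phrase : phrases) {
    if (phrase.empty()) continue;
    // Attach a lone "." directly; separate other phrases with a space.
    if (!current.empty() && phrase != L".")
      current += L" ";
    current += phrase;
    if (phrase.back() == L'.') {
      sentences.push_back(current);  // one output for several inputs
      current.clear();
    }
  }
  return sentences;  // an unterminated trailing fragment is dropped
}
```

Five input phrases such as "The", "quick fox", ".", "It", and "ran." produce two output sentences, which is why a stage like this can't be expressed as a one-to-one transformation.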

The last stage of the pipeline, which is implemented by the WriteSentencesAgent class, consumes values from its predecessor in the pipeline but doesn't produce any values. Instead, it writes to an output file stream. Here's the code for the agent’s run method.

class WriteSentencesAgent : public agent
{ 
  // ...

  void run()
  {
    wofstream fout;
    fout.open(m_outputPath);
    wstring sentence;
    while(true)
    {
      sentence = receive(m_sentenceInput);
      if (sentence == PhraseSource::FinishedSentinel())
        break;
      if (sentence == m_targetSentence)
        sentence.append(L"       Success!");
      fout << m_currentSentenceCount++ << L" " 
                                     << sentence.c_str() << endl;
      sentence.clear();
      m_governor.FreePipelineSlot();

      OutputProgress(m_currentSentenceCount);
    }
    fout.close();
    done();
  }
};

The agent reads sentences it receives and compares them to the desired target sentence, m_targetSentence. It writes all generated sentences to a file and flags the ones that match the target.

The agent invokes the FreePipelineSlot method of the pipeline’s governor to signal that space in the pipeline has become available.

One reason that agents and messaging blocks make it easy to write pipelines is that you can rely on familiar sequential techniques such as iteration. There is some synchronization, but it's hidden inside the implementation of the unbounded_buffer<T> class.

(Some details of error handling, cancellation, and the collection of performance data have been omitted from this example for clarity. To see error handling and cancellation code, review the full ImagePipeline sample that's mentioned later in this chapter.)

An Example

The online samples include an application named ImagePipeline. This application takes a directory of JPEG images and generates thumbnail versions, which are also post-processed with an image-enhancing filter. The resulting processed images are displayed as a slideshow, in alphabetical file name order.

Note

You can't use a parallel loop for this example because the application requires that images be processed in sequence. Parallel loops don't guarantee any particular processing order.

Sequential Image Processing

Each image is processed in four stages: the large color image is loaded from a file, a small thumbnail with a picture frame is generated from it, noise is added to the image to create a speckling effect, and then the processed image is displayed as the next picture in the slideshow. Figure 2 illustrates this sequence.

Figure 2

Sequential image processing

Here's the code for the sequential version.

vector<wstring> filenames = ...

int sequence = kFirstImage;

for_each_infinite(filenames.cbegin(), filenames.cend(), 
  [this, &sequence, offset] (wstring file)->bool
{
  ImageInfoPtr pInfo = LoadImage(sequence++, file, offset);
  ScaleImage(pInfo, m_imageDisplaySize);
  FilterImage(pInfo, m_noiseLevel);
  DisplayImage(pInfo);
  return IsCancellationPending();
});

The four steps are performed by the LoadImage, ScaleImage, FilterImage, and DisplayImage methods. This example is slightly abridged for clarity. The code that deals with the capture of performance measurements is omitted. You can refer to the online samples to see those details.

The type ImageInfoPtr is a typedef abbreviation for shared_ptr<ImageInfo>, a Standard Template Library (STL) shared pointer to an ImageInfo instance. The ImageInfo class is a data record that contains the image bitmap to be processed. Pointers are used as a way to pass data between stages of the pipeline without the overhead of copying the image bitmaps. Buffering ensures that no locks are needed for this "shared" data; each ImageInfo instance is guaranteed to be accessed by only one stage of the pipeline at a time.

The function for_each_infinite is a helper function that is defined by the sample code. It invokes a function (in this case a function object given by a lambda expression) on each element of a sequence. When the loop reaches the end of the sequence, it restarts at the beginning; however, if any invocation of the function returns true, iteration stops. In this example, the only way to exit the loop is by throwing an exception or when IsCancellationPending() returns true. See "Variations" in this chapter for more information on the cancellation model that is used in this example.
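The sample's own definition of the helper isn't reproduced in this chapter, but a helper with the behavior just described could be written roughly as follows. The name for_each_infinite_sketch, the empty-sequence guard, and the VisitCyclically demonstration function are assumptions made for this illustration.

```cpp
#include <vector>

// Hypothetical reconstruction; the helper in the sample code may differ.
// Applies fn to each element, wrapping around at the end of the sequence,
// until fn returns true.
template <typename Iterator, typename Function>
void for_each_infinite_sketch(Iterator first, Iterator last, Function fn) {
  if (first == last) return;  // avoid spinning forever on an empty sequence
  for (;;) {
    for (Iterator it = first; it != last; ++it) {
      if (fn(*it)) return;  // true means "stop iterating"
    }
  }
}

// Demonstration: visits elements cyclically until `limit` have been seen.
std::vector<int> VisitCyclically(const std::vector<int>& values, int limit) {
  std::vector<int> visited;
  for_each_infinite_sketch(values.cbegin(), values.cend(),
    [&visited, limit](int v) -> bool {
      visited.push_back(v);
      return static_cast<int>(visited.size()) >= limit;
    });
  return visited;
}
```

In the image example, the lambda's return value plays the role of the stop condition by reporting whether cancellation is pending.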

The Image Pipeline

The sequential loop can process only one image at a time; each image must complete all four stages before work can begin on the next image, and the stages themselves are sequentially linked. In fact, this example seems intractably sequential—the top-level loop has the restriction that images must be displayed in order (like video frames), and within each step are substeps that require inputs from previous substeps. You can't display an image until after the filter is applied to it. You can't apply the filter until after the image is scaled to thumbnail size. You can't do the scaling until after the original image loads.

Even with such strong sequential constraints, the Pipeline pattern can introduce parallelism into this example. Each image will still pass through all four stages, in sequence, but the stages themselves can work on different images at the same time. Figure 3 illustrates the image pipeline.

Figure 3

Image pipeline

The following code from the ImageAgentPipelineControlFlow.h file shows the parallel version.

unbounded_buffer<ImageInfoPtr> buffer1;
unbounded_buffer<ImageInfoPtr> buffer2;
unbounded_buffer<ImageInfoPtr> buffer3;

ImageScalerAgent   imageScaler(..., buffer1, buffer2);
ImageFiltererAgent imageFilterer(..., buffer2, buffer3);
ImageDisplayAgent  imageDisplayer(..., m_governor, 
                                  ..., buffer3);

imageScaler.start();
imageFilterer.start();
imageDisplayer.start();

vector<wstring> filenames = ...

int sequence = kFirstImage;

for_each_infinite(filenames.cbegin(), filenames.cend(), 
  [this, offset, &buffer1, &sequence] (wstring file)->bool
{
  ImageInfoPtr pInfo = this->LoadImage(sequence++, file, offset);
  if (nullptr == pInfo) 
    return true;
  m_governor.WaitForAvailablePipelineSlot();
  asend(buffer1, pInfo);

  return IsCancellationPending();
});

m_governor.WaitForEmptyPipeline(); 
asend<ImageInfoPtr>(buffer1, nullptr);
agent* agents[3]={&imageScaler, &imageFilterer, &imageDisplayer};
agent::wait_for_all(3, agents);

There are three unbounded_buffer<T> messaging blocks that act as buffers between the stages of the pipeline. A call to the agent’s start method launches each processing stage.

The code iterates through the file names to be processed and uses the LoadImage method to load each image into memory. This step is the same as in the sequential version of the code. However, instead of proceeding directly to the next operation, the code places a shared pointer to the newly loaded image’s data into messaging block buffer1, which is the input source of the ImageScalerAgent object. The image scaling agent receives the image and begins to process it. Meanwhile, the loop continues with its next iteration and begins loading the next image.

Like the text processing example described in the previous section of this chapter, the image processing example uses the PipelineGovernor utility class to limit the maximum number of in-flight elements in the pipeline.

Note

Be careful about copying large amounts of data between pipeline stages. For example, copying large bitmapped images between stages will unnecessarily consume a large amount of memory. Instead, pass a pointer to a data structure.

(Some details of error handling, cancellation, and the collection of performance data have been omitted here for clarity. Refer to the online sample for the complete implementation.)

Performance Characteristics

To understand the performance characteristics of the sequential and pipelined versions, it's useful to look at a scheduling diagram such as Figure 4.

Figure 4

Image pipeline with stages of equal speed

Figure 4 shows how the tasks in the image pipeline example execute over time. For example, the top row shows that stage 1 processes image 1 starting at time t0 and image 2 starting at time t1. Stage 2 begins processing image 1 at time t1. Assume for a moment that the pipeline is perfectly balanced; that is, each stage of the pipeline takes exactly the same amount of time to do its work. Call that duration T. Therefore, in Figure 4, t1 occurs after T units of time have elapsed, t2 after 2 x T units of time have elapsed, and so on.

If there are enough available cores to allow the pipeline's tasks to run in parallel, Figure 4 shows that the expected execution time for six images in a pipeline with four stages is approximately 9 x T. In contrast, the sequential version takes approximately 24 x T because each of the 24 steps must be processed one after another.

The average performance improves as more images are processed. The reason for this, as Figure 4 illustrates, is that some cores are idle as the pipeline fills during startup and drains during shutdown. With a large number of images, the startup and shutdown times become relatively insignificant. The average time per image would approach T.

If there are enough available cores, and if all stages of a pipeline take an equal amount of time, the average time to process each item in a long-running pipeline approaches the time for just one stage.

However, there's one catch: the assumption that all the pipeline steps take exactly the same amount of time isn't always true. Figure 5 shows the scheduling pattern that emerges when the filter stage takes twice as long as the other stages.

Figure 5

Image pipeline with unequal stages

When one of the stages takes 2 x T units of time while the other stages take T units of time, you can see that it's not possible to keep all of the cores completely busy. On average (with a large number of images), the time to process an image is 2 x T. In other words, when there are enough cores for each pipeline stage, the speed of a pipeline is approximately equal to the speed of its slowest stage.

When the stages of a pipeline don't take the same amount of time, the speed of a pipeline is approximately equal to the speed of its slowest stage.
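You can check these figures with a small scheduling model. The sketch below is an idealized calculation, not a measurement of the sample: it assumes one core per stage, unbounded buffers, no governor limit, all inputs available at time zero, and a fixed duration for each stage. The function names are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Returns the time at which the last item leaves the last stage, assuming
// one core per stage, unbounded buffers, and fixed stage durations.
double PipelineFinishTime(int itemCount,
                          const std::vector<double>& stageDurations) {
  // finish[s] is the time at which stage s finished its most recent item.
  std::vector<double> finish(stageDurations.size(), 0.0);
  for (int item = 0; item < itemCount; ++item) {
    double arrival = 0.0;  // all inputs are assumed available at time 0
    for (std::size_t s = 0; s < stageDurations.size(); ++s) {
      // A stage starts an item once the item has arrived and the stage is free.
      double start = std::max(arrival, finish[s]);
      finish[s] = start + stageDurations[s];
      arrival = finish[s];
    }
  }
  return finish.empty() ? 0.0 : finish.back();
}

// Time for the sequential version: every step runs one after another.
double SequentialFinishTime(int itemCount,
                            const std::vector<double>& stageDurations) {
  double perItem = 0.0;
  for (double d : stageDurations) perItem += d;
  return itemCount * perItem;
}
```

With six images and four stages of duration T, the model gives 9 x T for the pipeline and 24 x T for the sequential loop, matching the figures given earlier. If the third stage takes 2 x T instead, the model gives 15 x T for six images, and each additional image adds 2 x T, which is the time of the slowest stage.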

If you run the ImagePipeline application, you can see this effect for yourself. The ImagePipeline sample has a user interface (UI) feature that reports the average length of time in milliseconds for each of the stages of the pipeline. It also reports the overall average length of time that's needed to process each image. When you run the sample in sequential mode (by selecting the Sequential radio button), you'll notice that the steady-state elapsed time per image equals the sum of all the stages. When you run in pipeline mode, the average elapsed time per image converges to approximately the same amount of time as the slowest stage. The most efficient pipelines have stages of equal speed. You won't always achieve this, but it's a worthy goal.

Variations

There are several variations of the Pipeline pattern.

Asynchronous Pipelines

The pipelines that have been described so far are synchronous. Producers and consumers are long-running tasks (implemented with the agent class) that internally use sequential loops to read inputs and write outputs. Agents whose run methods contain sequential loops are sometimes called control-flow agents. They require a dedicated thread for each stage of the pipeline. Dedicating a thread to each stage makes sense if the pipeline follows the recommended practice of dividing the work into stages of equal duration. With an equal division of work, each of the threads will be continuously busy for the duration of the pipeline’s run. See the "Performance Characteristics" section of this chapter for more information about the ideal allocation of work to pipeline stages.

You can also have an asynchronous pipeline, where tasks are only created after data becomes available. This style of implementation is more oriented toward dataflow than control flow. The differences between the control flow and dataflow approaches are a matter of coding preference. However, there are some functional and performance distinctions between the two approaches.

Asynchronous pipelines are implemented using the transformer class and the call class. These classes are messaging blocks in the Asynchronous Agents Library. The transformer class and call class are queues that a producer puts data into; if there's currently no task processing the queue when data arrives, a new task is created to process the queue, and it’s active as long as there's incoming data to process. If it ever finds that there is no more data, the task goes away. If more data arrives, a new task starts. In other words, the transformer class or call class is a message buffer that acts like an agent but creates tasks as needed to process incoming data values instead of dedicating a thread to this purpose.

The transformer and call classes are message buffers that act like agents, but unlike agents they don’t require dedicated threads. Use transformer and call objects to implement asynchronous pipelines.
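The create-a-task-only-when-data-arrives behavior can be modeled in portable C++ as follows. This is a conceptual sketch, not the implementation of the transformer or call classes: the OnDemandBlock class and its methods are hypothetical, and it starts a std::thread for each burst of work, whereas the library schedules lightweight tasks. As with a real transformation function, the handler is assumed not to throw.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of a block that processes its queue with a worker that
// exists only while there is data to process.
template <typename T>
class OnDemandBlock {
public:
  explicit OnDemandBlock(std::function<void(T)> handler)
      : handler_(std::move(handler)), active_(false), tasksStarted_(0) {}

  ~OnDemandBlock() {
    WaitUntilIdle();
    if (worker_.joinable()) worker_.join();
  }

  // Queues a value and starts a worker only if none is currently draining.
  void Post(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.push_back(std::move(value));
    if (!active_) {
      if (worker_.joinable()) worker_.join();  // earlier worker has finished
      active_ = true;
      ++tasksStarted_;
      worker_ = std::thread([this] { Drain(); });
    }
  }

  // Blocks until the queue is empty and no worker is active.
  void WaitUntilIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    idle_.wait(lock, [this] { return !active_; });
  }

  // Number of workers started so far (depends on timing).
  int TasksStarted() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasksStarted_;
  }

private:
  void Drain() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!pending_.empty()) {
      T value = std::move(pending_.front());
      pending_.pop_front();
      lock.unlock();
      handler_(std::move(value));  // run user code without holding the lock
      lock.lock();
    }
    active_ = false;  // no more data, so this worker goes away
    idle_.notify_all();
  }

  std::function<void(T)> handler_;
  bool active_;
  int tasksStarted_;
  std::mutex mutex_;
  std::condition_variable idle_;
  std::deque<T> pending_;
  std::thread worker_;
};
```

Because at most one worker drains the queue at a time, values are handled in the order they were posted, which preserves the ordering guarantee that the Pipeline pattern requires.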

Asynchronous pipelines are useful in cases where there are many pipeline stages, and you don’t want to dedicate a thread to each stage. They are also efficient in cases where you expect the pipeline to often be empty (for example, while waiting for input). In these cases, transformer messaging blocks can improve application performance due to better utilization of threads.

A drawback to asynchronous pipelines is that the code can be slightly more difficult to write and debug than the agent-based style that was shown earlier in this chapter. The asynchronous style of pipelines may require the use of a separate task scheduler instance in order to keep scheduling latency low. Asynchronous pipelines are limited to pipeline stages that have an equal number of inputs and outputs. See Appendix A for more information about task schedulers.

Here is an example of an asynchronous pipeline from the ImageAgentPipelineDataFlow class.

void Initialize()
{
  m_scaler = unique_ptr<transformer<ImageInfoPtr, ImageInfoPtr>>(
                new transformer<ImageInfoPtr, ImageInfoPtr>(
                [this](ImageInfoPtr pInfo)->ImageInfoPtr
                {
                    this->ScaleImage(pInfo, m_imageDisplaySize);
                    return pInfo;
                }, 
                ...
            ));

  m_filterer = 
           unique_ptr<transformer<ImageInfoPtr, ImageInfoPtr>>(
                new transformer<ImageInfoPtr, ImageInfoPtr>(
                [this](ImageInfoPtr pInfo)->ImageInfoPtr
                {
                    this->FilterImage(pInfo, m_noiseLevel);
                    return pInfo;
                }, 
                ...
            ));

  m_displayer = unique_ptr<call<ImageInfoPtr>>(
                new call<ImageInfoPtr>(
                [this](ImageInfoPtr pInfo)
                {
                    this->DisplayImage(pInfo);                                          
                    m_governor.FreePipelineSlot();
                },
                ...
            ));

  m_scaler->link_target(m_filterer.get());
  m_filterer->link_target(m_displayer.get());
}

This code creates transformer objects that receive and send ImageInfoPtr objects. Each transformer declaration specifies a lambda function. The lambda function takes an image pointer as its argument, performs an operation on it, and returns the pointer to the modified image. A transformer has a one-to-one relationship between input and output messages. In other words, for each input value, the transformation function must return a single corresponding output value.

The final stage in the pipeline uses a call messaging block. Call messaging blocks are similar to transformers but have no output message. The call block stored in the m_displayer variable wraps a lambda function that displays the image and frees a slot in the pipeline governor but does not produce any output.

You provide a function that performs a transformation on input values as an argument to the transformer class’s constructor. The transformation function is invoked by the system when inputs are available; therefore, you should be careful that all exceptions are handled within the transformation function.

The code creates transformer and call objects that correspond to all stages of the pipeline except the first. Each transformer's target is configured by invoking the link_target method. You don't need to set sources separately; linking a block to its target is enough to connect the two.

Set the targets of the transformer objects by using the link_target method.

The following code shows the run method of the dataflow-based imaging pipeline.

Initialize();
vector<wstring> filenames = ...
                        
int sequence = kFirstImage;
for_each_infinite(filenames.cbegin(), filenames.cend(), 
  [this, offset, &sequence](wstring file)->bool
{
  ImageInfoPtr pInfo = this->LoadImage(sequence++, file, offset);
  if (nullptr == pInfo) 
    return true;
  m_governor.WaitForAvailablePipelineSlot();    
  asend(m_scaler.get(), pInfo);

  return IsCancellationPending();
});

// Allow subsequent stages to terminate
m_governor.WaitForEmptyPipeline();
done();

If you compare the code sample with the run method of the agent-based image pipeline that was described in the previous section, you can see similarities. In both, a sequential loop loads images and sends them to a messaging block that is the data source for the image scaling stage. In both, the number of in-flight elements is limited by a call to a governor object.

The difference between the two approaches is seen at run time. With the asynchronous pipeline, a new task is created whenever an empty transformer messaging block receives a new element (by means of the send or asend functions). This new task invokes the transformation function (that was passed to it as the first argument of the constructor), and then sends the return value of the transformation function to the messaging block that has been configured as the transformer’s target.

A call messaging block behaves like a transformer messaging block, except that no target is involved. The call block’s function is invoked on the new input element. It does not return a value.

At run time, transformer and call messaging blocks create tasks on demand to process any queued items. An active task is present only when there are elements to process.
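
To make the wiring concrete, here is a minimal, synchronous sketch of the same scale, filter, and display topology in standard C++. It is not the Asynchronous Agents Library. The names Target, TransformStage, CallStage, LinkTarget, Post, and RunSketchPipeline are hypothetical, strings stand in for ImageInfoPtr, and each stage runs its function immediately on the caller's thread instead of creating a task on demand. The sketch only illustrates one-to-one forwarding and a terminal stage that produces no output.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a message target: anything that accepts a value.
template <typename T>
struct Target {
    virtual ~Target() = default;
    virtual void Post(T value) = 0;
};

// One-to-one stage: applies a function to each input and forwards the result.
template <typename In, typename Out>
class TransformStage : public Target<In> {
public:
    explicit TransformStage(std::function<Out(In)> fn) : fn_(std::move(fn)) {
        assert(fn_ != nullptr);  // a stage without a function cannot transform
    }
    void LinkTarget(Target<Out>* target) { target_ = target; }
    void Post(In value) override {
        Out result = fn_(std::move(value));
        if (target_ != nullptr) target_->Post(std::move(result));
    }

private:
    std::function<Out(In)> fn_;
    Target<Out>* target_ = nullptr;
};

// Terminal stage: consumes each input and produces no output.
template <typename T>
class CallStage : public Target<T> {
public:
    explicit CallStage(std::function<void(T)> fn) : fn_(std::move(fn)) {
        assert(fn_ != nullptr);
    }
    void Post(T value) override { fn_(std::move(value)); }

private:
    std::function<void(T)> fn_;
};

// Builds scale -> filter -> display and returns everything that was "displayed".
std::vector<std::string> RunSketchPipeline(const std::vector<std::string>& names) {
    std::vector<std::string> displayed;
    TransformStage<std::string, std::string> scaler(
        [](std::string s) { return s + ":scaled"; });
    TransformStage<std::string, std::string> filterer(
        [](std::string s) { return s + ":filtered"; });
    CallStage<std::string> displayer(
        [&displayed](std::string s) { displayed.push_back(std::move(s)); });

    scaler.LinkTarget(&filterer);
    filterer.LinkTarget(&displayer);

    for (const std::string& name : names) scaler.Post(name);
    return displayed;
}
```

As in the Initialize method shown earlier, only targets are linked, and the first stage is fed from an ordinary sequential loop.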

Canceling a Pipeline

Pipeline tasks cooperate to perform their work; they must also cooperate when they respond to a cancellation request.

A natural place to check for cancellation is at the end of the loop that processes items from the input source of a pipeline stage. In the image processing example, you’ll see that the ImagePipelineDlg class that controls the user interface provides an instance of the overwrite_buffer<bool> class. This object signals that cancellation has been requested from the user interface. Each stage of the pipeline periodically checks the value of the overwrite buffer to see if cancellation has been requested.

For example, the base class, AgentBase, which is used to implement the agents in the image processing example, includes the following definitions.

class AgentBase : public agent
{
  private:
    // ...
    ISource<bool>& m_cancellationSource;

  public:
    // ...
    AgentBase(HWND dialog, 
              ISource<bool>& cancellationSource, ...) : 
        m_dialogWindow(dialog), 
        m_cancellationSource(cancellationSource), 
        ...
    {
      // ...
    }

    bool IsCancellationPending() const 
    { 
        return ... || receive(m_cancellationSource); 
    }

    // ...
};

This code shows how the external environment (in this case, a request from the application’s user interface) can signal that the pipeline should cancel processing. The agent’s constructor includes a parameter that takes an ISource<bool> object as a cancellation source. The cancellation source is implemented as an instance of the overwrite_buffer<bool> class. Its value is false until the user requests cancellation, at which point it becomes true. Individual pipeline operations invoke the IsCancellationPending() method to effect an orderly shutdown.

How you implement cancellation can affect the performance of your application. You should be careful not to check for cancellation within tight loops, but you should also check for cancellation often enough to keep cancellation latency from becoming noticeable to the user. The Image Pipeline checks for cancellation at the beginning of each pipeline step which, depending on the speed of your computer, corresponds to one check every few hundred milliseconds for each agent thread. Profiling your application can help you determine if polling for cancellation is harming performance.
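
The polling granularity can be sketched in standard C++ as follows. This is an illustration under stated assumptions, not the sample's code: CancellationFlag and RunStage are hypothetical names, and a std::atomic<bool> stands in for the overwrite_buffer<bool> cancellation source. The stage polls once per item, before starting the work for that item, and never inside the per-item work itself.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical cancellation signal; plays the role of the cancellation source.
class CancellationFlag {
public:
    void Request() { requested_.store(true, std::memory_order_release); }
    bool IsPending() const { return requested_.load(std::memory_order_acquire); }

private:
    std::atomic<bool> requested_{false};
};

// Processes items one at a time and returns how many were processed.
// Cancellation is checked once per pipeline step, not in any inner loop.
std::size_t RunStage(const std::vector<int>& items,
                     const CancellationFlag& cancel,
                     const std::function<void(int)>& work) {
    assert(work != nullptr);
    std::size_t processed = 0;
    for (int item : items) {
        if (cancel.IsPending()) break;  // one cheap check per item
        work(item);                     // the expensive part runs uninterrupted
        ++processed;
    }
    return processed;
}
```

With this shape, the worst-case cancellation latency is roughly the time needed to finish the item that is currently being processed.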

Handling Pipeline Exceptions

Exceptions are similar to cancellations. The difference between the two is that when an exception occurs within one of the pipeline stages, the tasks that execute the other stages don't by default receive notification that an exception has occurred elsewhere. Without such notification, there are several ways for the application to deadlock.

Note

When there is an exception in one pipeline stage, you should cancel the other stages. If you don't do this, deadlock can occur. Follow the guidelines in this section carefully.

The base class, AgentBase, in the image processing example uses an instance of the overwrite_buffer<bool> class to alert all pipeline stages when an exception in one stage has occurred. This is shown in the following code.

class AgentBase : public agent
{
  private:
    // ...
    mutable overwrite_buffer<bool> m_shutdownPending;

  public:
    // ...
    AgentBase(...) ...
    {
      send(m_shutdownPending, false);
    }

    void ShutdownOnError(Phases phase, const wstring& filePath,
                         const exception& e) const
    {
      wostringstream message;
      message << e.what();
      SendError(phase, filePath, message.str());
    }

    void SendError(Phases phase, const wstring& filePath,
                   wstring message) const
    {
      // ...
      send(m_shutdownPending, true);
      send(m_errorTarget, ErrorInfo(phase, filePath, message));
      PostMessageW(m_dialogWindow, WM_REPORTERROR, 0, 0);
    }

    bool IsCancellationPending() const 
    { 
        return receive(m_shutdownPending) ||  
               receive(m_cancellationSource); 
    }

    // ...
};

The stages of the pipeline invoke the application's ShutdownOnError method if they catch an exception. Because the pipeline stages run concurrently, the shutdown method is coded in a concurrency-safe manner. It sends values to buffers instead of updating shared variables directly.

The ShutdownOnError method extracts the message text from the exception and passes it to the SendError method. SendError sends the value true to the overwrite buffer m_shutdownPending to signal the other pipeline agents of the imminent shutdown. Next, it sends a message that contains the error information to the unbounded buffer m_errorTarget. Finally, it posts a custom Windows message, WM_REPORTERROR, to notify the UI that an error needs to be processed. When the UI thread handles the Windows message, it invokes an application callback method that gets information from the m_errorTarget buffer and displays it in a dialog box. The information contains a text description of the exception and the name of the image file that was being processed when the exception occurred.

The IsCancellationPending method checks for two conditions: whether shutdown is pending due to an exception, or whether there is a user-initiated cancellation request. The implementation deliberately uses two separate buffers instead of reusing the external cancellation signal to report errors. The pipeline stages can signal that an exception has occurred, but only the user can request cancellation of the operation. The reason is one of scope: operations other than the pipeline might be affected by a cancellation request. Error handling is intended to be local to the pipeline itself.
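
The two-signal arrangement can be sketched in standard C++ as shown here. This is a sketch under stated assumptions rather than the sample's implementation: ShutdownSignals and RunStep are hypothetical names, and an atomic flag plus a mutex-protected vector are substituted for the overwrite_buffer<bool> and the error buffer. The pipeline owns the error flag, while the user cancellation flag is owned elsewhere and is only read.

```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

class ShutdownSignals {
public:
    // userCancel is owned by the caller (for example, the UI); it is read-only here.
    explicit ShutdownSignals(const std::atomic<bool>& userCancel)
        : userCancel_(userCancel) {}

    // Called by a stage that caught an exception.
    void ShutdownOnError(const std::string& stage, const std::exception& e) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            errors_.push_back(stage + ": " + e.what());
        }
        shutdownPending_.store(true);  // local to the pipeline
    }

    // True if either an error shutdown or a user cancellation is pending.
    bool IsCancellationPending() const {
        return shutdownPending_.load() || userCancel_.load();
    }

    std::vector<std::string> Errors() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return errors_;
    }

private:
    const std::atomic<bool>& userCancel_;
    std::atomic<bool> shutdownPending_{false};
    mutable std::mutex mutex_;
    std::vector<std::string> errors_;
};

// Runs one step of a stage. Returns false when the stage should stop.
bool RunStep(ShutdownSignals& signals, const std::string& stage,
             const std::function<void()>& step) {
    assert(step != nullptr);
    if (signals.IsCancellationPending()) return false;
    try {
        step();
    } catch (const std::exception& e) {
        signals.ShutdownOnError(stage, e);  // alert the other stages
        return false;
    }
    return true;
}
```

Because an error sets only the pipeline's own flag, a failure inside the pipeline never looks like a user cancellation to other operations that share the user's cancellation signal.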

Load Balancing Using Multiple Producers

The unbounded_buffer<T> class allows you to read values from more than one producer. This feature can be used to implement load balancing for pipeline stages that take longer than other stages.

The image pipeline example described earlier in this chapter requires that the slideshow display the thumbnail images in the same order as the input files. This constraint is common to many pipeline scenarios, such as processing a series of video frames. However, in the case of the image pipeline example, the filter operations on successive images are independent of each other. In this case, you can add a second filter task to the pipeline. This is shown in Figure 6.

It is sometimes possible to implement load balancing by increasing the number of tasks used for a particular pipeline stage.

Figure 6

Consuming values from multiple producers

Figure 6 shows what happens when you add an additional filter task. The numbers in the figure represent the sequence numbers of the images being processed. (Recall that the images must be processed in order in this example.) Both of the filter tasks take images produced by the previous stage of the pipeline. The order in which they consume these images is not fully determined, although from a filter's local point of view, no input image ever arrives out of order.

Each of the filter stages has its own target buffer to hold the elements that it produces. The consumer of these queues is a component known as a multiplexer, which combines the inputs from all of the producers. The multiplexer provided in the sample code allows its consumer, which in this case is the display stage of the pipeline, to receive the images in the correct sequential order. The images don't need to be sorted or reordered. Instead, the fact that each producer queue is locally ordered allows the multiplexer to look for the next value in the sequence by simultaneously monitoring the heads of all of the producer queues.

Here's an example to make this more concrete. Suppose that each image has a unique sequence number that's available by invoking a data accessor method. The image numbers start with 1 and increase sequentially. As Figure 6 shows, the first filter might process images that are numbered 1, 4, and 5, while the second filter processes images with sequence numbers 2, 3, 6, and 7. Each load-balanced filter stage collects its output images into its own queue. The two output queues are correctly ordered (that is, no higher numbered image comes before a lower numbered image), but there are gaps in the sequence of numbers. For example, if you take values from the first filter's output queue, you get image 1, followed by image 4, followed by image 5. Images 2 and 3 are missing because they're found in the second filter's output queue.

The gaps are a problem. The next stage of the pipeline, the Display Image stage, needs to show images in order and without gaps in the sequence. This is where the multiplexer comes in. The multiplexer waits for input from both of the filter stage producer queues. When an image arrives, the multiplexer looks to see if the image's sequence number is the next in the expected sequence. If it is, the multiplexer passes it to the Display Image stage. If the image is not the next in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation for the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values.
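
The look-ahead algorithm can be sketched in standard C++ as follows. This is a simplified, single-threaded illustration and not the multiplexer from the sample code: Multiplex and LookAheadSlot are hypothetical names, integers stand in for images with sequence numbers, and the producer queues are filled in advance so that the sketch never has to block waiting for input.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// One look-ahead slot per producer queue.
struct LookAheadSlot {
    bool full;
    int value;
};

// Merges locally ordered producer queues into one gap-free sequence that
// starts at firstSequence. No sorting is involved: only the heads are examined.
std::vector<int> Multiplex(std::vector<std::deque<int>> queues, int firstSequence) {
    assert(!queues.empty());
    std::vector<LookAheadSlot> lookAhead(queues.size(), LookAheadSlot{false, 0});
    std::vector<int> output;
    int expected = firstSequence;

    for (;;) {
        bool emitted = false;
        for (std::size_t i = 0; i < queues.size(); ++i) {
            // Take from any queue that does not currently have a look-ahead value.
            if (!lookAhead[i].full && !queues[i].empty()) {
                lookAhead[i].value = queues[i].front();
                lookAhead[i].full = true;
                queues[i].pop_front();
            }
            // Pass the value on only if it is the next one in the sequence.
            if (lookAhead[i].full && lookAhead[i].value == expected) {
                output.push_back(lookAhead[i].value);
                lookAhead[i].full = false;
                ++expected;
                emitted = true;
            }
        }
        // No head matches: the input is exhausted or a sequence number is missing.
        if (!emitted) break;
    }
    return output;
}
```

For the queues in the example above, {1, 4, 5} and {2, 3, 6, 7}, the sketch emits the images in the order 1 through 7.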

Figure 7 shows the performance benefit of doubling the number of filter stages when the filter operation is twice as expensive as the other pipeline stages.

Figure 7

Image pipeline with load balancing

If all pipeline stages, except the filter stage, take T units of time to process an image, and the filter stage takes 2 x T units of time, then using two filter stages and two producer queues to load balance the pipeline results in an overall rate of approximately one image every T units of time as the number of images grows. If you run the ImagePipeline sample and select the Load Balanced radio button, you'll see this effect. The time per image (after a suitable number of images are processed) will converge on the average time of the slowest single-instance stage or on one-half of the average filter time, whichever is greater.
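
The arithmetic behind this estimate can be written down as a small function. This is an idealized sketch: StageCost and SteadyStateTimePerItem are hypothetical names, and the model ignores startup time, queue overhead, and variation between images. It assumes that a stage with n load-balanced instances behaves like a single stage that is n times faster.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Average time one instance of a stage needs per item, and how many
// load-balanced instances of that stage run side by side.
struct StageCost {
    double timePerItem;
    int instances;
};

// Idealized steady-state time per item: the pipeline is gated by the stage
// with the largest effective time (time per item divided by instance count).
double SteadyStateTimePerItem(const std::vector<StageCost>& stages) {
    double slowest = 0.0;
    for (const StageCost& stage : stages) {
        assert(stage.instances > 0);
        slowest = std::max(slowest, stage.timePerItem / stage.instances);
    }
    return slowest;
}
```

With T = 1, a pipeline with stage times 1, 1, 2, and 1 is gated at 2 units per image when every stage has one instance, and at 1 unit per image when the filter stage has two instances.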

The queue wait time of Queue 3, which is displayed on the ImagePipeline sample's UI, indicates the overhead that's introduced by waiting on multiple producer queues. This is an example of how adding overhead to a parallel computation can actually increase the overall speed if the change also allows more efficient use of the available cores.

Pipelines and Streams

You may have noticed that message buffers and streams have some similarities. It's sometimes useful to treat a message buffer as a stream, and vice versa. For example, you may want to use a Pipeline pattern with library methods that read from and write to streams. Suppose that you want to compress a file and then encrypt it. Both compression and encryption are supported by native libraries, but the functions’ parameter lists expect streams, not messaging blocks. It's possible to implement a stream whose underlying implementation relies on agents and messaging blocks.

Anti-Patterns

There are a few things to watch out for when implementing a pipeline.

Copying Large Amounts of Data between Pipeline Stages

If your data structures are large, you should pass pointers to data, and not the data itself, down the pipeline. Use the Resource Acquisition Is Initialization (RAII) pattern to ensure correctness when using pointers. This is especially important for non-linear dataflow networks with multiple endpoints. Only pass small data items by value.
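
As a brief illustration, the following standard C++ sketch passes a reference-counted pointer between two stages instead of copying the pixel data. The names Image, ImagePtr, Invert, and PassThroughQueue are hypothetical, a std::queue stands in for a message buffer, and the use of std::shared_ptr is an assumption about how such a pointer type might be defined.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

struct Image {
    std::vector<std::uint8_t> pixels;  // potentially very large
};

// RAII handle: the image is freed when the last stage releases its pointer.
using ImagePtr = std::shared_ptr<Image>;

// Stage function: inverts the pixels in place and hands back the same pointer.
ImagePtr Invert(ImagePtr image) {
    assert(image != nullptr);
    for (std::uint8_t& p : image->pixels) {
        p = static_cast<std::uint8_t>(255 - p);
    }
    return image;
}

// Moves one pointer through a queue between a producer and a consumer stage.
ImagePtr PassThroughQueue(ImagePtr image) {
    std::queue<ImagePtr> buffer;
    buffer.push(std::move(image));                   // only the pointer is enqueued
    ImagePtr received = std::move(buffer.front());   // consumer takes ownership
    buffer.pop();
    return Invert(std::move(received));
}
```

Only the pointer travels through the buffer, so the cost of each hand-off does not depend on the size of the image.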

Pipeline Stages that Are Too Small

Don’t pass very small items of work. The overhead of managing the pipeline will outweigh the gains from parallelism.

Forgetting to Use Message Passing for Isolation

Don’t share data structures between agents and protect them with synchronization primitives such as locks and semaphores. Instead, pass messages.

Infinite Waits

If a pipeline task catches an exception and terminates, it will no longer take values from its input messaging block. Depending on the logic of your pipeline, you may find that processing is blocked indefinitely. You can avoid this situation by using the technique that was described in the section, "Handling Pipeline Exceptions," earlier in this chapter.

Unbounded Queue Growth

You should be careful to limit the number of elements that can be pending at one time in the pipeline’s buffers. Use the techniques described in the previous sections to enforce such a limit. Refer to the PipelineGovernor class in the online samples for an example of how to limit the number of in-flight items in a pipeline.
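
One way to enforce such a limit is a counting gate, sketched here in standard C++. The method names match the governor calls that appear in the code earlier in this chapter, but the class name SketchGovernor, the InFlight accessor, and the whole implementation are assumptions made for illustration; the PipelineGovernor class in the online samples may be written quite differently.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical governor: limits how many items are in the pipeline at once.
class SketchGovernor {
public:
    explicit SketchGovernor(int capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Called by the first stage before it sends an item; blocks while full.
    void WaitForAvailablePipelineSlot() {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [this] { return inFlight_ < capacity_; });
        ++inFlight_;
    }

    // Called by the last stage after it finishes an item.
    void FreePipelineSlot() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(inFlight_ > 0);
            --inFlight_;
        }
        changed_.notify_all();
    }

    // Called during shutdown; blocks until every in-flight item has drained.
    void WaitForEmptyPipeline() {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [this] { return inFlight_ == 0; });
    }

    int InFlight() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return inFlight_;
    }

private:
    const int capacity_;
    int inFlight_ = 0;
    mutable std::mutex mutex_;
    std::condition_variable changed_;
};
```

The capacity bounds the total number of elements that can be queued across all of the pipeline's buffers, regardless of how fast the first stage can produce them.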

More Information

For more information about this guidance, see Best Practices in the Asynchronous Agents Library on MSDN at https://msdn.microsoft.com/en-us/library/ff601928.aspx.

Design Notes

When you use the Pipeline pattern to decompose a problem, you need to consider how many pipeline stages to use. This depends on the number of cores you expect to have available at run time as well as the nature of the application you are trying to implement. Unlike most of the other patterns in this book, the Pipeline pattern doesn't automatically scale with the number of cores. This is one of its limitations. (Of course, in some cases you can introduce additional parallelism within a pipeline stage itself.)

More stages work well unless the overhead of adding and removing elements from the buffers becomes significant. This is usually only a problem for stages that perform very small amounts of work.

To achieve a high degree of parallelism, you need to be careful that all the stages in the pipeline take approximately the same amount of time to perform their work. If they don't, the pipeline will be gated by the slowest component.

The number of in-flight elements in the pipeline is also important for overall performance. If you limit your pipelines to contain only very small numbers of data values, you may find that not all stages of your pipeline are fully occupied with work, especially if data elements take a variable amount of processing time. Allowing the pipeline buffers to hold more data elements accommodates the variability in processing time. The allowed number of in-flight data elements can also depend on the size of the objects being processed. You would probably want to use fewer entries if each element contained an object such as a large bitmapped image that required a great deal of memory.

In general, there should be enough buffering to absorb variability in the pipeline flow, but no more. Use the Visual Studio Concurrency Visualization view to understand the throughput characteristics of the pipeline and modify the pipeline capacity to minimize the amount of time each stage is blocked by I/O waits.

Related Patterns

The Pipeline pattern has much in common with the concepts of pipes and filters that are implemented in operating systems. Pipelines are also related to streaming concepts.

Pipelines are expressions of a general technique known as producer/consumer. The pipeline is composed of a series of producer/consumers, each one depending on the output of its predecessor.

Exercises

  1. Write your own pipeline by modifying the example shown in the first section of this chapter.
  2. Execute the code with the Concurrency Visualizer. View and interpret the results.

Further Reading

Multiplexing inputs from multiple producer queues is covered by Campbell. The pipes and filters pattern used by operating system command shells is described by Buschmann.

Buschmann, F., R. Meunier, H. Rohnert, P. Sommerlad, and M. Stal. Pattern-Oriented Software Architecture Volume 1: A System of Patterns. Wiley, 1996.

Campbell, C., M. Veanes, J. Huo, and A. Petrenko. "Multiplexing of Partially Ordered Events." TestCom 2005, Springer Verlag, June 2005.
https://research.microsoft.com/apps/pubs/default.aspx?id=77808.

Last built: March 9, 2012