.NET Matters

Stream Pipeline

Stephen Toub

Code download available at: StreamPipeline.zip (180 KB)

Q In my application, I'm encrypting and compressing quite a bit of data. As these are computationally intensive operations, I was expecting to see 100 percent CPU utilization in Task Manager, but I noticed that on my dual-core machine, it's topping out at around 50 percent. I'm assuming this is because only one core is being used, which is a shame given that the process takes a non-trivial amount of time to run. Is there any way I can get this encryption and compression process to use both processors? I'm using CryptoStream and GZipStream from the Microsoft® .NET Framework.

A First, just to make sure, you're doing the compression operation before the encryption operation, correct? If not, change it if you can. Good encryption will generate relatively uncompressible data. If you switch the order of the operations so that you first compress and then encrypt, not only should you end up with a smaller file, but the encryption will most likely take less time as it'll be operating on less data. As an example of this, I downloaded the text of War and Peace from Project Gutenberg (www.gutenberg.org) and ran it through the two orderings. Encrypting (with RijndaelManaged and the default key size) and then compressing resulted in a data stream 250 percent larger than the one generated by compressing and then encrypting, and it took 50 percent longer to execute.

Now, to answer your actual question: yes, there are several approaches you could take here. The first would be to parallelize the actual compression and encryption operations. You probably don't want to (and shouldn't) re-implement the functionality in GZipStream and CryptoStream, so, until the .NET Framework team parallelizes them for you, you'll want an alternate solution.

If you don't care about the actual output format (for example, you need compression, but you don't care that it actually adheres to the gzip standard), you could chunk your input and process each chunk in parallel. For example, on your dual-core machine, you could divide in half the input byte array currently being passed to your GZipStream and, instead, process one half with one GZipStream and the other half with another GZipStream. You can then save these out to your output file one after the other.

Note, though, that you'll probably need to include some header information around your output so that your decompression process will be able to determine where one GZipStream ends and the next begins. GZipStream currently buffers data as it reads from the input stream, so it may end up consuming more data from the input stream than it actually needs, which means you'd need to reset the position in the input stream after the first GZipStream completes its decompression.
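To make the chunking idea concrete, here is a minimal sketch of that dual-core case, assuming the input is already in memory as a byte array; the CompressInChunks and CompressChunk names and the Int32 length prefix are illustrative choices of mine, not part of the gzip format or of the downloadable sample:

static void CompressInChunks(byte[] input, Stream output)
{
    int half = input.Length / 2;
    byte[][] compressed = new byte[2][];

    // Compress the second half on another thread while this thread
    // compresses the first half.
    Thread worker = new Thread(delegate()
    {
        compressed[1] = CompressChunk(input, half, input.Length - half);
    });
    worker.Start();
    compressed[0] = CompressChunk(input, 0, half);
    worker.Join();

    // Prefix each compressed chunk with its length so the decompression
    // side knows where one GZipStream's data ends and the next begins.
    foreach (byte[] chunk in compressed)
    {
        byte[] lengthPrefix = BitConverter.GetBytes(chunk.Length);
        output.Write(lengthPrefix, 0, lengthPrefix.Length);
        output.Write(chunk, 0, chunk.Length);
    }
}

static byte[] CompressChunk(byte[] input, int offset, int count)
{
    MemoryStream buffer = new MemoryStream();
    using (GZipStream compressor = new GZipStream(
            buffer, CompressionMode.Compress, true))
        compressor.Write(input, offset, count);
    return buffer.ToArray();
}

The decompression side would then read each length prefix, read that many bytes, and hand them to their own GZipStream, neatly sidestepping the buffering issue just described.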

One of the nice things about the chunking approach is that you can get the process to scale relatively well to more than two cores, as you can simply create as many chunks as are necessary to saturate your processors. But with only two cores, and with two operations (compression and encryption), a more practical solution for you at this point would probably be to create a parallel stream pipeline. You can do one operation on one processor at the same time that the other operation is working on the other processor. Now, obviously they can't be working on the same data at the same time, as you'd end up with two outputs (one compressed and one encrypted) that would be relatively useless to you given the problem you're trying to solve (one output both compressed and encrypted). Instead, however, you can mimic what you're probably doing today, where using the decorator pattern you pass the output from one stream as the input to the next (shown in Figure 1):

Figure 1 Passing Data through Streams

using (CryptoStream encrypt = new CryptoStream(
        output, transform, CryptoStreamMode.Write))
using (GZipStream compress = new GZipStream(
        encrypt, CompressionMode.Compress, true))
    CopyStream(input, compress);
...
static void CopyStream(Stream input, Stream output){
    byte[] buffer = new byte[0x1000];
    int read;
    while ((read = input.Read(buffer, 0, buffer.Length)) > 0) 
        output.Write(buffer, 0, read);
}

Here, the CopyStream method is copying from the input stream into the compression stream. When the compression stream has compressed a buffer of data, it in turn writes that data out as the input into the encryption stream. And similarly, when the encryption stream has completed a buffer of data, it in turn writes that out to the output stream. This is known as a pipeline.

Parallelizing a pipeline is a very natural concept that manifests frequently in the "real world." Consider a group of people sending out invitation letters. One person is in charge of folding the letters to fit in an envelope, one person is in charge of putting the folded letters into an envelope, and one person is in charge of sealing and stamping the envelope. For the first letter, the only person doing anything is the person folding the letter, and the rest of the people are idle. But then the first person hands off the folded letter to the envelope stuffer. At that point, the first person is busy folding the next letter, and the second person is busy stuffing an envelope, but the third person is still idle. When the first and second people complete their tasks, the stuffed envelope is handed to the third person, who seals and stamps it, the folded letter is handed to the second person, who stuffs it, and the first person begins folding a new letter. From this point on, until there are only two more letters left, all three people will be fully utilized (fully utilized, that is, except for the time it takes to hand off the letters and envelopes between people). So even though each person does only one task at a time, as long as there are enough letters to send, for the majority of the time we can theoretically get three times as much work done by involving three people in the task.

We'll follow the exact same process for our stream pipelining task. Instead of passing around folded letters or envelopes, of course, we'll be passing around buffers of data, since that is the mechanism streams use to communicate. However, unlike with our sequential version, where one stream wrote directly to another stream, direct synchronous communication is no longer possible due to each of those streams operating in parallel on separate threads. We need another mechanism to re-enable that communication. For that task, I've written a BlockingStream, shown in Figure 2.

Figure 2 BlockingStream Implementation

public class BlockingStream : Stream
{
    private object _lockForRead;
    private object _lockForAll;
    private Queue<byte[]> _chunks;
    private byte[] _currentChunk;
    private int _currentChunkPosition;
    private ManualResetEvent _doneWriting;
    private ManualResetEvent _dataAvailable;
    private WaitHandle[] _events;
    private int _doneWritingHandleIndex;
    private volatile bool _illegalToWrite;

    public BlockingStream()
    {
        _chunks = new Queue<byte[]>();
        _doneWriting = new ManualResetEvent(false);
        _dataAvailable = new ManualResetEvent(false);
        _events = new WaitHandle[] { _dataAvailable, _doneWriting };
        _doneWritingHandleIndex = 1;
        _lockForRead = new object();
        _lockForAll = new object();
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return !_illegalToWrite; } }

    public override void Flush() { }
    public override long Length { 
        get { throw new NotSupportedException(); } }
    public override long Position { 
        get { throw new NotSupportedException(); } 
        set { throw new NotSupportedException(); } }
    public override long Seek(long offset, SeekOrigin origin) { 
        throw new NotSupportedException(); }
    public override void SetLength(long value) { 
        throw new NotSupportedException(); }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0 || offset >= buffer.Length) 
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0 || offset + count > buffer.Length) 
            throw new ArgumentOutOfRangeException("count");
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);

        if (count == 0) return 0;

        while (true)
        {
            int handleIndex = WaitHandle.WaitAny(_events);
            lock (_lockForRead)
            {
                lock (_lockForAll)
                {
                    if (_currentChunk == null)
                    {
                        if (_chunks.Count == 0)
                        {
                            if (handleIndex == _doneWritingHandleIndex) 
                                return 0;
                            else continue;
                        }
                        _currentChunk = _chunks.Dequeue();
                        _currentChunkPosition = 0;
                    }
                }

                int bytesAvailable = 
                    _currentChunk.Length - _currentChunkPosition;
                int bytesToCopy;
                if (bytesAvailable > count)
                {
                    bytesToCopy = count;
                    Buffer.BlockCopy(_currentChunk, _currentChunkPosition, 
                        buffer, offset, count);
                    _currentChunkPosition += count;
                }
                else
                {
                    bytesToCopy = bytesAvailable;
                    Buffer.BlockCopy(_currentChunk, _currentChunkPosition, 
                        buffer, offset, bytesToCopy);
                    _currentChunk = null;
                    _currentChunkPosition = 0;
                    lock (_lockForAll)
                    {
                        if (_chunks.Count == 0) _dataAvailable.Reset();
                    }
                }
                return bytesToCopy;
            }
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0 || offset >= buffer.Length) 
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0 || offset + count > buffer.Length) 
            throw new ArgumentOutOfRangeException("count");
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);

        if (count == 0) return;

        byte[] chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        lock (_lockForAll)
        {
            if (_illegalToWrite) 
                throw new InvalidOperationException(
                    "Writing has already been completed.");
            _chunks.Enqueue(chunk);
            _dataAvailable.Set();
        }
    }

    public void SetEndOfStream()
    {
        if (_dataAvailable == null) 
            throw new ObjectDisposedException(GetType().Name);
        lock (_lockForAll)
        {
            _illegalToWrite = true;
            _doneWriting.Set();
        }
    }

    public override void Close()
    {
        base.Close();
        if (_dataAvailable != null)
        {
            _dataAvailable.Close();
            _dataAvailable = null;
        }
        if (_doneWriting != null)
        {
            _doneWriting.Close();
            _doneWriting = null;
        }
    }
}

The concept behind BlockingStream is a bit different from that of most other streams in the .NET Framework. As with other streams, it derives from System.IO.Stream and overrides all of its abstract methods and properties. However, when it comes to position and thread safety, it exhibits behavior that's a bit unorthodox. Most streams in the .NET Framework are not thread safe, meaning that multiple threads can't safely access an instance of the stream concurrently, and most streams maintain a single position at which the next read or write will occur. BlockingStream, on the other hand, is thread safe, and, in a sense, it implicitly maintains two positions, though neither is exposed as a numerical value to the user of the type.

BlockingStream works by maintaining an internal queue of data buffers written to it. When data is written to the stream, the buffer written is enqueued. When data is read from the stream, a buffer is dequeued in a first-in-first-out (FIFO) order, and the data in it is handed back to the caller. In that sense, there is a position in the stream at which the next write will occur and a position at which the next read will occur.

Note, however, that not every read request modifies the queue. If a reader requests an amount of data equal to or greater than the size of the next buffer in the queue, then the read request will be satisfied with that one buffer (Read can successfully return even if it returns less data than was requested). But if the user requests less than the size of the next buffer, the remaining data from the buffer is stored separately (not in the queue) so that it can be used to satisfy the next Read request (see _currentChunk in Figure 2). If the next Read request can be satisfied entirely with the remainder from the previous Read request, the queue will not be modified on that request.

BlockingStream uses a ManualResetEvent to signal to consumers whether data is available. If a thread tries to read data from the BlockingStream and there is no data available to read, the thread will block on this unset ManualResetEvent until data has been written. When data is written, the writer sets the event to wake up a consumer and notify it that data is available for reading. In fact, if there are multiple readers waiting for data, they will all wake up, which requires some additional logic in the Read method.

The Read method is implemented as one big loop that will continue to loop until either there is data available to fulfill the read request or a writer has called SetEndOfStream, which informs the stream that there will be no more data; in that case, it would be silly to continue to block readers waiting for data, as they'd be waiting for a very long time. Each of these conditions is represented by a ManualResetEvent: the one mentioned earlier and a second one that indicates whether writing is complete. The first thing Read does once it enters the loop is to wait for either of these events to be set (using WaitHandle.WaitAny).

BlockingStream is useful for our stream pipeline because it enables the same stream-to-stream pattern used implicitly in the sequential version, but now working across threads. The idea is that I can write methods that represent each of the Stream-based operations I need performed, as shown in Figure 3.

Figure 3 Methods for Stream-Based Operations

static void Compress(Stream input, Stream output){
    using (GZipStream compressor = new GZipStream(
           output, CompressionMode.Compress, true))
        CopyStream(input, compressor);
}

static void Encrypt(Stream input, Stream output) {
    RijndaelManaged rijndael = new RijndaelManaged();
    ... // setup crypto keys
    using (ICryptoTransform transform = rijndael.CreateEncryptor())
    using (CryptoStream encryptor = new CryptoStream(
            output, transform, CryptoStreamMode.Write))
        CopyStream(input, encryptor);
}

Each of these methods can be executed on its own thread, with the output from one piped through a BlockingStream as the input to the next. Initially, the second operation will be blocked waiting on its input BlockingStream. As soon as the first operation writes data to the stream, the second operation will wake up, retrieve that data, and process it. I've coded this functionality into the StreamPipeline class, shown in Figure 4.
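Before looking at the general class, here is a minimal hand-wired sketch of the two-stage case using the Compress and Encrypt methods from Figure 3; the CompressThenEncrypt method name is mine, for illustration only:

static void CompressThenEncrypt(Stream input, Stream output)
{
    BlockingStream intermediate = new BlockingStream();

    // Compress on a separate thread, writing into the BlockingStream.
    Thread compressThread = new Thread(delegate()
    {
        Compress(input, intermediate);
        intermediate.SetEndOfStream(); // tell the reader no more data is coming
    });
    compressThread.IsBackground = true;
    compressThread.Start();

    // Meanwhile, encrypt on this thread, reading from the BlockingStream.
    Encrypt(intermediate, output);
    compressThread.Join();
}

The StreamPipeline class in Figure 4 generalizes exactly this wiring to any number of filters.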

Figure 4 StreamPipeline

public class StreamPipeline : IDisposable
{
    private Action<Stream, Stream>[] _filters;
    private List<BlockingStream> _blockingStreams;

    public StreamPipeline(params Action<Stream, Stream>[] filters)
    {
        if (filters == null) throw new ArgumentNullException("filters");
        if (filters.Length == 0 || Array.IndexOf(filters, null) >= 0) 
            throw new ArgumentException("filters");

        _filters = filters;

        _blockingStreams = new List<BlockingStream>(_filters.Length - 1);
        for (int i = 0; i < filters.Length-1; i++)
        {
            _blockingStreams.Add(new BlockingStream());
        }
    }

    public void Run(Stream input, Stream output)
    {
        if (_blockingStreams == null) 
            throw new ObjectDisposedException(GetType().Name);
        if (input == null) throw new ArgumentNullException("input");
        if (!input.CanRead) throw new ArgumentException("input");
        if (output == null) throw new ArgumentNullException("output");
        if (!output.CanWrite) throw new ArgumentException("output");

        ThreadStart lastStage = null;
        for (int i = 0; i < _filters.Length; i++)
        {
            Stream stageInput = i == 0 ? input : _blockingStreams[i - 1];
            Stream stageOutput = 
                i == _filters.Length - 1 ? output : _blockingStreams[i];
            Action<Stream, Stream> filter = _filters[i];
            ThreadStart stage = delegate
            {
                filter(stageInput, stageOutput);
                if (stageOutput is BlockingStream) 
                    ((BlockingStream)stageOutput).SetEndOfStream();
            };
            if (i < _filters.Length - 1)
            {
                Thread t = new Thread(stage);
                t.IsBackground = true;
                t.Start();
            }
            else lastStage = stage;
        }
        lastStage();
    }

    public void Dispose()
    {
        if (_blockingStreams != null)
        {
            foreach (BlockingStream stream in _blockingStreams) 
                stream.Dispose();
            _blockingStreams = null;
        }
    }
}

StreamPipeline is very simple to use. The constructor accepts a params array of delegates, each of which expects an input Stream and an output Stream; these are called filters. It then creates and stores one BlockingStream to use between each pair of filters. Notice that the Compress and Encrypt methods I showed previously conform to the necessary delegate signature, so they can be used as filters (shown in Figure 5).

Figure 5 StreamPipeline in Action

The Run method accepts the input and output Stream that should be at the beginning and at the end of the pipeline. In our example, this might be a FileStream representing the data on disk to be compressed and encrypted, and an output FileStream to store the resulting data back to disk:

using (FileStream input = File.OpenRead("inputData.bin"))
using (FileStream output = File.OpenWrite("outputData.bin"))
using (StreamPipeline pipeline = new StreamPipeline(Compress, Encrypt))
{
    pipeline.Run(input, output);
}
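
The same pipeline mechanism can be used in the other direction when you later need to read the data back. As a sketch (it may differ from what's in the download), you could write Decrypt and Decompress filters that undo the two stages, being careful to apply them in the reverse order and to reuse the same key material the Encrypt filter used:

static void Decrypt(Stream input, Stream output)
{
    RijndaelManaged rijndael = new RijndaelManaged();
    ... // setup the same crypto keys used by Encrypt
    using (ICryptoTransform transform = rijndael.CreateDecryptor())
    using (CryptoStream decryptor = new CryptoStream(
            input, transform, CryptoStreamMode.Read))
        CopyStream(decryptor, output);
}

static void Decompress(Stream input, Stream output)
{
    using (GZipStream decompressor = new GZipStream(
           input, CompressionMode.Decompress, true))
        CopyStream(decompressor, output);
}
...
using (StreamPipeline pipeline = new StreamPipeline(Decrypt, Decompress))
{
    pipeline.Run(input, output); // input: encrypted file, output: restored data
}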

Internally, for each stage in the pipeline, the Run method generates a delegate that runs the appropriate filter provided with the proper input and output streams. It creates new threads to run each of the work items, except for the last one, which it runs on the thread invoking Run. The current thread would need to block for the work to complete anyway, so it makes sense to reuse that thread to complete some of the processing rather than waste its resources.

Note my decision not to use the ThreadPool in this case. Normally, my first instinct is to queue work items into the ThreadPool rather than starting new Threads. In this case, however, the work I'm queueing can block waiting on other queued work items to perform some operations. That has the potential to deadlock (see msdn.microsoft.com/msdnmag/issues/04/12/NETMatters for an example of where developers using the .NET Framework 1.x frequently ran into this), and as such I avoid the situation by creating custom threads for each operation.

That decision, of course, has its own drawbacks. For example, threads are expensive to create and tear down, so if the filters are short-lived, the overhead here could dominate other aspects of the computation. If you were going to run a bunch of different input and output streams through the same StreamPipeline, it might be advantageous to rewrite the relevant portions of the class to keep the Threads alive and to reuse them from run to run; a rough sketch of that approach follows below. Depending on the constraints of your own application, using the ThreadPool might also be more appropriate.
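As one possible shape for that reuse (the StageWorker name and its simple queue-based design are mine, not part of the downloadable sample), each stage could own a long-lived thread that blocks until new work is queued onto it:

public sealed class StageWorker : IDisposable
{
    private Queue<ThreadStart> _work = new Queue<ThreadStart>();
    private object _sync = new object();
    private bool _shutdown;

    public StageWorker()
    {
        Thread t = new Thread(Run);
        t.IsBackground = true;
        t.Start();
    }

    // Hand a unit of work (for example, one run of a filter) to this worker.
    public void Queue(ThreadStart stage)
    {
        lock (_sync)
        {
            _work.Enqueue(stage);
            Monitor.Pulse(_sync);
        }
    }

    private void Run()
    {
        while (true)
        {
            ThreadStart stage;
            lock (_sync)
            {
                while (_work.Count == 0 && !_shutdown) Monitor.Wait(_sync);
                if (_work.Count == 0) return; // shutting down and nothing left to do
                stage = _work.Dequeue();
            }
            stage(); // run the filter outside the lock
        }
    }

    public void Dispose()
    {
        lock (_sync)
        {
            _shutdown = true;
            Monitor.Pulse(_sync);
        }
    }
}

Run could then hand each stage delegate to a per-stage StageWorker rather than creating a new Thread on every call.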

Now that we've built our StreamPipeline, the key question is, how does it perform? After all, the whole point of this exercise was to utilize both cores on the machine in order to improve computation time. In my tests on my dual-core machine, using the same War and Peace data mentioned previously, this parallel pipeline implementation ran 20-25 percent faster than the sequential version. Disappointed? A question I hope would come into your mind at this point is, why didn't it run 100 percent faster? Is there something wrong with this implementation? After all, it's executing on two processors now, so shouldn't it run twice as fast?

I misled a bit with my envelope-stuffing example from the beginning of this column. I don't know about you, but it takes me longer to fold a letter in thirds than it takes me to stuff that folded letter into an envelope. Given that, in our human pipeline, it's likely that the second person doing all the envelope stuffing will sit idle after stuffing each envelope, waiting for the next folded letter to arrive from his predecessor in the pipeline. So while we did throw more people at the problem, the throughput didn't scale linearly with the number of people involved.

The same is true for our compression and encryption example. I timed how much time in the sequential implementation was spent doing the compression and how much was spent doing the encryption. Any guesses? Only 20 percent of the time was spent doing the encryption; the other 80 percent went to compression. Since a pipeline can run no faster than its slowest stage, the best this two-stage pipeline could do is the compression time alone, roughly a 25 percent improvement over the sequential total. Thus, our pipeline implementation is achieving very close to the maximum speedup possible in this situation.

That's not to say this implementation is the best it could be. There are several possible performance issues with it that could become more apparent as the number of processors increases, the most obvious of which is that you'd need at least one filter per processor to even have the opportunity of leveraging all the processing power in the machine. Other possible issues include the fact that we're taking multiple locks within the BlockingStream implementation (one of which isn't necessary if there will only ever be one reader using the instance at a time, which is the case with StreamPipeline's usage of BlockingStream). But for the relatively small amount of code we had to write, even a 20-25 percent performance gain like the one demonstrated here with compression and encryption can be quite valuable.

Also note that pipelines can be very effective when stages can themselves process multiple pieces of data in parallel. For example, in our envelope-stuffing example, if multiple people were folding letters (or if the second person helped the first person fold when he didn't have any stuffing to do), the throughput of the entire system would increase. For the compression/encryption example, this would be akin to the chunking approach discussed earlier, such that the chunks are compressed in parallel in that stage of the pipeline, then passed on to the encryption stage, which also encrypts the compressed chunks in parallel, and so on. The more fine-grained concurrency introduced into the application, the better the chance that all processors in the machine will be able to lend a hand in getting to the solution faster, especially as technology moves forward and we find ourselves with more and more processors in the average personal computer.

Send your questions and comments for Stephen to netqa@microsoft.com.

Stephen Toub is a Senior Program Manager on the Parallel Computing Platform team at Microsoft. He is also a Contributing Editor for MSDN Magazine.