.NET Matters
Stream Pipeline
Stephen Toub


Q In my application, I'm encrypting and compressing quite a bit of data. As these are computationally intensive operations, I was expecting to see 100 percent CPU utilization in Task Manager, but I noticed that on my dual-core machine, it's topping out at around 50 percent. I'm assuming this is because only one core is being used, which is a shame given that the process takes a non-trivial amount of time to run. Is there any way I can get this encryption and compression process to use both processors? I'm using CryptoStream and GZipStream from the Microsoft® .NET Framework.
A First, just to make sure: you're doing the compression operation before the encryption operation, correct? If not, change the order if you can. Good encryption produces output that is nearly incompressible. If you switch the order of the operations so that you first compress and then encrypt, not only should you end up with a smaller file, but the encryption will most likely take less time, as it'll be operating on less data. As an example, I downloaded the text of War and Peace from Project Gutenberg (www.gutenberg.org) and ran it through both orderings. Encrypting (with RijndaelManaged and the default key size) and then compressing resulted in a data stream 250 percent larger than the one generated by compressing and then encrypting, and it took 50 percent longer to execute.
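To check the effect of ordering on your own data, a sketch along these lines compares both orderings in memory. It is an illustration under stated assumptions, not the code behind the measurements above: it uses Aes.Create() as a stand-in for RijndaelManaged (which is marked obsolete in .NET 6 and later), relies on C# 7 tuples, and OrderingDemo and its methods are hypothetical names.

```csharp
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;

// Hypothetical helpers for comparing the two orderings of compression and encryption.
static class OrderingDemo
{
    // Gzip-compresses a byte array in memory.
    public static byte[] Compress(byte[] data)
    {
        var ms = new MemoryStream();
        using (var gzip = new GZipStream(ms, CompressionMode.Compress))
            gzip.Write(data, 0, data.Length);
        return ms.ToArray(); // ToArray still works after the MemoryStream is closed
    }

    // Encrypts a byte array in memory with the given symmetric algorithm.
    public static byte[] Encrypt(byte[] data, SymmetricAlgorithm algorithm)
    {
        var ms = new MemoryStream();
        using (ICryptoTransform transform = algorithm.CreateEncryptor())
        using (var crypto = new CryptoStream(ms, transform, CryptoStreamMode.Write))
            crypto.Write(data, 0, data.Length); // disposing crypto flushes the final block
        return ms.ToArray();
    }

    // Returns the output sizes for compress-then-encrypt and encrypt-then-compress.
    public static (int CompressFirst, int EncryptFirst) CompareOrderings(byte[] data)
    {
        using (Aes aes = Aes.Create()) // stand-in for RijndaelManaged
            return (Encrypt(Compress(data), aes).Length,
                    Compress(Encrypt(data, aes)).Length);
    }
}
```

Calling OrderingDemo.CompareOrderings(File.ReadAllBytes(path)) on a representative file should show the compress-then-encrypt output far smaller for typical text, since ciphertext leaves gzip almost nothing to exploit.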
Now, to answer your actual question: yes, there are several approaches you could take here. The first would be to parallelize the actual compression and encryption operations. You probably don't want to (and shouldn't) re-implement the functionality in GZipStream and CryptoStream, so, until the .NET Framework team parallelizes them for you, you'll want an alternate solution.
If you don't care about the actual output format (for example, you need compression, but you don't care whether the output adheres to the gzip standard), you could chunk your input and process each chunk in parallel. For example, on your dual-core machine, you could split in half the input byte array you're currently passing to your GZipStream and process one half with one GZipStream and the other half with another. You can then write the two results to your output file one after the other.
Note, though, that you'll probably need to include some header information around your output so that your decompression process can determine where one GZipStream's data ends and the next begins. GZipStream currently buffers data as it reads from its input stream, so it may consume more data from the input stream than it actually needs, which means you'd need to reset the position in the input stream (to the start of the next chunk, as recorded in that header) after the first GZipStream completes its decompression.
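A minimal sketch of the chunking idea with a length-prefix header follows. The container format and the ChunkedGZip class are hypothetical: each chunk is a valid gzip stream on its own, but the file as a whole is not. It uses Parallel.For from the Task Parallel Library, which shipped in .NET Framework 4, after this column was written; on your machine you might pass Environment.ProcessorCount as the chunk count.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

// Hypothetical container format: each chunk is stored as
// [4-byte little-endian length][gzip bytes for that chunk].
static class ChunkedGZip
{
    public static byte[] Compress(byte[] data, int chunkCount)
    {
        if (chunkCount < 1) throw new ArgumentOutOfRangeException(nameof(chunkCount));
        int chunkSize = (data.Length + chunkCount - 1) / chunkCount;
        var compressed = new byte[chunkCount][];

        // Each chunk gets its own GZipStream, so the chunks compress in parallel.
        Parallel.For(0, chunkCount, i =>
        {
            int start = Math.Min(i * chunkSize, data.Length);
            int length = Math.Min(chunkSize, data.Length - start);
            var ms = new MemoryStream();
            using (var gzip = new GZipStream(ms, CompressionMode.Compress))
                gzip.Write(data, start, length);
            compressed[i] = ms.ToArray();
        });

        // Write the chunks sequentially, each preceded by its length (the "header").
        var output = new MemoryStream();
        using (var writer = new BinaryWriter(output))
        {
            foreach (byte[] chunk in compressed)
            {
                writer.Write(chunk.Length);
                writer.Write(chunk);
            }
        }
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] packed)
    {
        var result = new MemoryStream();
        using (var reader = new BinaryReader(new MemoryStream(packed)))
        {
            while (reader.BaseStream.Position < reader.BaseStream.Length)
            {
                byte[] chunk = reader.ReadBytes(reader.ReadInt32());
                // A separate MemoryStream per chunk means GZipStream's read-ahead
                // can never consume bytes that belong to the next chunk.
                using (var gzip = new GZipStream(new MemoryStream(chunk), CompressionMode.Decompress))
                    gzip.CopyTo(result);
            }
        }
        return result.ToArray();
    }
}
```

Copying each chunk into its own MemoryStream before decompressing sidesteps the read-ahead problem without any repositioning, at the cost of holding one compressed chunk in memory at a time.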
One of the nice things about the chunking approach is that it scales relatively well beyond two cores, as you can simply create as many chunks as are necessary to saturate your processors. But with only two cores, and with two operations (compression and encryption), a more practical solution for you at this point would probably be to create a parallel stream pipeline: one operation runs on one processor while the other operation runs on the other processor. Obviously they can't be working on the same data at the same time, as you'd end up with two outputs (one compressed and one encrypted), which would be of little use given the problem you're trying to solve (one output that is both compressed and encrypted). Instead, you can mimic what you're probably doing today, where, using the decorator pattern, you pass the output from one stream as the input to the next (shown in Figure 1):
Figure 1 Passing Data through Streams 
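using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;
...
// Data flows input -> GZipStream -> CryptoStream -> output, so it is compressed
// first and then encrypted. Disposal runs inner to outer: compress writes its
// gzip footer into encrypt, then encrypt flushes its final block and closes output.
using (CryptoStream encrypt = new CryptoStream(
        output, transform, CryptoStreamMode.Write))
using (GZipStream compress = new GZipStream(
        encrypt, CompressionMode.Compress, true))
    CopyStream(input, compress);
...
// Pumps all data from input to output in 4 KB blocks.
static void CopyStream(Stream input, Stream output)
{
    byte[] buffer = new byte[0x1000];
    int read;
    while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        output.Write(buffer, 0, read);
}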
Here, the CopyStream method is copying from the input stream into the compression stream. When the compression stream has compressed a buffer of data, it in turn writes that data out as the input into the encryption stream. And similarly, when the encryption stream has completed a buffer of data, it in turn writes that out to the output stream. This is known as a pipeline.
Parallelizing a pipeline is a very natural concept that manifests frequently in the "real world." Consider a group of people sending out invitation letters. One person is in charge of folding the letters to fit in an envelope, one person is in charge of putting the folded letters into an envelope, and one person is in charge of sealing and stamping the envelope. For the first letter, the only person doing anything is the person folding the letter, and the rest of the people are idle. But then the first person hands off the folded letter to the envelope stuffer. At that point, the first person is busy folding the next letter, and the second person is busy stuffing an envelope, but the third person is still idle. When the first and second people complete their tasks, the stuffed envelope is handed to the third person, who seals and stamps it, the folded letter is handed to the second person, who stuffs it, and the first person begins folding a new letter. From this point on, until there are only two more letters left, all three people will be fully utilized (fully utilized, that is, except for the time it takes to hand off the letters and envelopes between people). So, even though each person is only doing one task at a time, assuming there are enough letters to be sent, the majority of the time we could theoretically be getting three times as much work done by having three people involved in the task.
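The arithmetic behind the analogy can be made concrete. Assuming k stages that each take the same time t per item, n items, and free hand-offs, the first item finishes after k steps and each later item one step after its predecessor:

$$T_{\text{seq}} = n\,k\,t \qquad T_{\text{pipe}} = (n + k - 1)\,t \qquad \frac{T_{\text{seq}}}{T_{\text{pipe}}} = \frac{nk}{n + k - 1} \xrightarrow{\;n \to \infty\;} k$$

The k - 1 extra steps are the fill and drain periods, when some people are idle. With k = 3 people and n = 100 letters, that is 300t versus 102t, a speedup of about 2.94, approaching the theoretical factor of three.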
We'll follow the exact same process for our stream pipelining task. Instead of passing around folded letters or envelopes, of course, we'll be passing around buffers of data, since that is the mechanism streams use to communicate. However, unlike with our sequential version, where one stream wrote directly to another stream, direct synchronous communication is no longer possible due to each of those streams operating in parallel on separate threads. We need another mechanism to re-enable that communication. For that task, I've written a BlockingStream, shown in Figure 2.
The concept behind BlockingStream is a bit different from that of most other streams in the .NET Framework. As with other streams, it derives from System.IO.Stream and overrides all of its abstract methods and properties. However, when it comes to position and thread safety, its behavior is a bit unorthodox. Most streams in the .NET Framework are not thread safe, meaning that multiple threads can't safely access an instance of the stream concurrently, and most streams maintain a single position at which the next read or write will occur. BlockingStream, on the other hand, is thread safe, and, in a sense, it implicitly maintains two positions, though neither is exposed as a numerical value to the user of the type.
BlockingStream works by maintaining an internal queue of data buffers written to it. When data is written to the stream, the buffer written is enqueued. When data is read from the stream, a buffer is dequeued in a first-in-first-out (FIFO) order, and the data in it is handed back to the caller. In that sense, there is a position in the stream at which the next write will occur and a position at which the next read will occur.
Note, however, that not every read request modifies the queue. If a reader requests an amount of data equal to or greater than the size of the next buffer in the queue, the read request will be satisfied with that one buffer (Read can successfully return even if it returns less data than was requested). But if the reader requests less than the size of the next buffer, the remaining data from the buffer is stored separately (not in the queue) so that it can be used to satisfy the next Read request (see _currentChunk in Figure 2). If the next Read request can be satisfied entirely with the remainder from the previous Read request, the queue will not be modified on that request.
BlockingStream uses a ManualResetEvent to signal to consumers whether data is available. If a thread tries to read data from the BlockingStream and there is no data available to read, the thread will block on this unset ManualResetEvent until data has been written. When data is written, the writer sets the event to wake up a consumer and notify it that data is available for reading. In fact, if there are multiple readers waiting for data, they will all wake up, which requires some additional logic in the Read method.
The Read method is implemented as one big loop that will continue to loop until either there is data available to fulfill the read request or until a writer has called SetEndOfStream, which informs the stream that there will be no more data; in that case, it would be silly to continue to block readers waiting for data, as they'd be waiting for a very long time. Each of these conditions is represented by a ManualResetEvent, the one mentioned earlier and another one that represents whether writing is complete. The first thing Read does once it enters the loop is to wait for either of these events to be set (using WaitHandle.WaitAny).
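Figure 2 isn't reproduced in this text. The following is a minimal sketch written from the description above, not the original listing. The queue, _currentChunk, the two ManualResetEvents, WaitHandle.WaitAny, and SetEndOfStream come from the text; the other member names and the single-lock design (the text says the original takes multiple locks) are assumptions. It uses C# 7 expression-bodied members.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Sketch of a thread-safe stream backed by a FIFO queue of buffers. Writers
// enqueue copies of what they write; readers dequeue in order and block until
// data arrives or SetEndOfStream is called.
public sealed class BlockingStream : Stream
{
    private readonly object _lock = new object();
    private readonly Queue<byte[]> _chunks = new Queue<byte[]>();
    private readonly ManualResetEvent _dataAvailable = new ManualResetEvent(false);
    private readonly ManualResetEvent _doneWriting = new ManualResetEvent(false);
    private readonly WaitHandle[] _events;
    private byte[] _currentChunk;       // remainder of a partially read buffer
    private int _currentChunkPosition;  // next unread byte in _currentChunk

    public BlockingStream() { _events = new WaitHandle[] { _dataAvailable, _doneWriting }; }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Flush() { }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        byte[] chunk = new byte[count]; // copy: the caller may reuse its buffer
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        lock (_lock)
        {
            _chunks.Enqueue(chunk);
            _dataAvailable.Set();       // releases every waiting reader
        }
    }

    // Tells readers that no more data is coming, so they stop blocking.
    public void SetEndOfStream()
    {
        lock (_lock) _doneWriting.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        while (true)
        {
            WaitHandle.WaitAny(_events); // data available, or writing complete
            lock (_lock)
            {
                if (_currentChunk == null && _chunks.Count > 0)
                {
                    _currentChunk = _chunks.Dequeue();
                    _currentChunkPosition = 0;
                }
                if (_currentChunk != null)
                {
                    int n = Math.Min(count, _currentChunk.Length - _currentChunkPosition);
                    Buffer.BlockCopy(_currentChunk, _currentChunkPosition, buffer, offset, n);
                    _currentChunkPosition += n;
                    if (_currentChunkPosition == _currentChunk.Length) _currentChunk = null;
                    if (_currentChunk == null && _chunks.Count == 0) _dataAvailable.Reset();
                    return n;
                }
                // Another reader may have taken the data first: report end of
                // stream only if writing has finished, otherwise wait again.
                if (_doneWriting.WaitOne(0)) return 0;
                _dataAvailable.Reset();
            }
        }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) { _dataAvailable.Close(); _doneWriting.Close(); }
        base.Dispose(disposing);
    }
}
```

In use, a producer thread calls Write and finally SetEndOfStream, while a consumer thread calls Read until it returns 0. Copying each written buffer costs an allocation, but it lets a CopyStream-style writer reuse its own buffer immediately.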
BlockingStream is useful for our stream pipeline because it enables the same stream-to-stream pattern used implicitly in the sequential version, but now supporting cross-thread implementations. The idea is that I can write methods that represent each of the Stream-based operations I need performed, as shown in Figure 3.
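Figure 3 isn't reproduced here. A plausible sketch of the two methods uses the Compress and Encrypt names from the text and the (Stream input, Stream output) shape that StreamPipeline expects. The Filters class, the shared Aes instance (standing in for RijndaelManaged), and the use of Stream.CopyTo (.NET Framework 4 and later) in place of a hand-written copy loop are assumptions.

```csharp
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;

static class Filters
{
    // Hypothetical key source for illustration: a random AES key and IV
    // generated per process. Real code needs proper key management.
    public static readonly Aes Algorithm = Aes.Create();

    // Compresses everything read from input into output, leaving output open
    // so the caller (or a pipeline) decides when the stream ends.
    public static void Compress(Stream input, Stream output)
    {
        using (var gzip = new GZipStream(output, CompressionMode.Compress, true))
            input.CopyTo(gzip);
    }

    // Encrypts everything read from input into output, also leaving output open.
    // The leaveOpen CryptoStream overload needs .NET Framework 4.7.2+ or .NET Core 2.0+.
    public static void Encrypt(Stream input, Stream output)
    {
        using (ICryptoTransform transform = Algorithm.CreateEncryptor())
        using (var crypto = new CryptoStream(output, transform, CryptoStreamMode.Write, true))
            input.CopyTo(crypto);
    }
}
```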
Each of these methods can be executed on its own thread, with a BlockingStream serving as the output of one and the input of the next. Initially, the second operation will be blocked waiting on its input BlockingStream. As soon as the first operation writes data to the stream, the second operation will wake up, retrieve that data, and process it. I've coded this functionality into the StreamPipeline class, shown in Figure 4.
StreamPipeline is very simple to use. The constructor accepts a params array of delegates, each of which expects an input Stream and an output Stream; these are called filters. It then creates and stores one BlockingStream to use between each pair of filters. Notice that the Compress and Encrypt methods I showed previously conform to the necessary delegate signature, so they can be used as filters (shown in Figure 5).
Figure 5 StreamPipeline in Action
The Run method accepts the input and output Stream that should be at the beginning and at the end of the pipeline. In our example, this might be a FileStream representing the data on disk to be compressed and encrypted, and an output FileStream to store the resulting data back to disk:
using (FileStream input = File.OpenRead("inputData.bin"))
using (FileStream output = File.Create("outputData.bin")) // Create truncates any existing file
using (StreamPipeline pipeline = new StreamPipeline(Compress, Encrypt))
{
    pipeline.Run(input, output);
}
Internally, for each stage in the pipeline, the Run method generates a delegate that runs the appropriate filter provided with the proper input and output streams. It creates new threads to run each of the work items, except for the last one, which it runs on the thread invoking Run. The current thread would need to block for the work to complete anyway, so it makes sense to reuse that thread to complete some of the processing rather than waste its resources.
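Figure 4 isn't reproduced here either. What follows is a minimal sketch of the behavior described: a params array of filters, one intermediate stream per pair of filters, and a new Thread per stage except the last, which runs on the calling thread. To keep the block self-contained, it substitutes a compact, hypothetical ChunkQueueStream built on BlockingCollection<byte[]> (System.Collections.Concurrent, added in .NET Framework 4, after this column) for BlockingStream. The error propagation and the one-Run-per-instance restriction are also assumptions of the sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Compact stand-in for BlockingStream (single reader assumed).
public sealed class ChunkQueueStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks = new BlockingCollection<byte[]>();
    private byte[] _current;
    private int _position;

    public void SetEndOfStream() => _chunks.CompleteAdding();

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        byte[] copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);
        _chunks.Add(copy);
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_current == null || _position == _current.Length)
        {
            // Blocks until a chunk arrives; returns false once completed and empty.
            if (!_chunks.TryTake(out _current, Timeout.Infinite)) return 0;
            _position = 0;
        }
        int n = Math.Min(count, _current.Length - _position);
        Buffer.BlockCopy(_current, _position, buffer, offset, n);
        _position += n;
        return n;
    }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override long Seek(long o, SeekOrigin s) => throw new NotSupportedException();
    public override void SetLength(long v) => throw new NotSupportedException();
    public override void Flush() { }
    protected override void Dispose(bool disposing) { if (disposing) _chunks.Dispose(); base.Dispose(disposing); }
}

public sealed class StreamPipeline : IDisposable
{
    private readonly Action<Stream, Stream>[] _filters;
    private readonly ChunkQueueStream[] _links; // one between each pair of filters

    public StreamPipeline(params Action<Stream, Stream>[] filters)
    {
        if (filters == null || filters.Length == 0) throw new ArgumentException("At least one filter is required.");
        _filters = filters;
        _links = new ChunkQueueStream[filters.Length - 1];
        for (int i = 0; i < _links.Length; i++) _links[i] = new ChunkQueueStream();
    }

    // Single use per instance in this sketch: the links are completed after a Run.
    public void Run(Stream input, Stream output)
    {
        var threads = new Thread[_links.Length];
        Exception failure = null;
        for (int i = 0; i < threads.Length; i++)
        {
            int stage = i; // per-iteration copy for the lambda
            Stream stageInput = stage == 0 ? input : _links[stage - 1];
            threads[stage] = new Thread(() =>
            {
                try { _filters[stage](stageInput, _links[stage]); }
                catch (Exception e) { failure = e; }             // keeps one failure
                finally { _links[stage].SetEndOfStream(); }      // lets the next stage finish
            });
            threads[stage].Start();
        }
        // The caller would block anyway, so it runs the last filter itself.
        Stream lastInput = _links.Length == 0 ? input : _links[_links.Length - 1];
        try { _filters[_filters.Length - 1](lastInput, output); }
        finally { foreach (Thread t in threads) t.Join(); }
        if (failure != null) throw new InvalidOperationException("A pipeline stage failed.", failure);
    }

    public void Dispose()
    {
        foreach (ChunkQueueStream link in _links) link.Dispose();
    }
}
```

Any method with the (Stream input, Stream output) shape, such as the Compress and Encrypt methods mentioned earlier, can be passed to the constructor, provided it leaves its output stream open: the pipeline, not the filter, signals end of stream after each filter returns.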
Note my decision not to use the ThreadPool in this case. Normally, my first instinct is to queue work items to the ThreadPool rather than start new Threads. In this case, however, the work I'm queueing can block waiting on other queued work items to perform some operations. That can lead to deadlock (see msdn.microsoft.com/msdnmag/issues/04/12/NETMatters for an example of how developers using the .NET Framework 1.x frequently ran into this), so I avoid the situation by creating custom threads for each operation.
That decision, of course, has its own drawbacks. For example, threads are expensive to create and tear down, so if the filters are short lived, the overhead here could dominate other aspects of the computation. If you were going to run a bunch of different input and output streams through the same StreamPipeline, it might be advantageous to rewrite the relevant portions of the class to keep the Threads alive and to reuse them from run to run. Given the constraints of your own applications, using ThreadPool might also be more appropriate.
Now that we've built our StreamPipeline, the key question is, how does it perform? After all, the whole point of this exercise was to use both cores on the machine in order to reduce computation time. In my tests on my dual-core machine, using the same War and Peace data mentioned previously, the parallel pipeline implementation ran 20-25 percent faster than the sequential version. Disappointed? A question I hope comes to mind at this point is, why didn't it run 100 percent faster? Is there something wrong with this implementation? After all, it's executing on two processors now, so shouldn't it run twice as fast?
I misled you a bit with my envelope-stuffing example earlier in this column. I don't know about you, but it takes me longer to fold a letter in thirds than it takes me to stuff that folded letter into an envelope. Given that, in our human pipeline, the second person doing all the envelope stuffing will likely sit idle after stuffing each envelope, waiting for the next folded letter to arrive from his predecessor in the pipeline. So while we did throw more people at the problem, the throughput didn't scale linearly with the number of people involved.
The same is true for our compression and encryption example. I measured how much of the sequential implementation's running time was spent doing the compression and how much was spent doing the encryption. Any guesses? Only 20 percent of the time was spent doing the encryption. Because a pipeline can run no faster than its slowest stage, which here is compression at roughly 80 percent of the sequential time, our pipeline implementation is achieving very close to the maximum speedup possible in this situation.
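Putting numbers on that bound: assume the sequential time T splits into a compression fraction c and an encryption fraction e = 1 - c, and ignore hand-off overhead. Once the pipeline is full, the two stages overlap, so the larger fraction sets the pace:

$$T_{\text{pipe}} \ge \max(c, e)\,T \quad\Longrightarrow\quad \frac{T}{T_{\text{pipe}}} \le \frac{1}{\max(c, e)} = \frac{1}{0.8} = 1.25$$

With c = 0.8 and e = 0.2, the best case is a 1.25x speedup: a 20 percent reduction in running time, or 25 percent more throughput. The measured 20-25 percent gain sits at or near that limit.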
That's not to say this implementation is the best it could be. It has several possible performance issues that could become more apparent as the number of processors increases, the most obvious of which is that you'd need at least one filter per processor to have any chance of using all the processing power in the machine. Other possible issues include the fact that we're taking multiple locks within the BlockingStream implementation (one of which isn't necessary if there will only ever be one reader using the instance at a time, which is the case with the StreamPipeline's usage of BlockingStream). But for the relatively small amount of code we had to write, even a 20-25 percent performance gain like the one demonstrated here with compression and encryption can be quite valuable.
Also note that pipelines can be very effective when stages can themselves process multiple pieces of data in parallel. For example, in our envelope-stuffing example, if multiple people were folding letters (or if the second person helped the first person fold when he didn't have any stuffing to do), the throughput of the entire system would increase. For the compression/encryption example, this would be akin to the chunking approach discussed earlier: the chunks are compressed in parallel in that stage of the pipeline, then passed on to the encryption stage, which also encrypts the compressed chunks in parallel, and so on. The more fine-grained concurrency introduced into the application, the better the chance that all processors in the machine will be able to lend a hand in reaching the solution faster, especially as technology moves forward and we find ourselves with more and more processors in the average personal computer.

Send your questions and comments for Stephen to netqa@microsoft.com.


Stephen Toub is a Senior Program Manager on the Parallel Computing Platform team at Microsoft. He is also a Contributing Editor for MSDN Magazine.
