
PipeStream, a Memory Efficient and Thread-Safe Stream

10 Nov 2008, CPOL, 4 min read
PipeStream is a thread-safe read/write data stream for passing data between two threads in single-producer/single-consumer scenarios.


Introduction

.NET provides a fairly user-friendly threading mechanism, but lacks a built-in way to easily stream large amounts of data between threads. In .NET 2.0, the Stream.Synchronized method provides a partial solution to this problem by creating a thread-safe wrapper around an existing stream. Wrapping a MemoryStream this way does not address the memory issues inherent in processing large amounts of data, however, since a MemoryStream does not shrink as data is read from it: everything written stays in memory until the stream itself is discarded. A classic way of solving this problem is to create a shared data structure, such as a byte array, and implement a locking mechanism to ensure thread safety. This takes time to implement, and can lead to elusive bugs.
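As a quick illustration of the MemoryStream memory issue, the hypothetical helper below (MemoryStreamRetentionDemo is not part of PipeStream) writes a block of data, rewinds, reads everything back, and reports Capacity afterwards. Reading releases nothing, so Capacity stays at least as large as the total written. Note also that a MemoryStream has a single Position shared by reading and writing, which is why the example has to rewind; a writer thread and a reader thread sharing one such stream would have to coordinate that position themselves.

```csharp
using System;
using System.IO;

// Hypothetical demo: a MemoryStream keeps everything written to it,
// even after all of that data has been read back out.
public static class MemoryStreamRetentionDemo
{
    // Writes totalBytes into a MemoryStream, reads them all back,
    // and returns the stream's Capacity afterwards.
    public static int CapacityAfterFullRead(int totalBytes)
    {
        var ms = new MemoryStream();
        var chunk = new byte[4096];
        for (int written = 0; written < totalBytes; written += chunk.Length)
            ms.Write(chunk, 0, Math.Min(chunk.Length, totalBytes - written));

        ms.Position = 0; // one shared Position: rewind before reading
        var readBuffer = new byte[4096];
        while (ms.Read(readBuffer, 0, readBuffer.Length) > 0)
        {
            // consume the data; the underlying buffer is not released
        }
        return ms.Capacity; // still at least totalBytes
    }
}
```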

The PipeStream solves these problems by abstracting a shared data structure into a Stream interface, making it easy to pipe data between threads.

Background

I built this while developing an audio file transcoder, specifically to convert audio books encoded as *.mp3 into *.aac (or *.m4b) for use on an iPod. Since the *.m4b format was designed to remember the listener's position between sessions, I decided to merge my existing multi-file *.mp3 audio books into single *.m4b files. My first attempt involved a long shell command using several pipes, but this fell apart when some of the MP3s turned out to be encoded with incompatible parameters.

So, I began a second attempt by wrapping the FAAC encoder in a System.Diagnostics.Process, writing to it via its StandardInput stream and reading from its StandardOutput. Although FAAC can encode directly from MP3, it cannot merge multiple input files into one output, so I needed to feed it a single concatenated stream built from the individual MP3s. An easy way to do this was to decode each MP3 into a raw Wave stream using the LAME encoder, wrapped in a Process the same way, and feed that stream into FAAC's input. This strategy worked, but it required streaming data between the two processes, ideally from separate threads. A MemoryStream could be wrapped in a thread-safe shell, but I ran out of memory on the larger audio books. After experimenting with other solutions, I built this PipeStream, which I hope will be useful for folks with similar scenarios.

For those unfamiliar with pipes, a pipe is simply a means of redirecting the output of one process to the input of another on the command line, without using any intermediate data storage. I like to conceptualize it using a familiar idiom: video games!

PacStream

Waka waka waka... ahem. So, the light-bike produces a series of power-cells and writes them to its standard output. These are captured and buffered inside the pipe for a waka-man to read via his standard input. The same idea applies to the PipeStream; it just replaces the processes with threads, the power-cells with bytes of data, and the waka-man with... just kidding, the waka-man is the same in both. You get the idea.


Using the Code

In general, use the PipeStream as you would any other stream, in situations where large amounts of data need to be transferred between threads. For (a trivial) example:

  • First, create the PipeStream in the spawning class:

    C#
    // requires: using System.Threading;
    PipeStream mPipeStream; // the shared stream
    
    public void ReadWriteMultiThreadTests()
    {
        mPipeStream = new PipeStream();
    
        // create a consumer thread and a producer thread that share the PipeStream
        Thread readThread = new Thread(new ThreadStart(ReaderThread));
        Thread writeThread = new Thread(new ThreadStart(WriterThread));
        readThread.Start();
        writeThread.Start();
    
        // wait for both threads to finish
        writeThread.Join();
        readThread.Join();
    }
  • Then, write to it in the producer thread...

    C#
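    // requires: using System; using System.IO;
    private void WriterThread()
    {
        // read the whole input file as a single string
        string str = File.ReadAllText("myFile.txt");
        int writeSize = 1024;
        // encodes characters to bytes for the pipe; not disposed here,
        // because disposing a StreamWriter also closes mPipeStream
        StreamWriter sw = new StreamWriter(mPipeStream);
        for (int i = 0; i < str.Length; i += writeSize)
        {
            // select the next chunk of at most writeSize characters
            string substring = str.Substring(i, Math.Min(writeSize, str.Length - i));
            sw.Write(substring.ToCharArray(), 0, substring.Length);
        }
        sw.Flush(); // push any characters still buffered in the StreamWriter
        // the only writer is finished: let the reader's final Read() return
        mPipeStream.BlockLastReadBuffer = false;
    }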
  • ... and finally, read the data from the PipeStream in the consumer thread:

    C#
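    // requires: using System.IO;
    private void ReaderThread()
    {
        // decode the bytes coming out of the pipe back into characters
        StreamReader sr = new StreamReader(mPipeStream);
        char[] buffer = new char[80];
        while (!sr.EndOfStream)
        {
            int readLength = sr.Read(buffer, 0, buffer.Length);
            // do something productive with the first readLength chars of buffer
        }
    }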

I've extended the Stream interface with a few extra properties:

  • MaxBufferLength: Gets or sets the maximum number of bytes to store in the buffer.
  • BlockLastReadBuffer: Gets or sets a value indicating whether the last Read() call should block until enough data is buffered to fill the request, rather than returning whatever remains.

The second property is valuable when multiple streams are written to one reader: the final read will not occur until the writers are finished and this property has been set to false.

  • When true, Read() will block until it can fill the passed-in buffer with the requested count of bytes.
  • When false, Read() will not block, returning all the available buffer data.
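To make the two properties concrete, here is a minimal sketch of how they could govern Read and Write, assuming a Queue<byte> guarded by Monitor (both mentioned elsewhere in this article). It is not the published PipeStream source: the class name MiniPipeStream, the 1 MB default limit, and the rule that a writer waits while the queue is at MaxBufferLength are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical, minimal pipe stream for illustration; not the article's source.
// Bytes are held in a Queue<byte>; Monitor.Wait/PulseAll coordinate threads.
public class MiniPipeStream : Stream
{
    private readonly Queue<byte> _queue = new Queue<byte>();
    private readonly object _sync = new object();
    private long _maxBufferLength = 1024 * 1024; // assumed default: 1 MB
    private bool _blockLastReadBuffer = true;

    public long MaxBufferLength
    {
        get { lock (_sync) return _maxBufferLength; }
        set { lock (_sync) { _maxBufferLength = value; Monitor.PulseAll(_sync); } }
    }

    public bool BlockLastReadBuffer
    {
        get { lock (_sync) return _blockLastReadBuffer; }
        // clearing the flag wakes a reader waiting for a full buffer
        set { lock (_sync) { _blockLastReadBuffer = value; Monitor.PulseAll(_sync); } }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            // While blocking is on, wait for 'count' bytes. Also stop waiting
            // once the queue is full, otherwise a large 'count' would deadlock
            // against a writer that is waiting for free space.
            while (_blockLastReadBuffer && _queue.Count < count
                   && _queue.Count < _maxBufferLength)
                Monitor.Wait(_sync);

            int n = Math.Min(count, _queue.Count);
            for (int i = 0; i < n; i++)
                buffer[offset + i] = _queue.Dequeue();
            Monitor.PulseAll(_sync); // freed space may unblock a writer
            return n; // 0 means "drained" once blocking is turned off
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            // Assumption: a writer waits while the queue is at its limit;
            // a single Write may overshoot the limit by up to 'count' bytes.
            while (_queue.Count >= _maxBufferLength)
                Monitor.Wait(_sync);
            for (int i = 0; i < count; i++)
                _queue.Enqueue(buffer[offset + i]);
            Monitor.PulseAll(_sync); // new data may unblock a reader
        }
    }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length { get { lock (_sync) return _queue.Count; } }
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { } // written bytes are visible to readers at once
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

In this sketch a producer calls Write and, once finished, sets BlockLastReadBuffer to false; the consumer loops until Read returns 0. The "queue is full" escape in Read is a choice of the sketch, so that a read asking for more than MaxBufferLength bytes cannot leave reader and writer waiting on each other.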

Points of Interest

Note that the underlying data structure is a Queue<byte>, which makes the PipeStream roughly an order of magnitude less efficient than a MemoryStream. It is therefore most useful where the stream itself is not the bottleneck, such as CPU-bound media encoding, and in write-read-forget situations where a substantial amount of data has to be transferred.
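To get a feel for where that overhead comes from, the sketch below times pushing the same payload through a Queue<byte> (one Enqueue and one Dequeue call per byte) and through a MemoryStream (bulk Write and Read calls). QueueVsMemoryStream is a hypothetical helper; it is single-threaded and leaves out locking, so it only approximates PipeStream's costs. Timings vary by machine and runtime, and no particular ratio is claimed here.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

// Hypothetical, single-threaded comparison of per-byte queue operations
// against bulk MemoryStream copies for the same payload.
public static class QueueVsMemoryStream
{
    // Returns elapsed milliseconds for each approach on 'size' bytes.
    public static (long queueMs, long memoryStreamMs) Compare(int size)
    {
        var data = new byte[size];
        var output = new byte[size];

        var timer = Stopwatch.StartNew();
        var queue = new Queue<byte>();
        foreach (byte b in data) queue.Enqueue(b);          // one call per byte in
        for (int i = 0; i < size; i++) output[i] = queue.Dequeue(); // and out
        long queueMs = timer.ElapsedMilliseconds;

        timer.Restart();
        var ms = new MemoryStream();
        ms.Write(data, 0, size);                             // one bulk copy in
        ms.Position = 0;
        int read = 0;
        while (read < size) read += ms.Read(output, read, size - read); // bulk out
        long memoryStreamMs = timer.ElapsedMilliseconds;

        return (queueMs, memoryStreamMs);
    }
}
```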


History

  • 2006-10-17 - Version 1.0
  • 2008-10-09 - Version 1.1 - Uses Monitor instead of ManualResetEvent objects for simpler, more elegant synchronization

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)