Simple Pipeline Implementation in C#

17 Nov 2014, CPOL, 4 min read
A single-threaded, sequential implementation of the pipes and filters pattern in C#

Introduction

Pipes and filters is a well-known design and architectural pattern. Wikipedia has a detailed explanation of what a software pipeline is. I was looking for a simple implementation that I could use in a training session, but most of the implementations available on the internet were advanced, with multithreading and complex input/output. I wrote this implementation during a lunch session and decided to publish it here.

Background

If you want to understand this pattern through a real-world example, imagine a car assembly line. At one end, auto parts are fed into the line, and at each stage a human or robot performs some operation. The output of each stage is the input to the next stage. After all the operations are completed, a car is delivered at the other end. Each processing element is responsible for one particular task and makes assumptions only about its input and output.

In software, you have probably already used this pattern if you have written a command like:

c:\>dir | more

Here, you are passing the output of the "dir" command as the input to the "more" command. You may wonder why the "dir" command can't provide pagination on its own. Why have a separate "more" command? But in that case, you could not do something like:

c:\>type file.txt | more

Here, the entire file's content is written to the input of the "more" command, and you can view the file page by page. Without "more", every command would have to implement the pagination logic itself. This wastes effort and introduces a lot of redundancy. The pipes and filters pattern solves this problem by breaking the solution into independent steps. Each step makes assumptions only about its input and output. As long as these assumptions are met, a user can connect the steps in different ways to solve complex problems.
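To make the idea concrete outside the shell, here is a minimal sketch that models filters as functions from one sequence of lines to another. The `Filters` class and its `Paginate`, `Listing`, and `TextLines` methods are hypothetical, and this pull-based `IEnumerable<T>` style is a different mechanism from the push-style, PipeStream-based pipeline built later in this article. It only illustrates how a single paging filter can serve any source, just as "more" serves both "dir" and "type".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical filters illustrating composition; not part of the article's pipeline
static class Filters
{
    // A "more"-like filter: splits any sequence of lines into pages of pageSize lines.
    // It does not know or care where the lines come from.
    public static IEnumerable<List<string>> Paginate(IEnumerable<string> lines, int pageSize)
    {
        var page = new List<string>();
        foreach (string line in lines)
        {
            page.Add(line);
            if (page.Count == pageSize)
            {
                yield return page;
                page = new List<string>();
            }
        }

        // Emit the last, partially filled page, if any
        if (page.Count > 0)
            yield return page;
    }

    // Stand-in for "dir": a sorted listing of file names
    public static IEnumerable<string> Listing(IEnumerable<string> names)
    {
        return names.OrderBy(n => n, StringComparer.OrdinalIgnoreCase);
    }

    // Stand-in for "type": the lines of a text
    public static IEnumerable<string> TextLines(string text)
    {
        return text.Split('\n');
    }
}
```

The same `Paginate` call works on the output of `Listing(...)` and of `TextLines(...)`, so the paging logic is written once instead of once per source.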

Before jumping into the code, let us make the terminology clear. A pipeline is a solution constructed by connecting a set of pipeline elements. A pipeline element is a solution step that takes a specific input, processes the data and produces a specific output.

As always, when learning a concept, start with a simple example. I am going to construct a pipeline from passive pipeline elements, that is, elements without a processing thread of their own, each with a single input and a single output.

Using the Code

IPipelineElement

All the pipeline elements implement this interface. Through it, a pipeline element can be connected to another pipeline element, checked for completion, and asked to process its input.

Each pipeline element reads its input from, and writes its output to, a shared data structure (the pipe). I used an existing third-party PipeStream implementation, but there are alternatives such as BlockingCollection<T>, which has been part of the .NET Framework since version 4.
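As a rough sketch of the BlockingCollection<T> alternative, the hypothetical `ChunkPipe` class below carries byte chunks between two elements. One advantage over a plain byte stream is explicit end-of-data signalling: the writer calls `CompleteAdding()`, and the reader checks `IsCompleted`, which becomes true only once adding has been completed and the collection is empty. `TryTake` is used instead of the blocking `Take` because, in a single-threaded sequential pipeline, a blocking read on an empty pipe would never return.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical pipe between two pipeline elements, backed by BlockingCollection<T>
class ChunkPipe
{
    readonly BlockingCollection<byte[]> chunks = new BlockingCollection<byte[]>();

    // Producer side: copy the data so the caller can safely reuse its buffer
    public void Write(byte[] buffer, int offset, int count)
    {
        byte[] chunk = new byte[count];
        Array.Copy(buffer, offset, chunk, 0, count);
        chunks.Add(chunk);
    }

    // Producer side: signals that no more data will be written
    public void CompleteWriting()
    {
        chunks.CompleteAdding();
    }

    // Consumer side: non-blocking read; returns null when nothing is available right now
    public byte[] TryRead()
    {
        byte[] chunk;
        return chunks.TryTake(out chunk) ? chunk : null;
    }

    // True only after CompleteWriting() has been called and every chunk has been read
    public bool IsDrained
    {
        get { return chunks.IsCompleted; }
    }
}
```

With a pipe like this, a downstream element could report completion as `IsComplete = input.IsDrained;` instead of inferring it from a momentarily empty buffer.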

C#
using PipeStreamPkg;

/// <summary>
/// Interface for a pipeline element. 
/// </summary>
interface IPipelineElement
{
    /// <summary>
    /// Set the input for this pipeline element
    /// </summary>
    /// <param name="inputStream">The stream</param>
    void SetInput(PipeStream inputStream);
    
    /// <summary>
    /// The output of this element will be connected to the input of the next element.
    /// </summary>
    /// <param name="next">The next element to be connected</param>
    void Connect(IPipelineElement next);
    
    /// <summary>
    /// The pipeline element's processing function. Implement your processing here.
    /// </summary>
    void Process();
    
    /// <summary>
    /// Gets whether this pipeline element has finished its processing.
    /// </summary>
    bool IsComplete
    {
        get;
    }
}

Pipeline

This pipeline class implements a sequential pipeline. All the pipeline elements are passive with a single input/output.

C#
using System.Collections.Generic;

/// <summary>
/// This pipeline class implements a sequential pipeline. All the pipeline elements are
/// passive. The pipeline elements don't have a processing thread of their own.
/// </summary>
class Pipeline
{
    /// <summary>
    /// List of pipeline elements
    /// </summary>
    List<IPipelineElement> pipeline = new List<IPipelineElement>(); 
    
    /// <summary>
    /// Adds the element to the pipeline and links them.
    /// </summary>
    /// <param name="anElement">The element to be added.</param>
    public void Add(IPipelineElement anElement)
    {
        pipeline.Add(anElement);
        if (pipeline.Count > 1)
            pipeline[pipeline.Count - 2].Connect(pipeline[pipeline.Count - 1]);
    }
    
    /// <summary>
    /// This is the main processing method. It runs the pipeline until all the 
    /// elements declare completion.
    /// </summary>
    public void Run()
    {
        bool jobCompleted = false;
        
        // Keep running passes until every element reports completion in the same pass
        while (!jobCompleted)
        {
            jobCompleted = true;
            for (int i = 0; i < pipeline.Count; i++)
            {
                pipeline[i].Process();
                jobCompleted = jobCompleted && pipeline[i].IsComplete;
            }
        }
    }
}
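To see the passive scheduling in isolation, the sketch below mirrors the `Pipeline.Run` loop but replaces PipeStream with an in-memory `Queue<string>`, so it runs without the external dependency. The `IElement`, `Source`, `Upper`, `Sink`, and `MiniPipeline` names are hypothetical. Each pass visits every element once, and the loop ends only when all elements report completion in the same pass; `Run` returns the number of passes so the scheduling can be observed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature of IPipelineElement using string queues as pipes
interface IElement
{
    void SetInput(Queue<string> input);
    void Connect(IElement next);
    void Process();
    bool IsComplete { get; }
}

// Emits one item per pass, much like FileReader emits one chunk per pass
class Source : IElement
{
    readonly Queue<string> pending;
    readonly Queue<string> output = new Queue<string>();
    public Source(params string[] items) { pending = new Queue<string>(items); }
    public void SetInput(Queue<string> input) { throw new InvalidOperationException("No input for this element"); }
    public void Connect(IElement next) { next.SetInput(output); }
    public void Process()
    {
        if (pending.Count > 0)
            output.Enqueue(pending.Dequeue());
        IsComplete = pending.Count == 0;
    }
    public bool IsComplete { get; private set; }
}

// Filter: upper-cases everything currently in its input
class Upper : IElement
{
    Queue<string> input;
    readonly Queue<string> output = new Queue<string>();
    public void SetInput(Queue<string> inputQueue) { input = inputQueue; }
    public void Connect(IElement next) { next.SetInput(output); }
    public void Process()
    {
        // Nothing arrived this pass: assume the upstream element has finished
        IsComplete = input.Count == 0;
        while (input.Count > 0)
            output.Enqueue(input.Dequeue().ToUpperInvariant());
    }
    public bool IsComplete { get; private set; }
}

// Sink: collects everything it receives
class Sink : IElement
{
    Queue<string> input;
    public List<string> Received { get; } = new List<string>();
    public void SetInput(Queue<string> inputQueue) { input = inputQueue; }
    public void Connect(IElement next) { throw new InvalidOperationException("No output from this element"); }
    public void Process()
    {
        IsComplete = input.Count == 0;
        while (input.Count > 0)
            Received.Add(input.Dequeue());
    }
    public bool IsComplete { get; private set; }
}

class MiniPipeline
{
    readonly List<IElement> elements = new List<IElement>();

    public void Add(IElement element)
    {
        elements.Add(element);
        if (elements.Count > 1)
            elements[elements.Count - 2].Connect(element);
    }

    // Same scheduling as Pipeline.Run; returns the number of passes it took
    public int Run()
    {
        int passes = 0;
        bool done = false;
        while (!done)
        {
            done = true;
            passes++;
            foreach (IElement element in elements)
            {
                element.Process();
                done = done && element.IsComplete;
            }
        }
        return passes;
    }
}
```

Because completion is inferred from an empty input, this only works when the source produces something on every pass until it is finished. The article's FileReader satisfies the same assumption by writing a chunk on every pass until it reaches the end of the file.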

I have implemented three pipeline elements. The code is straightforward, so I am just listing what each one does.

  1. FileReader - This element reads the file in chunks and writes its content to the output. The file path is provided as a command-line argument.
  2. LinesCounter - This element reads its input, counts the newline characters, and writes the line count to its output once the input is exhausted.
  3. ConsoleWriter - This element reads its input, decodes it as UTF-8, and writes the resulting text to the console.
C#
using System;
using System.IO;
using System.Text;
using PipeStreamPkg;

    /// <summary>
    /// This element does not take any input. It reads the file based on the command line argument and 
    /// dumps that to the output stream.
    /// </summary>
    class FileReader :IPipelineElement
    {
        string      filePath;
        FileStream  fileStream;

        byte[]      buffer       = new byte[1024];
        PipeStream  outputStream = new PipeStreamPkg.PipeStream();

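        public FileReader()
        {
            IsComplete    = false;

            // args[0] is the executable path; args[1] is the file to read
            string[] args = Environment.GetCommandLineArgs();
            if (args.Length < 2)
            {
                throw new InvalidOperationException("Pass the file path as the first command-line argument");
            }

            filePath   = args[1];
            // Open read-only so that read-only files can be processed too
            fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
        }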

        public void SetInput(PipeStream inputStream)
        {
            throw new InvalidOperationException("No input for this element");
        }

        public void Connect(IPipelineElement next)
        {
            next.SetInput(outputStream);
        }

        /// <summary>
        /// Reads a chunk of the file and dumps that to the output
        /// </summary>
        public void Process()
        {
            if(fileStream == null)
            {
                return;
            }

            int bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            IsComplete = bytesRead <= 0;

            if (bytesRead > 0)
            {
                outputStream.Write(buffer, 0, bytesRead);
            }
            else
            {
                fileStream.Dispose();
                fileStream = null;
            }
        }

        public bool IsComplete
        {
            get;
            private set;
        }
    }

    /// <summary>
    /// This element takes the content of the input, counts the number of lines and 
    /// dumps that to the output.
    /// </summary>
    class LinesCounter :IPipelineElement
    {
        PipeStreamPkg.PipeStream input;
        PipeStreamPkg.PipeStream output = new PipeStreamPkg.PipeStream();

        public LinesCounter()
        {
            IsComplete = false;
        }

        public void SetInput(PipeStreamPkg.PipeStream inputStream)
        {
            input = inputStream;
        }

        public void Connect(IPipelineElement next)
        {
            next.SetInput(output);
        }

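        int numberOfLine = 0;

        public void Process()
        {
            // The count has already been written; don't write it again on later passes
            if (IsComplete)
            {
                return;
            }

            // An empty input means the upstream element has finished: in this sequential
            // pipeline, FileReader writes a chunk on every pass until it reaches end of file
            if (input.Length <= 0)
            {
                IsComplete = true;
                byte[] strbuf = Encoding.UTF8.GetBytes(
                    string.Format("Number of lines = {0}", numberOfLine));
                output.Write(strbuf, 0, strbuf.Length);
                return;
            }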

            byte[] buffer = new byte[1024];
            int bytesRead = 0;

            while(input.Length > 0)
            {
                bytesRead = input.Read(buffer, 0, 
                (buffer.Length < input.Length) ? buffer.Length : (int)input.Length);

                for (int i = 0; i < bytesRead; i++)
                {
                    if (buffer[i] == '\n')
                    {
                        numberOfLine++;
                    }
                }
            }
        }

        public bool IsComplete
        {
            get;
            private set;
        }
    }

    /// <summary>
    /// This element takes the content of the input, converts that to string using utf8 and 
    /// dumps that to the output console.
    /// </summary>
    class ConsoleWriter : IPipelineElement
    {
        PipeStream inputStream;

        public ConsoleWriter()
        {
            IsComplete = false;
        }

        public void SetInput(PipeStream input)
        {
            inputStream = input;
        }

        public void Connect(IPipelineElement next)
        {
            throw new InvalidOperationException("No output from this element");
        }

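        // Stateful decoder: keeps incomplete multi-byte UTF-8 sequences between chunks
        Decoder decoder = Encoding.UTF8.GetDecoder();

        public void Process()
        {
            if (inputStream.Length <= 0)
            {
                IsComplete = true;
                return;
            }

            byte[] buffer = new byte[1024];
            int bytesRead = 0;

            while (inputStream.Length > 0)
            {
                bytesRead = inputStream.Read(buffer, 0,
                    (buffer.Length < inputStream.Length) ? buffer.Length : (int)inputStream.Length);

                // Decode statefully so a character split across two chunks is not garbled,
                // and use Write rather than WriteLine so no newline is injected between chunks
                char[] chars = new char[decoder.GetCharCount(buffer, 0, bytesRead)];
                decoder.GetChars(buffer, 0, bytesRead, chars, 0);
                Console.Write(chars);
            }
        }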

        public bool IsComplete
        {
            get;
            private set;
        }
    }
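LinesCounter and ConsoleWriter consume their input in chunks of up to 1024 bytes. Counting '\n' bytes chunk by chunk is safe, because in UTF-8 the byte 0x0A never occurs inside a multi-byte sequence. Decoding each chunk into text independently is not safe: a character whose bytes straddle two chunks turns into replacement characters. The sketch below (the `ChunkDecoding` class and its methods are hypothetical) contrasts per-chunk decoding with a single `Decoder` from `Encoding.UTF8.GetDecoder()`, which carries incomplete byte sequences over to the next call.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

static class ChunkDecoding
{
    // Decodes each chunk on its own: a character split across chunks is corrupted
    public static string DecodeIndependently(IEnumerable<byte[]> chunks)
    {
        var sb = new StringBuilder();
        foreach (byte[] chunk in chunks)
            sb.Append(Encoding.UTF8.GetString(chunk));
        return sb.ToString();
    }

    // Decodes with one stateful Decoder: incomplete sequences are kept for the next chunk
    public static string DecodeStatefully(IEnumerable<byte[]> chunks)
    {
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var sb = new StringBuilder();
        foreach (byte[] chunk in chunks)
        {
            char[] chars = new char[decoder.GetCharCount(chunk, 0, chunk.Length)];
            int written = decoder.GetChars(chunk, 0, chunk.Length, chars, 0);
            sb.Append(chars, 0, written);
        }
        return sb.ToString();
    }

    // Counts '\n' bytes across chunks; chunk boundaries cannot split a newline
    public static int CountNewlines(IEnumerable<byte[]> chunks)
    {
        int count = 0;
        foreach (byte[] chunk in chunks)
            foreach (byte b in chunk)
                if (b == (byte)'\n')
                    count++;
        return count;
    }
}
```

For example, "café\n" encodes as 63 61 66 C3 A9 0A; split after C3, only the stateful version reproduces the original text. A trailing incomplete sequence left in the decoder after the last chunk is simply never emitted unless the decoder is flushed.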

From these three elements, I constructed two pipelines:

  1. Dump the file content to the console.
  2. Count the lines and write the count to the console.
C#
/// <summary>
/// This pipeline dumps the file to the console.
/// </summary>
static void DumpFile()
{
    // Just read and dump the file to console
    Pipeline p = new Pipeline();
    
    p.Add(new FileReader());
    p.Add(new ConsoleWriter());
    
    p.Run();
}

/// <summary>
/// This pipeline counts the number of lines.
/// </summary>
static void CountLines()
{
    // Count the number of lines
    Pipeline p = new Pipeline();
    
    p.Add(new FileReader());
    p.Add(new LinesCounter());
    p.Add(new ConsoleWriter());
    
    p.Run();
}

Points of Interest

Many variations are possible, depending on the need. For example:

  • The pipeline element can be an active element with its own thread for processing. This way, each element processes the data on its own. No external stimulus is needed.
  • A pipeline element can take multiple inputs and produce multiple outputs. This is useful in multimedia applications, where the audio and video have to be processed in parallel and played in synchronization.
  • The pipeline can be supplied to the program as configuration, or constructed on the fly as needed.
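For the first variation, here is a minimal sketch of active elements, assuming .NET 4.5 or later (for `Task.Run`). Each stage runs on its own task, blocks on its input `BlockingCollection<T>` through `GetConsumingEnumerable()`, and calls `CompleteAdding()` on its output when its input is exhausted, so end-of-data propagates down the chain without a central `Run` loop. The `ActivePipeline` class and its `Stage` and `Run` methods are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ActivePipeline
{
    // Starts an active stage: it runs on its own task and transforms every item it receives
    public static Task Stage<TIn, TOut>(
        BlockingCollection<TIn> input, BlockingCollection<TOut> output, Func<TIn, TOut> transform)
    {
        return Task.Run(() =>
        {
            try
            {
                // Blocks until an item arrives; the loop ends once input is completed and empty
                foreach (TIn item in input.GetConsumingEnumerable())
                    output.Add(transform(item));
            }
            finally
            {
                // Tell the next stage that no more items will come, even if transform threw
                output.CompleteAdding();
            }
        });
    }

    // Example chain: trim each line, then upper-case it
    public static List<string> Run(IEnumerable<string> lines)
    {
        var source = new BlockingCollection<string>();
        var trimmed = new BlockingCollection<string>();
        var upper = new BlockingCollection<string>();

        Task trimStage = Stage(source, trimmed, s => s.Trim());
        Task upperStage = Stage(trimmed, upper, s => s.ToUpperInvariant());

        // The producer feeds the first stage, then signals the end of the input
        foreach (string line in lines)
            source.Add(line);
        source.CompleteAdding();

        // The consumer drains the last stage on the calling thread
        var results = new List<string>();
        foreach (string s in upper.GetConsumingEnumerable())
            results.Add(s);

        // Surfaces any exception thrown inside a stage
        Task.WaitAll(trimStage, upperStage);
        return results;
    }
}
```

Since each stage has exactly one consumer thread and the default backing store is a FIFO queue, the items come out in the order they went in.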

For this article, I chose the simplest variant: passive elements, single input/output, and a static pipeline.

History

  • 17th November, 2014 - Initial publication
  • 18th November, 2014 - For some reason, my source code zip was not visible in the article, so I added the remaining source code.
  • 20th November, 2014 - Included PipeStream.cs in the project. In the original version, it was only linked from another project.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)