Introduction
Pipes and filters is a well-known design and architectural pattern. Wikipedia has a detailed explanation of what a software pipeline is. I was searching for a simple implementation that could be used in a training session, but most of the implementations available on the internet were advanced, with multithreading and complex input/output. I wrote this implementation during a lunch session and decided to publish it here.
Background
If you want to understand this pattern through a real-world example, just imagine the assembly line of a car factory. At one end, the auto parts are fed into the assembly line, and at each stage a human or robot performs some operation. The output of each stage is the input to the next stage. After all the operations are completed, a car is delivered at the other end. Each processing element is responsible for a particular task and makes assumptions only about its own input and output.
In software, you have already used this pattern whenever you wrote a command like:
c:\>dir | more
Here, you are passing the output of the "dir" command as the input to the "more" command. You may wonder why the "dir" command cannot provide pagination on its own. Why have a separate "more" command? But in that case, you could not do something like:
c:\>type file.txt | more
Here, the entire content of the file is written to the input of the "more" command, and you can view the file page by page. If "more" were not available, each command would have to implement the pagination logic again. This is a waste of effort and introduces a lot of redundancy. The pipes and filters pattern solves this problem by breaking the solution into independent steps. Each solution step makes assumptions only about its input and output. As long as these assumptions are met, a user can connect the solution steps in different ways and solve complex problems.
Before jumping into the code, let us make the terminology clear. A pipeline is a solution constructed by connecting a set of pipeline elements. A pipeline element is a solution step that takes a specific input, processes the data and produces a specific output.
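To make the terminology concrete, here is a minimal sketch that is not part of the article's source. It assumes each pipeline element can be modelled as a function from a sequence of lines to a sequence of lines; the names FilterSketch, DropEmpty, Number and Run are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration: each "pipeline element" maps a sequence
// of lines to another sequence of lines.
static class FilterSketch
{
    // Element 1: keep only non-empty lines.
    public static IEnumerable<string> DropEmpty(IEnumerable<string> lines)
    {
        return lines.Where(l => l.Length > 0);
    }

    // Element 2: prefix each line with its 1-based position.
    public static IEnumerable<string> Number(IEnumerable<string> lines)
    {
        return lines.Select((l, i) => (i + 1) + ": " + l);
    }

    // The "pipeline": the output of one element is the input of the next.
    public static List<string> Run(IEnumerable<string> source)
    {
        return Number(DropEmpty(source)).ToList();
    }
}
```

Calling FilterSketch.Run(new[] { "a", "", "b" }) yields the two lines "1: a" and "2: b". Neither element knows about the other; each depends only on the shape of its input and output, which is what allows them to be reconnected in a different order.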
As always, when learning a concept, start with a simple example. I am going to construct a pipeline based on passive pipeline elements, that is, elements that do work only when the pipeline calls them, each with a single input and a single output.
Using the Code
IPipelineElement
All the pipeline elements implement this interface. Through this interface, a pipeline element can be connected to another pipeline element, checked for completion, and asked to process its input.
Each pipeline element needs to read from and write to a common data structure for its input/output. I used the PipeStream implementation by James Kolpack, but there are alternatives, such as the blocking collections available as part of .NET 4.
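As a rough sketch of the blocking-collection alternative mentioned above (not what the article's code uses), the following assumes a BlockingCollection<byte[]> from System.Collections.Concurrent plays the role of the pipe. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical illustration of a BlockingCollection used as the pipe.
static class BlockingPipeSketch
{
    // Producer side: push chunks, then signal that no more data will arrive.
    public static void Produce(BlockingCollection<byte[]> pipe, IEnumerable<byte[]> chunks)
    {
        foreach (byte[] chunk in chunks)
        {
            pipe.Add(chunk);
        }
        pipe.CompleteAdding();
    }

    // Consumer side: drain until the producer has completed and the pipe is empty.
    public static int CountBytes(BlockingCollection<byte[]> pipe)
    {
        int total = 0;
        foreach (byte[] chunk in pipe.GetConsumingEnumerable())
        {
            total += chunk.Length;
        }
        return total;
    }
}
```

One design difference worth noting: CompleteAdding gives the consumer an explicit end-of-data signal, whereas a stream-based pipe that is merely empty cannot distinguish "nothing yet" from "nothing more".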
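interface IPipelineElement
{
    // Sets the stream this element reads its input from.
    void SetInput(PipeStream inputStream);
    // Connects this element's output to the input of the next element.
    void Connect(IPipelineElement next);
    // Performs one step of work; called repeatedly by the pipeline.
    void Process();
    // True once this element has no more work to do.
    bool IsComplete { get; }
}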
Pipeline
This Pipeline class implements a sequential pipeline. All the pipeline elements are passive with a single input/output.
class Pipeline
{
    List<IPipelineElement> pipeline = new List<IPipelineElement>();

    // Appends an element and connects the previous element's output to it.
    public void Add(IPipelineElement anElement)
    {
        pipeline.Add(anElement);
        if (pipeline.Count > 1)
            pipeline[pipeline.Count - 2].Connect(pipeline[pipeline.Count - 1]);
    }

    // Polls every element in order until all of them report completion
    // within the same pass.
    public void Run()
    {
        bool jobCompleted = false;
        while (!jobCompleted)
        {
            jobCompleted = true;
            for (int i = 0; i < pipeline.Count; i++)
            {
                pipeline[i].Process();
                jobCompleted = jobCompleted && pipeline[i].IsComplete;
            }
        }
    }
}
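The polling loop in Run can be traced with a small self-contained sketch. This is an illustration only: IStep, Source, Summer and RunLoopSketch are hypothetical stand-ins, and a Queue<int> takes the place of the PipeStream so that the example needs no external code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a passive element; a Queue<int> acts as the pipe.
interface IStep
{
    void Process();
    bool IsComplete { get; }
}

// Emits the numbers 1..count, one per Process call.
class Source : IStep
{
    readonly Queue<int> output;
    readonly int count;
    int next = 1;
    public Source(Queue<int> output, int count) { this.output = output; this.count = count; }
    public bool IsComplete { get { return next > count; } }
    public void Process() { if (!IsComplete) output.Enqueue(next++); }
}

// Drains its input on every call and accumulates the sum.
class Summer : IStep
{
    readonly Queue<int> input;
    public int Sum { get; private set; }
    public Summer(Queue<int> input) { this.input = input; }
    public bool IsComplete { get { return input.Count == 0; } }
    public void Process() { while (input.Count > 0) Sum += input.Dequeue(); }
}

static class RunLoopSketch
{
    // Same shape as Pipeline.Run: poll every element in order until all of
    // them report completion in the same pass. Returns the number of passes.
    public static int Passes(IList<IStep> steps)
    {
        int passes = 0;
        bool done = false;
        while (!done)
        {
            done = true;
            passes++;
            foreach (IStep s in steps)
            {
                s.Process();
                done = done && s.IsComplete;
            }
        }
        return passes;
    }
}
```

With a Source that emits three numbers followed by a Summer on the same queue, Passes returns 3 and Summer.Sum is 6. The downstream element reports completion after every pass in which it drained its input, so the loop ends only when the upstream element is finished as well.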
I have implemented 3 pipeline elements. The code is straightforward, so I am just describing what each one does.
- FileReader - This element reads the content of a file and writes it to the output. The file path is provided as a command line argument.
- LinesCounter - This element reads the content of the input, counts the number of lines and writes the count to the output at the end.
- ConsoleWriter - This element takes the content of the input, converts it to a string using UTF-8 and writes it to the console.
class FileReader : IPipelineElement
{
    string filePath;
    FileStream fileStream;
    byte[] buffer = new byte[1024];
    PipeStream outputStream = new PipeStreamPkg.PipeStream();

    public FileReader()
    {
        IsComplete = false;
        // args[0] is the program itself; args[1] is the file path.
        string[] args = Environment.GetCommandLineArgs();
        filePath = args[1];
        // Open read-only so that read-only files can be processed too.
        fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read);
    }

    public void SetInput(PipeStream inputStream)
    {
        throw new InvalidOperationException("No input for this element");
    }

    public void Connect(IPipelineElement next)
    {
        next.SetInput(outputStream);
    }

    // Copies at most one buffer of file data to the output per call.
    public void Process()
    {
        if (fileStream == null)
        {
            return;
        }
        int bytesRead = fileStream.Read(buffer, 0, buffer.Length);
        IsComplete = bytesRead <= 0;
        if (bytesRead > 0)
        {
            outputStream.Write(buffer, 0, bytesRead);
        }
        else
        {
            // End of file: release the handle.
            fileStream.Dispose();
            fileStream = null;
        }
    }

    public bool IsComplete { get; private set; }
}
class LinesCounter : IPipelineElement
{
    PipeStreamPkg.PipeStream input;
    PipeStreamPkg.PipeStream output = new PipeStreamPkg.PipeStream();
    int numberOfLine = 0;

    public LinesCounter()
    {
        IsComplete = false;
    }

    public void SetInput(PipeStreamPkg.PipeStream inputStream)
    {
        input = inputStream;
    }

    public void Connect(IPipelineElement next)
    {
        next.SetInput(output);
    }

    public void Process()
    {
        // Guard so that the result is written only once.
        if (IsComplete)
        {
            return;
        }
        // An empty input is treated as end of data. This holds only because
        // the upstream element always runs first in the sequential pipeline.
        if (input.Length <= 0)
        {
            IsComplete = true;
            byte[] strbuf = new UTF8Encoding().GetBytes
                (string.Format("Number of lines = {0}", numberOfLine));
            output.Write(strbuf, 0, strbuf.Length);
            return;
        }
        byte[] buffer = new byte[1024];
        int bytesRead = 0;
        // Drain everything currently available and count the newlines.
        while (input.Length > 0)
        {
            bytesRead = input.Read(buffer, 0,
                (buffer.Length < input.Length) ? buffer.Length : (int)input.Length);
            for (int i = 0; i < bytesRead; i++)
            {
                if (buffer[i] == '\n')
                {
                    numberOfLine++;
                }
            }
        }
    }

    public bool IsComplete { get; private set; }
}
class ConsoleWriter : IPipelineElement
{
    PipeStream inputStream;

    public ConsoleWriter()
    {
        IsComplete = false;
    }

    public void SetInput(PipeStream input)
    {
        inputStream = input;
    }

    public void Connect(IPipelineElement next)
    {
        throw new InvalidOperationException("No output from this element");
    }

    public void Process()
    {
        // An empty input is treated as end of data.
        if (inputStream.Length <= 0)
        {
            IsComplete = true;
            return;
        }
        byte[] buffer = new byte[1024];
        int bytesRead = 0;
        UTF8Encoding temp = new UTF8Encoding(true);
        while (inputStream.Length > 0)
        {
            bytesRead = inputStream.Read(buffer, 0,
                (buffer.Length < inputStream.Length) ?
                buffer.Length : (int)inputStream.Length);
            // Write, not WriteLine: WriteLine would insert an extra line
            // break after every 1024-byte chunk. Note that a multi-byte
            // UTF-8 character split across two chunks is still decoded wrongly.
            Console.Write(temp.GetString(buffer, 0, bytesRead));
        }
    }

    public bool IsComplete { get; private set; }
}
Now, out of these 3 elements, I constructed 2 pipelines:
- Dump the file content to the console.
- Count the lines and dump that to console.
static void DumpFile()
{
    Pipeline p = new Pipeline();
    p.Add(new FileReader());
    p.Add(new ConsoleWriter());
    p.Run();
}

static void CountLines()
{
    Pipeline p = new Pipeline();
    p.Add(new FileReader());
    p.Add(new LinesCounter());
    p.Add(new ConsoleWriter());
    p.Run();
}
Points of Interest
During implementation, many variations are possible depending on the need. For example:
- The pipeline element can be an active element with its own thread for processing. This way, each element processes the data on its own; no external stimulus is needed.
- The pipeline element can take multiple inputs and produce multiple outputs. This is useful in multimedia applications, where the audio and video have to be processed in parallel and played in synchronization.
- The pipeline can be provided to the program as configuration, or constructed on the fly based on the need.
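The first variation, active elements, could be sketched as follows. This is not the article's implementation: it assumes .NET 4.5 or later for Task.Run, uses a bounded BlockingCollection<byte[]> as the pipe instead of PipeStream, and the names ActivePipelineSketch and CountLines are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical illustration: two active elements, each on its own task.
static class ActivePipelineSketch
{
    // Counts '\n' bytes in data. chunkSize must be greater than zero.
    public static int CountLines(byte[] data, int chunkSize)
    {
        // A bounded pipe makes the reader wait when the counter falls behind.
        using (var pipe = new BlockingCollection<byte[]>(4))
        {
            // Active element 1: splits the data into chunks and pushes them.
            Task reader = Task.Run(() =>
            {
                try
                {
                    for (int offset = 0; offset < data.Length; offset += chunkSize)
                    {
                        int len = Math.Min(chunkSize, data.Length - offset);
                        byte[] chunk = new byte[len];
                        Array.Copy(data, offset, chunk, 0, len);
                        pipe.Add(chunk);
                    }
                }
                finally
                {
                    // Signal end of data even if the reader fails.
                    pipe.CompleteAdding();
                }
            });

            // Active element 2: consumes chunks until the pipe is completed.
            Task<int> counter = Task.Run(() =>
            {
                int lines = 0;
                foreach (byte[] chunk in pipe.GetConsumingEnumerable())
                {
                    foreach (byte b in chunk)
                    {
                        if (b == (byte)'\n') lines++;
                    }
                }
                return lines;
            });

            reader.Wait();
            return counter.Result;
        }
    }
}
```

Here no outer loop drives the elements: each task blocks on the pipe when it has nothing to do, and the CompleteAdding call replaces the IsComplete polling of the passive version. For the six ASCII bytes of "a\nb\nc\n" with a chunk size of 2, CountLines returns 3.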
I chose the simplest combination for this article: a passive, single input/output, static pipeline.
History
- 17th November, 2014 - Initial publication
- 18th November, 2014 - Somehow, my source code zip was not visible in the article. So, I added the remaining source code.
- 20th November, 2014 - Included the PipeStream.cs in the project. In the original version, it was only linked from another project.