Generic Pipeline

23 Jan 2014
Implementations of a computing pipeline, with design explanation and code samples

Introduction  

This article shows an implementation of a threading pipeline in C#. A pipeline can help optimize processes that have a “bottleneck”. I first used it to speed up a 3-step process: the 1st step (the “bottleneck”) gets data from web pages, the 2nd parses the HTML code to find keywords, and the 3rd performs some analysis. Originally, my project used 3 Task objects (one for each step) with buffers and synchronization; I later turned that into a reusable library.

Basics

A very simple explanation of the pipeline concept can be found on Wikipedia:

A pipeline is a set of data processing elements (stages) connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion; in that case, some amount of buffer storage is often inserted between elements.

The main advantage of a pipeline is increased system throughput, achieved by performing multiple operations in parallel when processing a stream of data.

If a pipeline consists of k stages (k>1), which take t1, t2, … tk ticks to process 1 item, then let T = MAX(t1, t2, … tk). Given N input values, the pipeline will take T*(N-1) + t1 + t2 +...+ tk ticks to produce results for all of them.
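To make the formula concrete, here is a minimal sketch that evaluates it and compares it with running every item through all stages one after another. The class and method names (PipelineEstimate, PipelinedTicks, SequentialTicks) are hypothetical and are not part of the library described in this article; the sketch also assumes constant stage times and buffers that never fill up.

```csharp
using System;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class PipelineEstimate
{
    // Total ticks for a k-stage pipeline processing n items:
    // T*(n-1) + t1 + ... + tk, where T is the slowest stage.
    public static long PipelinedTicks(int[] stageTicks, int n)
    {
        if (stageTicks == null || stageTicks.Length == 0 || n <= 0) return 0;
        long slowest = stageTicks.Max();
        long sum = stageTicks.Sum(t => (long)t);
        return slowest * (n - 1) + sum;
    }

    // Ticks needed when each item passes through all stages
    // before the next item is started (no overlap at all).
    public static long SequentialTicks(int[] stageTicks, int n)
    {
        if (stageTicks == null || n <= 0) return 0;
        return stageTicks.Sum(t => (long)t) * n;
    }
}
```

For example, with made-up stage times of 5, 2 and 3 ticks and 10 items, PipelinedTicks gives 5*9 + 10 = 55 ticks, while SequentialTicks gives 10*10 = 100 ticks.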

Class diagram and design

At first glance, the main class Pipeline has something in common with the .NET class Tuple: it comes in variants with different numbers of type parameters.

An N-stage Pipeline has N+1 type parameters and N private fields, one per stage (PipelineStageBase _stage1, _stage2 … _stageN), which can either be created in the constructor or set by the MountStageX() methods.

A pipeline is an organizer of work: it invokes stage initialization, starts stage execution and provides feedback to clients. Pipeline stages (the PipelineStageBase<TIn,TOut> class) do the rest. Let's look at them closely.

The Boolean properties CanBeStarted, IsRunning, IsCompleted, IsCancelled and HasResults describe the current status of a stage. A stage receives a TIn item from PreviousStage, produces a TOut item for the next stage and puts this intermediate result in a buffer (the Results property). The last stage of a pipeline (IsLastStage = true) keeps the final results.

When a pipeline starts, it invokes the Initialize() and Run() methods of each stage. Remember: the pipeline runs stages in parallel and creates a Task for each one! Stage initialization basically means that the IsCompleted, IsCancelled and IsLastStage properties are set to false. The overloads of Run accept an IEnumerable<TIn> parameter for the first stage or an IProducer<TIn> parameter for the others, plus an optional CancellationToken parameter to support cancellation.

A stage performs its operations in the Run() method for as long as the previous stage has results or there are input values left. When a result is produced, the stage reports success (StageCompleted method) or an Exception (StageFailed method). If the pipeline is stopped by the client, the working stages report cancellation (StageCancelled method).
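The general idea of two stages running in parallel, each in its own Task and connected by a buffer, can be sketched as follows. This is not the library's code: it swaps in the standard BlockingCollection<T> as the buffer, the name TwoStageSketch is hypothetical, and per-item success and failure reporting is left out for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch only; not the PipelineStageBase implementation.
public static class TwoStageSketch
{
    // Runs stage1 and stage2 in parallel; 'buffer' carries intermediate results.
    // If the token is cancelled, Task.WaitAll throws an AggregateException.
    public static List<TOut> Run<TIn, TMid, TOut>(
        IEnumerable<TIn> input,
        Func<TIn, TMid> stage1,
        Func<TMid, TOut> stage2,
        CancellationToken token = default(CancellationToken))
    {
        var results = new List<TOut>();
        using (var buffer = new BlockingCollection<TMid>())
        {
            // First stage: reads the input values and fills the buffer.
            var first = Task.Run(() =>
            {
                try
                {
                    foreach (var item in input)
                    {
                        token.ThrowIfCancellationRequested();
                        buffer.Add(stage1(item), token);
                    }
                }
                finally
                {
                    // Tell the second stage that no more items will arrive.
                    buffer.CompleteAdding();
                }
            });

            // Second stage: works while the buffer has or may still get items.
            var second = Task.Run(() =>
            {
                foreach (var mid in buffer.GetConsumingEnumerable(token))
                    results.Add(stage2(mid));
            });

            Task.WaitAll(first, second);
        }
        return results;
    }
}
```

A call such as TwoStageSketch.Run(new[] { "a", "bb", "ccc" }, s => s.Length, n => n * 2) passes each string through both functions while the two stages overlap in time.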

Stages pass results using an ItemProgress object as a container. ItemProgress.CurrentValue is the latest produced result, and the ItemProgress.Results property contains the results of all previous stages. The last stage reports this object to the pipeline. The pipeline's State property helps clients monitor the process.

Pipeline itself inherits from PipelineStageBase, which is the key to building a long main pipeline by mounting other pipeline objects in place of stages. The Pipeline class overrides the IsRunning, IsCompleted, IsCancelled and Results properties, taking their values from the last stage. It also overrides the Initialize() and Run() methods to start the stages correctly.

Using the code

The usage pattern is simple:

  • mount all pipeline stages
  • start the pipeline
  • [optional – do monitoring from time to time]
  • listen to the Completed event

private void CreatePipeline()
{
    _demoPipeline=new Pipeline<string,int,double,bool>();
    _demoPipeline.MountStage1(IndexOfA);
    _demoPipeline.MountStage2(Sqrt);
    _demoPipeline.MountStage3(IsCorrect);
    _demoPipeline.Completed+=DemoPipelineOnCompleted;
}

private int IndexOfA(string str)
{
    Thread.Sleep(Interval1);
    return Math.Max(str.IndexOf('a'),0);
}
 
private double Sqrt(int i)
{
    if(i>28)
        throw new IndexOutOfRangeException("fail test");
    Thread.Sleep(Interval2);
    return Math.Sqrt(i);
}
 
private bool IsCorrect(double d)
{
    Thread.Sleep(Interval3);
    return d>3;
}

private int Interval1 {get{return (int)nudDuration1.Value*1000;}}
private int Interval2 {get{return (int)nudDuration2.Value*1000;}}
private int Interval3 {get{return (int)nudDuration3.Value*1000;}}

private void StartPipelineClick(object sender, EventArgs e)
{
    // generate random input strings for the demo
    var items=new List<String>();
    for(int i=0;i<ItemsCount;i++)
        items.Add(Guid.NewGuid().ToString());
 
    timerState.Start();
    _cts=new CancellationTokenSource();
    _demoPipeline.Start(items,_cts.Token);
}

private void DemoPipelineOnCompleted(object sender,CancelEventArgs e)
{
    timerState.Stop();
    DisplayPipelineState();
}

private void DisplayPipelineState()
{
    int t;
    for(int i=0;i<_demoPipeline.State.Results.Count;i++)
    {
        var status=_demoPipeline.State.Results[i];
        for(t=0;t<status.Results.Count;t++)
        {
            // display results
        }
        if(status.Error!=null)
        {
            // display error
        }
        else if(status.Cancelled)
        {
            // display cancellation
        }
    }
}  

Demo

Set the item count for the pipeline and the timeout for each stage, then click the Start button.

[Image: DemoSettings]

It's possible to monitor the available results.

Time Measurement

The demo uses a 2-stage pipeline with an inner 2-stage pipeline. As a test, I set ItemCount to 1000 and all intervals to 100 milliseconds. The estimated time was 100 sec, and on a 4-core processor the total time was 101.4 sec, so the latency was less than 1.5 sec (1.5%).

Brave New Stage

This part of the article appeared thanks to:

1. One-T and his design suggestions  

2. Paulo Zemek and his motivating questions and stage buffer idea

The improved implementation requires only 1 generic interface (IStage<TIn>) and 1 generic class (Stage<TIn, TOut> : IStage<TIn>). The principal difference from Pipeline is in the connections between stages: each stage keeps a reference to the next stage of the pipeline (the IStage<TOut> Next property) and doesn't have a link to the previous one.

When the first stage of a pipeline starts, it initializes the second stage by invoking its Init() method. The second stage initializes the third stage, and so on. Initialization means that each stage sets its IsRunning property to true, creates a buffer for input values (Queue<TIn>), disables modification of its state (function, buffer capacity and link to the next stage) and starts a new Task to perform its operation in parallel.

When a stage produces an item, it has to send the item to the next stage using the Next.Enqueue() method. The item is accepted only if the number of items currently in the buffer is less than the BufferCapacity property, so each stage has to wait until its produced item is enqueued. The last stage of a pipeline puts its results in a StageContext object so that the client can access them.

When a stage has finished with all input values, it calls Next.SendTerminationSignal() to allow the next stage to stop after it has processed all items in its buffer.
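The buffering and termination protocol described above could look roughly like the sketch below. It is an assumption about one possible design, not the code of Stage<TIn, TOut>: the class name BoundedStageBuffer and the TryDequeue method are hypothetical, and only Enqueue, SendTerminationSignal and BufferCapacity are names taken from the article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded input buffer for a single stage.
public class BoundedStageBuffer<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _sync = new object();
    private bool _terminated;

    public BoundedStageBuffer(int bufferCapacity)
    {
        if (bufferCapacity < 1)
            throw new ArgumentOutOfRangeException("bufferCapacity");
        BufferCapacity = bufferCapacity;
    }

    public int BufferCapacity { get; private set; }

    // Called by the previous stage; blocks while the buffer is full.
    public void Enqueue(T item)
    {
        lock (_sync)
        {
            while (_queue.Count >= BufferCapacity)
                Monitor.Wait(_sync);
            _queue.Enqueue(item);
            Monitor.PulseAll(_sync);
        }
    }

    // Called by the previous stage after it has sent its last item.
    public void SendTerminationSignal()
    {
        lock (_sync)
        {
            _terminated = true;
            Monitor.PulseAll(_sync);
        }
    }

    // Called by the owning stage; returns false once the buffer is drained
    // and the termination signal has been received.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            while (_queue.Count == 0 && !_terminated)
                Monitor.Wait(_sync);
            if (_queue.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _queue.Dequeue();
            Monitor.PulseAll(_sync); // wake a producer waiting for free space
            return true;
        }
    }
}
```

A stage built on such a buffer would loop on TryDequeue, apply its function to each item, pass the result to the next stage's Enqueue, and call the next stage's SendTerminationSignal once TryDequeue returns false.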

Usage of code:

public override void Start(IEnumerable<string> args)
{
      // build the chain of stages: string -> int -> double -> bool
      _first = new Stage<string, int>(IndexOfA);
      _first.Add(Sqrt).Add(IsCorrect);

      var _ctx = new StageContext();
      _ctx.Completed += (sender, e) => OnCompleted(e);
      _cts = new CancellationTokenSource();
      _ctx.Token = _cts.Token;
      _ctx.Results.Clear();
      _first.Start(args, _ctx);
}

Less code, same performance 

History 

Original post (Pipeline implementation): December 27, 2013

Update (Stage implementation): January 21, 2014

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
