Introduction
This article shows an implementation of a threading pipeline in C#. A pipeline can help to optimize a process that has a “bottleneck”. I first used one to speed up a 3-step process: the 1st step (the “bottleneck”) gets data from web pages, the 2nd parses the HTML code to find keywords, and the 3rd performs some analysis. Originally my project used 3 Task objects (one for each step) with buffers and synchronization; later I turned that code into a reusable library.
Basics
A very simple explanation of the pipeline concept can be found on Wikipedia:
A pipeline is a set of data processing elements (stages) connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion; in that case, some amount of buffer storage is often inserted between elements.
The main advantage of a pipeline is an increase in system throughput, achieved by performing multiple operations in parallel when processing a stream of data.
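If a pipeline consists of k stages (k > 1), which take t1, t2, … tk ticks to process 1 item, then let T = MAX(t1, t2, … tk). If we have N input values, it will take T*(N-1) + t1 + t2 + ... + tk ticks for the pipeline to produce results for all input values: the first result is ready after t1 + t2 + ... + tk ticks, and each of the remaining N-1 results follows T ticks after the previous one, because the slowest stage sets the pace.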
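The formula above can be checked with a small helper. This is only a sketch for illustration: the class and method names (PipelineEstimate, TotalTicks) are hypothetical and are not part of the library described in this article.

```csharp
using System;
using System.Linq;

// Hypothetical helper, not part of the pipeline library.
public static class PipelineEstimate
{
    // Returns T*(N-1) + t1 + ... + tk, where T is the time of the slowest stage.
    public static long TotalTicks(int itemCount, params long[] stageTicks)
    {
        if (itemCount <= 0 || stageTicks.Length == 0)
            return 0;

        long slowest = stageTicks.Max();   // T = MAX(t1, ..., tk)
        return slowest * (itemCount - 1) + stageTicks.Sum();
    }
}

// PipelineEstimate.TotalTicks(10, 3, 1, 2) returns 33,
// while processing the same 10 items one after another would need 10 * (3 + 1 + 2) = 60 ticks.
```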
Class diagram and design
At first glance the main class Pipeline has something in common with the .NET class Tuple, because it comes in variants with different numbers of type parameters. An N-stage Pipeline has N+1 type parameters and N private fields, one for each stage (PipelineStageBase _stage1, _stage2 … _stageN), which can either be created in the constructor or set by the MountStageX() methods.
A pipeline is an organizer of work: it invokes stage initialization, starts stage execution and provides feedback for clients. Pipeline stages (the PipelineStageBase<TIn,TOut> class) do the rest. Let's look at them closely.
The Boolean properties CanBeStarted, IsRunning, IsCompleted, IsCancelled and HasResults describe the current status of a stage. A stage receives a TIn item from PreviousStage, produces a TOut item for the next stage and puts this intermediate result in a buffer (the Results property). The last stage of a pipeline (IsLastStage = true) keeps the final results.
When a pipeline starts, it invokes the Initialize() and Run() methods of each stage. Remember: the pipeline runs stages in parallel and creates a Task for each one! Stage initialization basically means that the IsCompleted, IsCancelled and IsLastStage properties are set to false. The overloads of the Run method accept an IEnumerable<TIn> parameter for the first stage or an IProducer<TIn> parameter for the others, plus an optional CancellationToken parameter to support cancellation.
Stage operations are performed in the Run() method while the previous stage has results or there are input values. When an item has been processed, the stage reports either success (the StageCompleted method) or an exception (the StageFailed method). If the pipeline was stopped by the client, working stages report cancellation (the StageCancelled method).
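The three possible reports for an item can be pictured with the simplified loop below. It is a sketch under assumptions, not the code of PipelineStageBase: the Outcome enum and the StageLoopSketch class are hypothetical, and the loop runs on the calling thread instead of inside a Task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical per-item outcome; the library reports these cases through
// StageCompleted, StageFailed and StageCancelled instead.
public enum Outcome { Completed, Failed, Cancelled }

public static class StageLoopSketch
{
    // Applies 'operation' to every input and records one outcome per item.
    public static List<Outcome> Process<TIn, TOut>(
        IEnumerable<TIn> inputs, Func<TIn, TOut> operation, CancellationToken token)
    {
        var outcomes = new List<Outcome>();
        foreach (var input in inputs)
        {
            if (token.IsCancellationRequested)
            {
                outcomes.Add(Outcome.Cancelled);   // client stopped the pipeline
                continue;
            }
            try
            {
                operation(input);
                outcomes.Add(Outcome.Completed);   // a result was produced
            }
            catch (Exception)
            {
                outcomes.Add(Outcome.Failed);      // the operation threw for this item
            }
        }
        return outcomes;
    }
}

// With an operation that throws for values above 28, the inputs { 4, 30, 9 }
// give Completed, Failed, Completed: one failed item does not stop the others.
```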
Stages pass results using an ItemProgress object as a container. ItemProgress.CurrentValue is the latest produced result, and the ItemProgress.Results property contains the results of all previous stages. The last stage of the pipeline reports this object to the pipeline. The pipeline's State property helps clients to monitor the process.
Pipeline itself inherits from PipelineStageBase, and this is the key to building a long main pipeline by mounting other pipeline objects in place of stages. The Pipeline class overrides the IsRunning, IsCompleted, IsCancelled and Results properties, taking their values from the last stage. It also overrides the Initialize() and Run() methods to start its stages correctly.
Using the code
The usage recipe is simple:
- mount all pipeline stages
- start the pipeline
- (optional) monitor progress from time to time
- listen to the Completed event
private void CreatePipeline()
{
    // Three stages: string -> int -> double -> bool
    _demoPipeline = new Pipeline<string, int, double, bool>();
    _demoPipeline.MountStage1(IndexOfA);
    _demoPipeline.MountStage2(Sqrt);
    _demoPipeline.MountStage3(IsCorrect);
    _demoPipeline.Completed += DemoPipelineOnCompleted;
}

// Stage 1: position of the first 'a' in the string (0 if there is none)
private int IndexOfA(string str)
{
    Thread.Sleep(Interval1);
    return Math.Max(str.IndexOf('a'), 0);
}

// Stage 2: fails on purpose for large values to demonstrate error reporting
private double Sqrt(int i)
{
    if (i > 28)
        throw new IndexOutOfRangeException("fail test");
    Thread.Sleep(Interval2);
    return Math.Sqrt(i);
}

// Stage 3: final check
private bool IsCorrect(double d)
{
    Thread.Sleep(Interval3);
    return d > 3;
}

// Stage delays in milliseconds; multiply before casting so that
// fractional values of the numeric controls are not truncated to 0
private int Interval1 { get { return (int)(nudDuration1.Value * 1000); } }
private int Interval2 { get { return (int)(nudDuration2.Value * 1000); } }
private int Interval3 { get { return (int)(nudDuration3.Value * 1000); } }

private void StartPipelineClick(object sender, EventArgs e)
{
    // Random input strings
    var items = new List<string>();
    for (int i = 0; i < ItemsCount; i++)
        items.Add(Guid.NewGuid().ToString());

    timerState.Start();
    _cts = new CancellationTokenSource();
    _demoPipeline.Start(items, _cts.Token);
}

private void DemoPipelineOnCompleted(object sender, CancelEventArgs e)
{
    timerState.Stop();
    DisplayPipelineState();
}

private void DisplayPipelineState()
{
    for (int i = 0; i < _demoPipeline.State.Results.Count; i++)
    {
        var status = _demoPipeline.State.Results[i];
        for (int t = 0; t < status.Results.Count; t++)
        {
            // result of stage t for item i (display code omitted)
        }
        if (status.Error != null)
        {
            // the item failed (display code omitted)
        }
        else if (status.Cancelled)
        {
            // the item was cancelled (display code omitted)
        }
    }
}
Demo
Set the item count for the pipeline and the timeout for each stage, then click the Start button. The results available so far can be monitored in the demo window.
Time Measurement
The demo uses a 2-stage pipeline that contains an inner 2-stage pipeline. I ran a test with ItemCount set to 1000 and all intervals set to 100 milliseconds. The estimated time was about 100 seconds, and on a 4-core processor the measured total time was 101.4 seconds, so the overhead was less than 1.5 seconds (1.5%).
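As a rough check of that estimate, assume the demo amounts to three sequential stages of 0.1 s each (an assumption based on the outer 2-stage pipeline containing an inner 2-stage one). The formula from the Basics section then gives:

```latex
\[
T(N-1) + t_1 + t_2 + t_3 = 0.1 \cdot 999 + 3 \cdot 0.1 = 100.2\ \text{s},
\qquad
101.4 - 100.2 = 1.2\ \text{s} \approx 1.2\%\ \text{overhead}
\]
```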
Brave New Stage
This part of the article appeared thanks to:
1. One-T and his design suggestions
2. Paulo Zemek and his motivating questions and stage buffer idea
The improved implementation requires only 1 generic interface (IStage<TIn>) and 1 generic class (Stage<TIn, TOut> : IStage<TIn>). The principal difference from Pipeline is in the connections between stages: a stage keeps a reference to the next stage of the pipeline (the IStage<TOut> Next property) and doesn't have a link to the previous one.
When the first stage of a pipeline starts, it initializes the second stage by invoking its Init() method. The second stage initializes the third stage, and so on. Initialization means that each stage sets its IsRunning property to true, creates a buffer for input values (Queue<TIn>), disables modification of its state (function, buffer capacity and link to the next stage) and starts a new Task to perform its operation in parallel.
When a stage produces an item, it has to send this item to the next stage using the Next.Enqueue() method. The item is accepted only if the number of items currently in the buffer is less than the BufferCapacity property, so each stage has to wait until its produced item is enqueued. The last stage of a pipeline puts its results in a StageContext object so that the client has access to them.
When a stage has finished with all input values, it calls the Next.SendTerminationSignal() method to allow the next stage to stop after processing all items in its buffer.
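The chaining described above can be sketched as follows. This is not the article's Stage<TIn, TOut> class: all names are hypothetical, and a bounded BlockingCollection<T> is swapped in for the Queue<TIn> buffer, so that Add() provides the waiting on a full buffer and CompleteAdding() plays the role of the termination signal.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface and class, for illustration only.
public interface ISketchStage<TIn>
{
    void Init();
    void Enqueue(TIn item);
    void SendTerminationSignal();
}

public sealed class SketchStage<TIn, TOut> : ISketchStage<TIn>
{
    private readonly Func<TIn, TOut> _operation;
    private readonly BlockingCollection<TIn> _buffer;

    public ISketchStage<TOut> Next { get; set; }               // link to the next stage only
    public List<TOut> Results { get; } = new List<TOut>();     // filled only by the last stage
    public Task Completion { get; private set; }

    public SketchStage(Func<TIn, TOut> operation, int bufferCapacity = 4)
    {
        _operation = operation;
        _buffer = new BlockingCollection<TIn>(bufferCapacity);
    }

    // Initializes the rest of the chain, then starts this stage's own worker task.
    public void Init()
    {
        Next?.Init();
        Completion = Task.Run(() => Work());
    }

    public void Enqueue(TIn item) => _buffer.Add(item);                // blocks while the buffer is full
    public void SendTerminationSignal() => _buffer.CompleteAdding();   // no more input will arrive

    private void Work()
    {
        try
        {
            // Ends once the buffer is empty and the termination signal was received.
            foreach (var item in _buffer.GetConsumingEnumerable())
            {
                var result = _operation(item);
                if (Next != null) Next.Enqueue(result);
                else Results.Add(result);
            }
        }
        finally
        {
            Next?.SendTerminationSignal();   // always let the next stage finish
        }
    }
}

public static class SketchDemo
{
    // Two chained stages: string -> int -> bool.
    public static List<bool> Run()
    {
        var first = new SketchStage<string, int>(s => s.Length);
        var last = new SketchStage<int, bool>(n => n > 3);
        first.Next = last;
        first.Init();
        foreach (var s in new[] { "ab", "abcde" }) first.Enqueue(s);
        first.SendTerminationSignal();
        last.Completion.Wait();
        return last.Results;   // false, true
    }
}
```

Because each stage only knows its successor, adding a stage means setting one more Next link; no central object has to know the whole chain.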
Usage:
public override void Start(IEnumerable<string> args)
{
    // Chain the three stages: string -> int -> double -> bool
    _first = new Stage<string, int>(IndexOfA);
    _first.Add(Sqrt).Add(IsCorrect);

    // The context gives the client the results, the Completed event and cancellation
    var _ctx = new StageContext();
    _ctx.Completed += (sender, e) => OnCompleted(e);
    _cts = new CancellationTokenSource();
    _ctx.Token = _cts.Token;
    _ctx.Results.Clear();

    _first.Start(args, _ctx);
}
Less code, same performance.
History
Original post (Pipeline implementation): December 27, 2013
Update (Stage implementation): January 21, 2014