<< Pipes, River, Rails and Binder Patterns - Part II
The Beginning
Finally, I am able to write the third (and last) part of this series. Frankly speaking, when I started working on this article, I wanted to publish only the Pipes design as a tip/trick; however, by the time I could finish it, I decided to include the other implementations as well (River & Binder). But, again, I failed to include the other patterns in the article as I could never find time to finish it. So I decided to break it into three parts and also add Rails. That was, so far, my boring story behind this series. Anyway, let's quickly review what we saw in Parts I & II.
- Pipes: A simple design based on "out-in" members. It also requires Action<TInput, TOutput> delegate(s) as the final outcome handler. (See: Pipes, River, Rails and Binder Patterns - Part I)
- River: An improvement over Pipes which obviates the need for the "out-in" relationship. The pipeline consists only of Action<TInput, TOutput> delegates, and the input/output pair flows through the whole pipeline; hence, no final outcome handler is needed. (See: Pipes, River, Rails and Binder Patterns - Part II)
- Rails: Similar to River, but requires an interface implementation. The input data is a class that implements the required interface, where the interface represents the pipeline logic. The advantage of this implementation is that multiple input types can be processed by the same pipeline. (See: Pipes, River, Rails and Binder Patterns - Part II)
- Binder: An extension of River which decouples the pipeline members and provides a means to allocate the desired worker threads to each pipeline member. And this part is all about it.
As we construct a pipeline, there are times we want to allocate some threads to a member function (for structural, tactical, performance, or some other weird reasons). Thus, I propose to you the design of Binder. If you have followed the series, you will find the usage of this implementation familiar, i.e.:
- Create the pipeline.
- Call StartProcessing.
- Add values to the pipeline using AddValue.
- Call StopProcessing.
Fourth Pattern: Binder
During the implementation of River, we saw how to construct a pipeline in which input and output can flow together. Now, we will extend this idea so that we have the liberty to independently assign worker threads to pipeline member functions. In order to gain this control, I decided to create a dedicated pipeline instance per member function and bind all those instances together in a chain, without burdening the user of the code with managing those instances, while providing the user a simple and familiar (same as the other three implementations) usage structure.
Let's first look at the code of the Binder class:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public class Binder<TInput, TOutput>
        where TOutput : class, new()
    {
        private BlockingCollection<DataWrapper> _dataCollection = null;
        private ManualResetEvent _completionEvent;
        private CancellationTokenSource _cancelTokenSource;
        private Binder<TInput, TOutput> _nextBind = null;
        private Action<TInput, TOutput> _currentAction;
        private int _maxConcurrency;
        private string _opCode;

        public Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder")
            : this(currFunc, maxConcurrency, opCode, new CancellationTokenSource())
        {
        }

        private Binder(Action<TInput, TOutput> currFunc,
                       int maxConcurrency = -1,
                       string opCode = "MyBinder",
                       CancellationTokenSource tokenSource = null)
        {
            _currentAction = currFunc;
            _maxConcurrency = maxConcurrency;
            _completionEvent = new ManualResetEvent(false);
            _dataCollection = new BlockingCollection<DataWrapper>();
            _opCode = opCode;
            _cancelTokenSource = tokenSource;
        }

        public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                            int maxConcurrency = -1,
                                            string opCode = "MyBinder1")
        {
            if (_nextBind == null)
            {
                _nextBind =
                    new Binder<TInput, TOutput>
                        (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
            }
            else
            {
                _nextBind.Bind(nextFunc, maxConcurrency, opCode);
            }
            return this;
        }

        public bool AddValue(TInput inputValue)
        {
            return AddValue(new DataWrapper
            {
                InputVal = inputValue,
                OutputVal = new TOutput()
            });
        }

        public void StopProcessing(bool waitForProcessing = false)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _completionEvent.WaitOne();
        }

        public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
        {
            if (_nextBind != null)
                _nextBind.StartProcessing(errorHandler);
            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelTokenSource.Token
            };
            Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                                     option,
                                     currData =>
                    {
                        try
                        {
                            option.CancellationToken
                                  .ThrowIfCancellationRequested();
                            _currentAction(currData.InputVal,
                                           currData.OutputVal);
                            if (_nextBind != null)
                                _nextBind.AddValue(currData);
                        }
                        catch (Exception e)
                        {
                            errorHandler(currData.InputVal,
                                         currData.OutputVal,
                                         "Error occurred inside " +
                                         _opCode + " pipeline.",
                                         e);
                        }
                    });
                }
                catch (OperationCanceledException)
                {
                }
                finally
                {
                    if (_nextBind != null)
                    {
                        _nextBind._dataCollection.CompleteAdding();
                        _nextBind._completionEvent.WaitOne();
                    }
                    _completionEvent.Set();
                }
            });
        }

        public void AbortProcessing()
        {
            _dataCollection.CompleteAdding();
            _cancelTokenSource.Cancel();
        }

        private bool AddValue(DataWrapper currData)
        {
            return _dataCollection.TryAdd(currData);
        }

        private class DataWrapper
        {
            internal TInput InputVal;
            internal TOutput OutputVal;
        }
    }
}
Binder Dissection
Now, to understand how it works, we first need to know that, in C#, privacy is class-level, i.e., the private members of an instance are accessible from anywhere inside that class, even from code working on a different instance. Let's take an example:
public class MyInput
{
    private int a = 1;

    public MyInput GetInstanceWithValue(int b)
    {
        var newInstance = new MyInput();
        newInstance.a = b; // legal: 'a' is private, but we are inside MyInput
        return newInstance;
    }
}
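To make the point concrete, here is a stand-alone sketch (the Counter class and its member names are invented purely for this demo): StealCount reads the private field of a *different* instance and still compiles, because the access happens inside the Counter class itself.

```csharp
using System;

public class Counter
{
    private int _count; // private, yet visible to ALL Counter instances

    public Counter(int start) { _count = start; }

    // 'other' is a different instance, but '_count' is still accessible
    // here because this code lives inside the Counter class.
    public int StealCount(Counter other)
    {
        return other._count;
    }
}

public static class Demo
{
    public static void Main()
    {
        var a = new Counter(5);
        var b = new Counter(42);
        Console.WriteLine(a.StealCount(b)); // prints 42
    }
}
```

The same access from outside the Counter class would be a compile-time error; privacy is per-class, not per-instance.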
Understanding Construction
Taking advantage of this class-level privacy feature, I can hold an instance of the Binder class privately and access all of its private members. And that is exactly what I am doing, BUT in a chain, like this (please follow the code comments):
public class Binder<TInput, TOutput>
    where TOutput : class, new()
{
    // Privately held reference to the next Binder in the chain.
    private Binder<TInput, TOutput> _nextBind = null;

    public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                        int maxConcurrency = -1,
                                        string opCode = "MyBinder1")
    {
        if (_nextBind == null)
        {
            // We are the tail: create a new Binder for nextFunc. Thanks to
            // class-level privacy, we can pass our private _cancelTokenSource
            // so that every pipe in the chain shares the same token source.
            _nextBind = new Binder<TInput, TOutput>
                (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
        }
        else
        {
            // Not the tail: recurse down the chain until its end is found.
            _nextBind.Bind(nextFunc, maxConcurrency, opCode);
        }
        // Always return the head instance so that calls can be chained fluently.
        return this;
    }
}
Understanding Processing
Once we have the pipeline constructed (as discussed above), our immediate next goal is to implement the StartProcessing method, which can consume such a chain of Binder instances. To achieve this, we have to keep a few things in mind:
- We must start the pipeline in reverse order (a bottom-up approach), i.e., each child pipe's GetConsumingEnumerable() loop must be initialized before the parent pipe can pass data to it. This ensures smooth data processing without any blockage. A top-down approach is also possible but, in that case, a child's BlockingCollection may start bloating.
- After processing each input/output pair, we need to ensure that the pair flows to the next pipe in the chain (as everything is async).
- Upon a CompleteAdding() call on a parent pipe, we must ensure that the child pipe has also finished processing all remaining items.
- An AbortProcessing() call must abort every pipe in the chain. This one is easy to achieve as we share the same CancellationTokenSource among all pipes.
Let me show you how the above-mentioned points have been integrated into the code (please follow the code comments):
public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
    // Point 1: bottom-up start. The child pipe's consuming loop is
    // initialized before this (parent) pipe starts feeding it.
    if (_nextBind != null)
        _nextBind.StartProcessing(errorHandler);
    var option = new ParallelOptions
    {
        // -1 means "unbounded" concurrency for this pipe.
        MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
        // Point 4: the token source is shared, so AbortProcessing()
        // on any pipe cancels the whole chain.
        CancellationToken = _cancelTokenSource.Token
    };
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                             option,
                             currData =>
            {
                try
                {
                    option.CancellationToken
                          .ThrowIfCancellationRequested();
                    _currentAction(currData.InputVal,
                                   currData.OutputVal);
                    // Point 2: hand the processed input/output pair
                    // over to the next pipe in the chain.
                    if (_nextBind != null)
                        _nextBind.AddValue(currData);
                }
                catch (Exception e)
                {
                    errorHandler(currData.InputVal,
                                 currData.OutputVal,
                                 "Error occurred inside " +
                                 _opCode + " pipeline.",
                                 e);
                }
            });
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {
            // Point 3: once this pipe has drained, tell the child that
            // no more items will arrive and wait until it finishes too.
            if (_nextBind != null)
            {
                _nextBind._dataCollection.CompleteAdding();
                _nextBind._completionEvent.WaitOne();
            }
            _completionEvent.Set();
        }
    });
}
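The engine of each pipe is the combination of Parallel.ForEach with BlockingCollection.GetConsumingEnumerable(). If that pairing is new to you, here is a minimal stand-alone sketch, independent of Binder (all names are illustrative), of a producer feeding a bounded-concurrency consumer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerDemo
{
    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

        // Consumer: at most 2 items are processed concurrently; the loop
        // blocks while the queue is empty and ends after CompleteAdding().
        var consumer = Task.Factory.StartNew(() =>
            Parallel.ForEach(queue.GetConsumingEnumerable(), options, item =>
            {
                Console.WriteLine("Processing " + item +
                    " on T:" + Thread.CurrentThread.ManagedThreadId);
            }));

        // Producer: add a few items, then signal "no more".
        for (int i = 0; i < 5; i++)
            queue.Add(i);
        queue.CompleteAdding();   // lets GetConsumingEnumerable() finish

        consumer.Wait();          // blocks until the queue is drained
    }
}
```

One subtlety worth knowing: the default partitioner used by Parallel.ForEach may buffer items in chunks before handing them to worker threads; when prompt, item-by-item consumption matters, the enumerable can be wrapped with Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering) (available since .NET 4.5).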
Other functions like AbortProcessing(), StopProcessing(), etc. are trivial, and I leave them for the readers to explore. With that, I am done with the fourth and last pattern of this three-part series.
Binder By Example
Let me show you a quick (trivial and boring) usage of the above pattern:
using System;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            GC.Collect();
            GC.WaitForPendingFinalizers();
            Console.WriteLine("###### DONE ######");
            Console.ReadLine();
        }

        static void SomeCode()
        {
            var myPipes = new Binder<MyInput, MyOutput>(One, 1, "One")
                              .Bind(Two, 1, "Two")
                              .Bind(Three, 2, "Three")
                              .Bind(Four, 1, "Four");
            myPipes.StartProcessing(HandleError);
            Parallel.For(0, 4, input => myPipes.AddValue(new MyInput()));
            myPipes.StopProcessing(true); // wait until the whole chain has drained
        }

        static void HandleError<T1, T2>(T1 a, T2 b, string code, Exception e)
        {
            Console.WriteLine(code + Environment.NewLine + e.Message);
        }

        static void One(MyInput a, MyOutput b)
        {
            Console.WriteLine("One on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }

        static void Two(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine("Two on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }

        static void Three(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(2000);
            Console.WriteLine("Three on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }

        static void Four(MyInput a, MyOutput b)
        {
            Console.WriteLine("Four (or LAST)");
            Console.WriteLine("A=" + a + ",B=" + b +
                ",On T:" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
    }

    public class MyInput
    {
    }

    public class MyOutput
    {
    }
}
In the above example, the following strategy has been adopted to simulate processing time in order to show the benefit of dedicated threads per method:
- Functions One() and Four() have no wait and are single-threaded.
- Function Two() has a wait of 1 second and is single-threaded.
- Function Three() has a wait of 2 seconds and has 2 threads.
With such an example, we expect the following:
- All calls to One() will execute immediately, and thus the BlockingCollection of Two() will be populated immediately.
- After the first 2 executions of Two(), we MUST see a single execution of Three() (because while Three() waits for 2 seconds, two calls of Two() can execute).
- Then, for each outcome of Two(), we MUST see an outcome of Three() (the double-duration, double-threads effect).
- We must see the outcome of Four() almost immediately after each outcome of Three(), as Four() has no wait.
On my personal laptop with 2 cores, I was able to observe exactly this behavior. I'll let you prepare and run other examples/tests.
Bonne Année 2015
In this last part of the series, we have seen the fourth and final pattern. I hope you (the readers) liked the series; nonetheless, let me know if you still have any question/comment/suggestion/improvement. Lastly, I wish you all A VERY HAPPY NEW YEAR 2015. May GOD blah blah blah... ENJOY and keep sharing interesting code!!! Bonne Année 2015!!! Yu-Hu!!!
History
This is V1 of the suggested solution.
Logically, nothing was changed, however, some diagrams for visualization have been added.