Pipes, River, Rails and Binder Patterns - Part III

30 Dec 2015
A look at possible parallel-producer-consumer patterns. (Final Part)

<< Pipes, River, Rails and Binder Patterns - Part II

The Beginning

Finally, I am able to write the third and final part of this series. Frankly speaking, when I started working on this article, I wanted to publish only the Pipes design as a tip/trick; however, by the time I could finish it, I had decided to include the other implementations (River & Binder). But, again, I failed to include the other patterns in the article, as I could never find the time to finish it. So I decided to break it into 3 parts and also add Rails. That was, so far, my boring story behind this series. Anyway, let's quickly review what we saw in Parts I & II.

  1. Pipes: A simple design based on "out-in" members. It also requires Action<TInput, TOutput> delegate(s) as the final outcome handler(s). (See: Pipes, River, Rails and Binder Patterns - Part I)
  2. River: An improvement over Pipes which obviates the need for the "out-in" relationship. The pipeline consists only of Action<TInput, TOutput> delegates, and the input/output pair flows throughout the pipeline; hence, no final outcome handler is needed. (See: Pipes, River, Rails and Binder Patterns - Part II)
  3. Rails: This pattern is similar to River; however, it requires an interface implementation. The input data is a class that implements the required interface, where the interface represents the pipeline logic. The advantage of this implementation is that multiple input types can be processed by the same pipeline. (See: Pipes, River, Rails and Binder Patterns - Part II)
  4. Binder: An extension of River which decouples the pipeline members and provides a means to allocate the desired worker threads to each pipeline member. And this part is all about it.

As we construct a pipeline, there are times when we want to allocate a specific number of threads to a member function (for structural, tactical, performance or some other weird reasons). Thus, I propose to you the design of Binder. If you have followed the series, you will find the usage of this implementation familiar (a minimal sketch follows the list), i.e.:

  1. Create the pipeline.
  2. Call StartProcessing.
  3. Add values to the pipeline using AddValue.
  4. Call StopProcessing.
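
Here is a minimal sketch of those four steps. It is only an outline, assuming hypothetical MyInput/MyOutput types, step methods StepOne/StepTwo and an OnError handler; a complete, runnable example appears at the end of the article.

var pipeline = new Binder<MyInput, MyOutput>(StepOne, maxConcurrency: 1, opCode: "StepOne") // 1. create the pipeline
                   .Bind(StepTwo, maxConcurrency: 2, opCode: "StepTwo");                    //    chain the next member

pipeline.StartProcessing(OnError);                 // 2. start consuming (bottom-up, explained later)
pipeline.AddValue(new MyInput());                  // 3. feed values, from any thread
pipeline.StopProcessing(waitForProcessing: true);  // 4. no more input; optionally wait for the queue to drain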

Fourth Pattern: Binder

During the implementation of River, we saw how to construct a pipeline in which the input and output can flow together. Now, we will extend this idea to gain the liberty to independently assign worker threads to pipeline member functions. In order to gain this control, I decided to create a dedicated pipeline instance per member function and bind all those together in a chain, without burdening the user of the code with managing those instances, while providing a simple and familiar usage structure (the same as the other 3 implementations).

Let's first look at the code of Binder class:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public class Binder<TInput, TOutput>
        where TOutput : class, new()
    {
        private BlockingCollection<DataWrapper> _dataCollection = null;
        private ManualResetEvent _completionEvent;
        private CancellationTokenSource _cancelTokenSource;
        private Binder<TInput, TOutput> _nextBind = null;
        private Action<TInput, TOutput> _currentAction;
        private int _maxConcurrency;
        private string _opCode;

//This Ctor will create the seed instance
        public Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder") 
              : this(currFunc, maxConcurrency, opCode, new CancellationTokenSource())
        {
        }

//This Ctor is to create the remaining pipes
        private Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder",
                      CancellationTokenSource tokenSource = null)
        {
            _currentAction = currFunc;
            _maxConcurrency = maxConcurrency;
            _completionEvent = new ManualResetEvent(false);
            _dataCollection = new BlockingCollection<DataWrapper>();
            _opCode = opCode;
            _cancelTokenSource = tokenSource;
        }

        public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                            int maxConcurrency = -1,
                                            string opCode = "MyBinder1")
        {
            if (_nextBind == null)
            {
                _nextBind = 
                   new Binder<TInput, TOutput>
                            (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
            }
            else
            {
                _nextBind.Bind(nextFunc, maxConcurrency, opCode);
            }
            return this;
        }

        public bool AddValue(TInput inputValue)
        {
            return AddValue(new DataWrapper
            {
                InputVal = inputValue,
                OutputVal = new TOutput()
            });
        }

        public void StopProcessing(bool waitForProcessing = false)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _completionEvent.WaitOne();
        }

        public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
        {
            if (_nextBind != null)
                _nextBind.StartProcessing(errorHandler);
            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelTokenSource.Token
            };

            Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                       option,
                       currData =>
                       {
                           try
                           {
                               option.CancellationToken
                                     .ThrowIfCancellationRequested();

                               _currentAction(currData.InputVal,
                                              currData.OutputVal);

                               if (_nextBind != null)
                                  _nextBind.AddValue(currData);
                           }
                           catch (Exception e)
                           {
                               errorHandler(currData.InputVal,
                                    currData.OutputVal,
                                    "Error occurred inside " + 
                                            _opCode + " pipeline.",
                                    e);
                           }
                       });
                }
                catch (OperationCanceledException)
                {
                }
                finally
                {
                    if (_nextBind != null)
                    {
                        _nextBind._dataCollection.CompleteAdding();
                        _nextBind._completionEvent.WaitOne();
                    }
                    _completionEvent.Set();
                }
            });
        }

        public void AbortProcessing()
        {
            _dataCollection.CompleteAdding();
            _cancelTokenSource.Cancel();
        }

        private bool AddValue(DataWrapper currData)
        {
            return _dataCollection.TryAdd(currData);
        }

        private class DataWrapper
        {
            internal TInput InputVal;
            internal TOutput OutputVal;
        }
    }
}

Binder Dissection

Now, to understand how it works, we first need to know that, in C#, accessibility is class-level, i.e. the private members of any instance of a class are accessible from code inside that same class. Let's take an example:

public class MyInput
{
    private int a = 1;
    public MyInput GetInstanceWithValue(int b)
    {
        var newInstance = new MyInput();
        //This is legal in C#
        //accessing private variable of new instance inside the same class
        newInstance.a = b;
        return newInstance;
    }
}

Understanding Construction

Taking advantage of this class-level privacy, I can hold an instance of the Binder class privately and access all of its properties/members. And this is exactly what I am doing, BUT in a chain, like this (please follow the code comments):

public class Binder<TInput, TOutput>
        where TOutput : class, new()
{
    private Binder<TInput, TOutput> _nextBind = null;

    public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                        int maxConcurrency = -1,
                                        string opCode = "MyBinder1")
    {
//If the child pipe is not defined yet, the given function becomes this pipe's immediate child
        if (_nextBind == null)
        {
//Here we use the private Ctor and pass the same cancellation token source
//this way, cancelling on seed pipe's token will cancel all pipes in chain
            _nextBind = new Binder<TInput, TOutput>
                  (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
        }
        else
        {
//Otherwise, we pass the given function down to the child pipe.
//The child pipe will then check its own child pipe, and so on...
//Thus, the given function becomes the child of the last pipe in the chain,
//and for the next Bind() call, that new pipe will be the last pipe in the chain.
            _nextBind.Bind(nextFunc, maxConcurrency, opCode);
        }

//Mind it, I am returning the seed instance for every call, thus,
//the consumer of this class can call this method ONLY on the SEED pipe.
//This is also required, because the user must call Start/Stop/Abort Processing on the seed,
//as the seed is the first pipe to process the input/output pair, and
//we do NOT want to overwhelm the user with a new instance per Bind() call.
        return this;
    }
}
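
To visualize what the code above builds, here is a small sketch (the member functions One, Two and Three are placeholders) of the chain produced by repeated Bind() calls:

var seed = new Binder<MyInput, MyOutput>(One);   // seed pipe, runs One
seed.Bind(Two);     // seed._nextBind            -> new child pipe, runs Two
seed.Bind(Three);   // seed._nextBind._nextBind  -> appended after the last pipe, runs Three

//Since Bind() always returns the seed, the same chain can also be built fluently:
//var seed = new Binder<MyInput, MyOutput>(One).Bind(Two).Bind(Three);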

Understanding Processing

Once we have the pipeline constructed (as discussed above), our immediate next goal is to implement the StartProcessing method, which can consume such a chain of Binder instances. To achieve our goal, a few things have to be kept in mind:

  • We must start the pipeline in reverse order (bottom-up approach), i.e. each child pipe's GetConsumingEnumerable() loop must be initialized before the parent pipe can pass data to it. This ensures smooth data processing without any blockage. A top-down approach is also possible but, in this case, the BlockingCollection may start bloating.
  • After processing each input/output pair, we need to ensure that this pair flows to the next pipe in the chain (as everything is asynchronous).
  • Upon a CompleteAdding() call on the parent pipe, we must ensure that the child pipe also finishes processing all the remaining items.
  • An AbortProcessing() call must abort every pipe in the chain: this one is easy to achieve, as we are sharing the same CancellationTokenSource among the pipes.

Let me show you how the above points have been integrated into the code (please follow the code comments):

public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
//If a child pipe exists, call its StartProcessing first, so that initialization happens bottom-up
    if (_nextBind != null)
        _nextBind.StartProcessing(errorHandler);

//each pipe will use its own maxConcurrency value
//and share the same cancellation token
    var option = new ParallelOptions
    {
        MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
        CancellationToken = _cancelTokenSource.Token
    };

    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
               option,
               currData =>
               {
                   try
                   {
                       option.CancellationToken
                                     .ThrowIfCancellationRequested();

//Execute the current pipe's method with the data from its OWN BlockingCollection
//NOTE: if this is inside the SEED, then the data pair is what the USER supplied,
//      else it is the processed pair from the previous pipe in the chain.
                       _currentAction(currData.InputVal,
                                              currData.OutputVal);

//Pass the pair to the child pipe
//NOTE: this call is NOT recursive => it just populates the
//                        BlockingCollection of the immediate child.
//This pair will then be available to the child's ForEach loop
//and from there to its own child, and so on...
                       if (_nextBind != null)
                          _nextBind.AddValue(currData);
                   }
                   catch (Exception e)
                   {
                       errorHandler(currData.InputVal,
                            currData.OutputVal,
                            "Error occurred inside " + 
                                            _opCode + " pipeline.",
                            e);
                   }
               });
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {

//If child is not null
            if (_nextBind != null)
            {

//Close BlockingCollection of Child for data-adding
//Reason: reaching here means the parent is done, hence the child shouldn't receive more data.
//        All data adding was done above in the try block,
//        or by the user (SEED case) before calling StopProcessing.
                _nextBind._dataCollection.CompleteAdding();

//Wait until child gives completion signal
                _nextBind._completionEvent.WaitOne();
            }

//Give OWN completion signal
//Note: Last child in the chain will give this signal without waiting for
//      any other signal, thus, 
//      its parent could give its OWN signal to its parent and so on...
            _completionEvent.Set();
        }
    });
}

Other functions like AbortProcessing(), StopProcessing(), etc. are trivial, and I leave those for the readers to explore (a short sketch of the two shutdown paths follows). Thus, I would say that I am done with the fourth and last pattern of this three (3) part series.
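
As a quick illustration only (assuming myPipes is a fully constructed Binder chain, as in the example below), the two shutdown paths look like this:

//Graceful shutdown: no more input is accepted, already-queued items are still processed.
//Passing true blocks until every pipe in the chain has signalled completion.
myPipes.StopProcessing(waitForProcessing: true);

//Hard stop: closes the seed's collection AND cancels the shared token,
//so every pipe in the chain stops as soon as possible.
myPipes.AbortProcessing();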

Binder By Example

Let me show you a quick (trivial and boring) usage of the above pattern:

using System;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            GC.Collect();
            GC.WaitForPendingFinalizers();
            Console.WriteLine("###### DONE ######");
            Console.ReadLine();
        }
        static void SomeCode()
        {
            var myPipes = new Binder<MyInput, MyOutput>(One, 1, "One")
                .Bind(Two, 1, "Two")
                .Bind(Three, 2, "Three")// => 2 threads for THREE
                .Bind(Four, 1, "Four");
            myPipes.StartProcessing(HandleError);
            Parallel.For(0, 4, input => myPipes.AddValue(new MyInput()));
            myPipes.StopProcessing();
        }
        static void HandleError<T1, T2>(T1 a, T2 b, string code, Exception e)
        {
            Console.WriteLine(code + Environment.NewLine + e.Message);
        }
        static void One(MyInput a, MyOutput b)
        {
            Console.WriteLine("One on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        static void Two(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine("Two on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        static void Three(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(2000);
            Console.WriteLine("Three on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
            //throw new Exception("Mine");
        }
        static void Four(MyInput a, MyOutput b)
        {
            Console.WriteLine("Four (or LAST)");
            Console.WriteLine("A=" + a + ",B=" + b +
                ",On T:" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
    }
    public class MyInput
    {
        //All my Input values
    }
    public class MyOutput
    {
        //All my output values
    }
}

In the above example, the following strategy has been adopted to simulate processing time, in order to show the benefit of dedicated threads per method:

  1. Functions One() and Four() have no wait and are single-threaded.
  2. Function Two() has a wait of 1 second and is single-threaded.
  3. Function Three() has a wait of 2 seconds and runs on 2 threads.

With such an example, we expect the following:

  1. All calls to One() will execute immediately, and thus, the BlockingCollection of Two() will be populated immediately.
  2. After the first 2 executions of Two(), we MUST see a single execution of Three() (because while Three() waits for 2 seconds, two calls of Two() can execute).
  3. Then, for each outcome of Two(), we MUST see an outcome of Three() (the double-duration, double-threads effect).
  4. We must see the outcome of Four() almost immediately after each outcome of Three(), as Four() has no wait.

On my personal laptop with 2 cores, I was able to observe exactly this behaviour. I leave it to you to prepare and run other examples/tests.

Bonne Année 2015

In this last part of the series, we have seen the fourth and final pattern. I hope you (the readers) liked the series; let me know if you still have any question/comment/suggestion/improvement. Lastly, I wish you all A VERY HAPPY NEW YEAR 2015. May GOD blah blah blah... ENJOY and keep sharing interesting code!!! Bonne Année 2015!!! Yu-Hu!!!

<< Pipes, River, Rails and Binder Patterns - Part II

History

This is V1 of the suggested solution.

Logically, nothing has changed; however, some diagrams for visualization have been added.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
