
Pipes, River, Rails and Binder Patterns - Part II

14 Dec 2014
A look at possible parallel-producer-consumer patterns (Second Part)

Retrospection

In this series of articles, I promised to discuss the following four implementations of the parallel-producer-consumer pattern:

  1. Pipes: A simple design based on "out-in" members. It also requires Action<TInput, TOutput> delegate(s) as the final outcome handler.
  2. River: An improvement over Pipes that obviates the need for an "out-in" relationship. The pipeline consists only of Action<TInput, TOutput> delegates, and the input/output pair flows through the entire pipeline; hence, no final outcome handler is needed.
  3. Rails: Similar to River, but requires an interface implementation. The input data is a class that implements the required interface, and the interface represents the pipeline logic. The advantage of this implementation is that multiple input types can be processed by the same pipeline.
  4. Binder: An extension of River that decouples pipeline members and provides a means to allocate the desired number of worker threads to each pipeline member.

In Part I (Pipes, River, Rails and Binder Patterns - Part I), we saw the implementation of Pipes; I hope you liked it, and I would be glad to hear whether it was helpful to you in any way. In this article, we will discuss the implementations of River and Rails with examples. Finally, as promised, I will bring you Binder in the last part of this series.

Why Not Pipes?

Implementing Pipes is useful when all that a pipeline member requires is the last computed value. In practice, however, this is too restrictive: most of the time, we need more than just the last computed value (e.g., other inputs, with or without the last output). Consider situations where:

  • Each pipe requires access to some other inputs, with or without the output of the antecedent pipe. And/or
  • The final output is built incrementally by each pipe. And/or
  • The pipeline can be described as a workflow. And/or
  • Creating the "out-in" relationship is too burdensome.

In such cases, we need something beyond Pipes (or the MSDN Pipelines). Thus, I propose two new patterns: River and Rails. Let's look at them.

Second Pattern: River

Let me show you the code of River directly; then we will see how and why it differs from Pipes.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace PipelinePartTwo
{
    public sealed class River<TInput, TOutput>
        where TOutput : new()
    {
        private Action<TInput, TOutput> _currentFlow;
        private int _maxConcurrency;
        private BlockingCollection<TInput> _flowCollection = null;
        private CancellationTokenSource _cancelSource = null;
        private Task _flowProcessor = null;

        public River(Action<TInput, TOutput> currentFlow,
                      int maxConcurrency = -1)
        {
            _currentFlow = currentFlow;
            _maxConcurrency = maxConcurrency;
        }

        public River<TInput, TOutput> Flow(Action<TInput, TOutput> newFlow)
        {
            return new River<TInput, TOutput>((inputValue, outPutValue) =>
            {
                _currentFlow(inputValue, outPutValue);
                newFlow(inputValue, outPutValue);
            }, _maxConcurrency);
        }

        public bool AddValue(TInput inputValue)
        {
            return _flowCollection.TryAdd(inputValue);
        }

        public void StopProcessing(bool waitForProcessing = false)
        {
            _flowCollection.CompleteAdding();
            if (waitForProcessing)
                _flowProcessor.Wait();
        }

        public void AbortProcessing()
        {
            _flowCollection.CompleteAdding();
            _cancelSource.Cancel();
        }

        public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler,
                                    string operationCode = "My_Operation_Name")
        {
            _flowCollection = new BlockingCollection<TInput>();

            _cancelSource = new CancellationTokenSource();
            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelSource.Token
            };

            _flowProcessor = Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_flowCollection.GetConsumingEnumerable(),
                                     option,
                                     flowInput =>
                                     {
                                         var outPut = new TOutput();
                                         try
                                         {
                                             option.CancellationToken.ThrowIfCancellationRequested();
                                             _currentFlow(flowInput, outPut);
                                         }
                                         catch (Exception e)
                                         {
                                             errorHandler(flowInput,
                                                          outPut,
                                                          "Error occurred inside " +
                                                          operationCode + " pipeline.",
                                                          e);
                                         }
                                     });
                }
                catch (OperationCanceledException)
                {
                }
            });
        }
    }
} 

If you have been following this series, by now you must have guessed that the major difference between Pipes and this (River) implementation is only a single line of code. Yes, you are right! It is the way the pipeline is constructed inside the Flow method. In Pipes, we were establishing an "out-in" relationship; in River, we call the Action delegates in succession.

public River<TInput, TOutput> Flow(Action<TInput, TOutput> newFlow)
{
     return new River<TInput, TOutput>((inputValue, outPutValue) =>
     {
          _currentFlow(inputValue, outPutValue); //First call the current member
          newFlow(inputValue, outPutValue); //Now we will call the new member
     }, _maxConcurrency);
} 

This design gives us the liberty to access information from the input/output variables and mutate it along the flow. The trivial constraint "where TOutput : new()" can easily be removed; instead, a wrapper can be created, as we did in the implementation of Pipes. Following is a trivial usage example:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace PipelinePartTwo
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            Console.WriteLine();
            SomeCodeWithError();
            Console.ReadLine();
        }
        private static void SomeCode()
        {
            var r = new River<MyInput, MyOutput>(First)
                        .Flow(Second)
                        .Flow(Third)
                        .Flow(Fourth)
                        .Flow(FinalHandle);
            r.StartProcessing(ErrorHandler, "MyRiverExample");
            r.AddValue(new MyInput());
            r.StopProcessing(true);
        }
        private static void SomeCodeWithError()
        {
            var r = new River<MyInput, MyOutput>(First)
                        .Flow(Second)
                        .Flow(Third)
                        .Flow(ThrowError)
                        .Flow(Fourth)
                        .Flow(FinalHandle);
            r.StartProcessing(ErrorHandler, "MyRiverExampleWithError");
            r.AddValue(new MyInput());
            r.StopProcessing(true);
        }
        static void FinalHandle(MyInput i, MyOutput p)
        {
            Console.WriteLine("MyRiverExample's Output is " + p.CalledCount);
        }
        private static void Fourth(MyInput i, MyOutput p)
        {
            Console.WriteLine("Inside Fourth: Computing based on MyInput and MyOutput");
            p.CalledCount++;
        }
        private static void Third(MyInput i, MyOutput p)
        {
            Console.WriteLine("Inside Third: Computing based on MyInput and MyOutput");
            p.CalledCount++;
        }
        private static void Second(MyInput i, MyOutput p)
        {
            Console.WriteLine("Inside Second: Computing based on MyInput and MyOutput");
            p.CalledCount++;
        }
        private static void First(MyInput i, MyOutput p)
        {
            Console.WriteLine("Inside First: Computing based on MyInput and MyOutput");
            p.CalledCount++;
        }
        private static void ThrowError(MyInput i, MyOutput p)
        {
            Console.WriteLine("Inside ThrowError: I will throw error");
            throw new Exception("My Exception");
        }
        private static void ErrorHandler<TOne, TTwo>(TOne first, TTwo second, string name, Exception e)
        {
            Console.WriteLine(name + "." + Environment.NewLine +
                "When Input: " + first.ToString() + Environment.NewLine +
                "And Output: " + second.ToString() + Environment.NewLine +
                "Error Details: " + e.Message);
        }
    }
    public class MyOutput
    {
        public int CalledCount = 0;
        //##### All Required OUTPUT Values
        public override string ToString()
        {
            return "MyOutput Count is " + CalledCount;
        }
    }
    public class MyInput
    {
        //##### All Required INPUT Values
        public override string ToString()
        {
            return "I am MyInput";
        }
    }
}
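As mentioned above, the "where TOutput : new()" constraint can be removed by supplying an output factory instead of relying on a parameterless constructor. The following is a minimal sketch of that idea; the class name FactoryRiver, the Func<TOutput> parameter, and the synchronous Process method are illustrative additions of mine, not part of the article's code:

```csharp
using System;
using System.Text;

// Sketch only: a River variant without the "new()" constraint.
// The output instance comes from a caller-supplied factory delegate.
public sealed class FactoryRiver<TInput, TOutput>
{
    private readonly Action<TInput, TOutput> _currentFlow;
    private readonly Func<TOutput> _outputFactory;

    public FactoryRiver(Action<TInput, TOutput> currentFlow, Func<TOutput> outputFactory)
    {
        _currentFlow = currentFlow;
        _outputFactory = outputFactory;
    }

    // Same composition as River.Flow: call the current members, then the new one.
    public FactoryRiver<TInput, TOutput> Flow(Action<TInput, TOutput> newFlow)
    {
        return new FactoryRiver<TInput, TOutput>((inputValue, outputValue) =>
        {
            _currentFlow(inputValue, outputValue);
            newFlow(inputValue, outputValue);
        }, _outputFactory);
    }

    // Synchronous stand-in for StartProcessing, kept short for the sketch;
    // the real worker would simply replace "new TOutput()" with "_outputFactory()".
    public TOutput Process(TInput input)
    {
        var output = _outputFactory();
        _currentFlow(input, output);
        return output;
    }
}
```

A side benefit is that the factory lets callers construct outputs that need constructor arguments, e.g. `() => new StringBuilder(capacity: 256)`, which `new TOutput()` cannot express.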

Third Pattern: Rails

Both Pipes and River require a dedicated pipeline instance to process values. Now, imagine if a single pipeline instance could fit all situations: we could construct a global instance and use it everywhere during the application's lifetime. I agree it is a stretch of the imagination; however, the implementation of Rails is based on this idea. Once a rail (the transport line) is available, any compatible train can run on it; by the same analogy, once the Rails instance is available, any compatible pipeline can be processed. In the world of C#, one way to achieve this compatibility is through an interface. Thus, this pattern is based on a specific (and simple) interface, named IRail (of course), which can be written as:

public interface IRail
{
    void ProcessData();
    void HandleError(Exception e);
}

Yes, you guessed it right! With this pattern, only the following steps are required:

  1. Construct a singleton instance of Rails.
  2. Prepare your classes: implement IRail and place all your computational logic inside ProcessData().
  3. Create and pass instances of these classes as inputs to the pipeline.
  4. ... and that's it!

The implementation of Rails follows. (I have removed AbortProcessing(), as it does not make sense for such an app-lifetime instance to abort processing; however, you are free to include it as your requirements dictate.)

public sealed class Rails
{
    private int _maxConcurrency;
    private BlockingCollection<IRail> _dataCollection = null;
    private Task _dataProcessor = null;

    public Rails(int maxConcurrency = -1)
    {
        _maxConcurrency = maxConcurrency;
    }

    public bool AddValue(IRail inputValue)
    {
        return _dataCollection.TryAdd(inputValue);
    }

    public void StopProcessing(bool waitForProcessing = false)
    {
        _dataCollection.CompleteAdding();
        if (waitForProcessing)
            _dataProcessor.Wait();
    }

    public void StartProcessing()
    {
        _dataCollection = new BlockingCollection<IRail>();
        var option = new ParallelOptions { MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency) };
        _dataProcessor = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                             option,
                             inputData =>
                             {
                                 try
                                 {
                                     inputData.ProcessData();
                                 }
                                 catch (Exception e)
                                 {
                                     inputData.HandleError(e);
                                 }
                             });
        });
    }
}
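Should you need it, the removed AbortProcessing() can be reinstated with the same cancellation mechanics River uses: add a CancellationTokenSource field to Rails, pass its Token into the ParallelOptions, and catch OperationCanceledException around Parallel.ForEach. The following stand-alone sketch demonstrates just those mechanics (the class and variable names are mine, not the article's):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Stand-alone demo of the abort mechanics: a consuming Parallel.ForEach
// stopped by CompleteAdding() plus a cancelled token.
public static class AbortDemo
{
    public static void Run()
    {
        var queue = new BlockingCollection<int>();
        var cancelSource = new CancellationTokenSource();
        var options = new ParallelOptions { CancellationToken = cancelSource.Token };

        var worker = Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.ForEach(queue.GetConsumingEnumerable(), options, item =>
                {
                    options.CancellationToken.ThrowIfCancellationRequested();
                    Console.WriteLine("Processed " + item);
                });
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Processing aborted.");
            }
        });

        queue.Add(1);

        // This pair of calls is what AbortProcessing() would contain:
        queue.CompleteAdding();  // no further items accepted
        cancelSource.Cancel();   // stop the Parallel.ForEach loop

        worker.Wait();           // safe: the cancellation exception is caught above
    }

    public static void Main() { Run(); }
}
```

Note that, depending on timing, the queued item may or may not be processed before the cancellation is observed; that is the expected semantics of an abort.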

And following is a trivial usage example of the Rails pattern:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace PipelinePartTwo
{
    class Program
    {
        private static readonly Rails Singleton = new Rails();//Creating Instance

        static void Main(string[] args)
        {
            Singleton.StartProcessing(); // This would go to app start method
            Singleton.AddValue(new RailOne()); // This can be called anywhere
            Singleton.AddValue(new RailTwo()); // This can be called anywhere
            Singleton.StopProcessing(true); // This would go to app stop method
            Console.ReadLine();
        }

        public static void HandleError(string persoMess, Exception e)
        {
            Console.WriteLine(persoMess + Environment.NewLine +
                "Error Details: " + e.Message);
        }
    }
    public class RailOne : IRail
    {
        //private MyOverallInput EverythingRequiredAsInput;
        //private MyOverallOutput EverythingRequiredAsOutput;

        public RailOne(/*My Ctor Input*/)
        {
            //Init my inputs
            //Init my outputs
        }
        public void ProcessData()
        {
            ProcessDataOne();
            ProcessDataTwo();
            ProcessDataThree();
        }
        private void ProcessDataOne()
        {
            Console.WriteLine("RailOne: ProcessDataOne");
        }
        private void ProcessDataTwo()
        {
            Console.WriteLine("RailOne: ProcessDataTwo");
        }
        private void ProcessDataThree()
        {
            Console.WriteLine("RailOne: ProcessDataThree");
        }
        public void HandleError(Exception e)
        {
            Program.HandleError("Error Occurred In RailOne", e);
        }
    }
    public class RailTwo : IRail
    {
        //private MyOverallInput EverythingRequiredAsInput;
        //private MyOverallOutput EverythingRequiredAsOutput;

        public RailTwo(/*My Ctor Input*/)
        {
            //Init my inputs
            //Init my outputs
        }
        public void ProcessData()
        {
            ProcessDataOne();
        }
        private void ProcessDataOne()
        {
            Console.WriteLine("RailTwo: ProcessDataOne");
            ProcessDataTwo(true);
        }
        private void ProcessDataTwo(bool error)
        {
            Console.WriteLine("RailTwo: ProcessDataTwo throws exception...");
            if(error)
                throw new Exception("RailTwo is in exception");
            ProcessDataThree();
        }
        private void ProcessDataThree()
        {
            Console.WriteLine("RailTwo: ProcessDataThree");
        }
        public void HandleError(Exception e)
        {
            Program.HandleError("Error Occurred In RailTwo", e);
        }
    }
}

Story So Far...

So far, we have seen three of the four patterns:

  1. Pipes: Useful when we have an "out-in" relationship among pipeline members, but not useful otherwise.
  2. River: Useful when several (common) pieces of information are required to perform the computation in the pipeline. Members can easily be added to or removed from the pipeline (as requirements change) because every pipeline member has EXACTLY the same signature. It can be wise to create a pipeline member for each functional unit.
  3. Rails: Same as River, but very useful when multiple types of data need to be processed. All the computation logic can be wrapped in an IRail class, along with the required input/output, and passed to this pipeline for processing.

I hope you like this series, and I promise to bring out the last part very soon.

History

  • V1 of the suggested solution

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
