
Pipes, River, Rails and Binder Patterns - Part I

8 Dec 2014
A look at possible parallel producer-consumer patterns.

Motivation

This article takes its inspiration from the MSDN article Pipelines, which develops and explains the parallel producer-consumer pattern using BlockingCollection. I will not repeat what field experts have already explained well; instead, I present some tricks to:

  • simplify parallel programming
  • improve code reusability
  • centralize error-handling

Introduction

This article, along with its future parts, presents four different ways to construct a parallel producer-consumer solution. Following Wikipedia's definition of a software design pattern, I call them patterns (as the title suggests); readers, of course, are free to agree or disagree with me.

First, consider a definition of the "out-in" relationship:

"In the realm of pipelines, an out-in relationship exists between two functions when the antecedent function's output is the input to the subsequent function. The two functions are then called (at least by me) out-in members."

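In C# terms, two functions are out-in members when the first one's output type is the second one's input type. The following minimal sketch joins two such members into a single function. Compose is a hypothetical helper name used only for this illustration (it is not part of the Pipes code), and the lambdas simply repeat the logic of RandomizeIt and MultiplyIt from the examples in this article.

```csharp
using System;

// Hypothetical helper (not part of the Pipes code): joins two "out-in"
// members f and g into the single function x => g(f(x)). The compiler
// enforces the out-in relationship, because f's output type TMid must be
// g's input type.
static Func<TIn, TOut> Compose<TIn, TMid, TOut>(Func<TIn, TMid> f, Func<TMid, TOut> g)
{
    return x => g(f(x));
}

// Same logic as RandomizeIt and MultiplyIt in the examples.
Func<int, int> randomizeIt = p => p + 56;
Func<int, long> multiplyIt = p => (long)p * 200;

var chained = Compose(randomizeIt, multiplyIt); // Func<int, long>
Console.WriteLine(chained(1));                  // 11400
```

Because the types must line up, swapping the two members (Compose(multiplyIt, randomizeIt)) does not compile: they are not an out-in pair in that order.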
This series will discuss the implementation of the following patterns (I apologize for the poorly chosen names):

  1. Pipes: A simple design based on "out-in" members. It also requires Action<TInput, TOutput> delegate(s) as the final outcome handler. I illustrate this pattern in this part.
  2. River: An improvement over Pipes that removes the need for the "out-in" relationship. The pipeline consists only of Action<TInput, TOutput> delegates, and the input/output pair flows through the whole pipeline, so no final outcome handler is needed. I will come back to this pattern in Part II.
  3. Rails: Similar to River, but requires an interface implementation. The input data is a class that implements the required interface, and the interface represents the pipeline logic. The advantage of this implementation is that multiple input types can be processed by the same pipeline. Part II will cover this pattern along with River.
  4. Binder: An extension of River that decouples the pipeline members and provides a means to allocate the desired number of worker threads to each pipeline member. Finally, I will devote Part III to this pattern.

Now we could consider each implementation one by one with examples, but I would like to begin with a non-parallel example (please skip this part if you are not interested):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            var input = 1;
            SomeCode(input);
            SomeCodeInAnotherWay(input);
            Console.ReadLine();
        }
        static void SomeCode(int input)
        {
            var finalAnswer = PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input))));
            Console.WriteLine("Answer: " + finalAnswer);
        }
        static void SomeCodeInAnotherWay(int input)
        {
            var a1 = RandomizeIt(input);
            var a2 = MultiplyIt(a1);
            var a3 = DoubleIt(a2);
            var finalAnswer = PowerIt(a3);
            Console.WriteLine("Answer: " + finalAnswer);
        }
        private static decimal PowerIt(double p)
        {
            return (decimal)Math.Pow(p, 2.7);
        }
        private static double DoubleIt(long p)
        {
            return Math.Log((double)p);
        }
        private static long MultiplyIt(int p)
        {
            // Widen to long before multiplying so large inputs cannot overflow int.
            return (long)p * 200;
        }
        private static int RandomizeIt(int p)
        {
            return p + 56;
        }
    }
}

In the above example, the SomeCode and SomeCodeInAnotherWay functions are equivalent and produce the same result. The only difference is that SomeCodeInAnotherWay spreads the chain over several lines, while SomeCode nests all the calls in one somewhat messier expression. How the code is written is not the issue here; instead, I want to draw your attention to the following questions:

  • How do we keep this snippet readable as more "out-in" members are added to the chain?
  • What about error handling? Should we write a try/catch for each and every "out-in" member, or a single try/catch for SomeCode (or SomeCodeInAnotherWay)?
  • What if a project has many such "out-in" chains? Is there an easy way to improve code reusability and simplify error handling?
  • Granted, calling this function inside Parallel.For (or ForEach) is not that difficult, but what about building a producer-consumer pattern? Should we rewrite it over and over in our solution for each usage?
  • ... and many other similar questions (hmm... I can't think of any more!)
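To make the producer-consumer question concrete, here is a sketch of what a hand-written version of this one chain might look like with BlockingCollection and Parallel.ForEach. It is only an illustration, assuming inputs 1 to 4 and errors printed to the console; the lambdas repeat the logic of RandomizeIt, MultiplyIt, DoubleIt and PowerIt. Only one line is specific to this chain: the queue, the consumer task, the try/catch and the shutdown sequence would have to be rewritten for every other chain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Stand-ins with the same logic as RandomizeIt, MultiplyIt, DoubleIt and PowerIt.
Func<int, int> randomizeIt = p => p + 56;
Func<int, long> multiplyIt = p => (long)p * 200;
Func<long, double> doubleIt = p => Math.Log(p);
Func<double, decimal> powerIt = p => (decimal)Math.Pow(p, 2.7);

var queue = new BlockingCollection<int>();
var results = new ConcurrentDictionary<int, decimal>();

// Consumer: processes items in parallel until CompleteAdding has been
// called and the queue is empty.
var consumer = Task.Run(() =>
    Parallel.ForEach(queue.GetConsumingEnumerable(), input =>
    {
        try
        {
            // The only line that is specific to this chain.
            results[input] = powerIt(doubleIt(multiplyIt(randomizeIt(input))));
        }
        catch (Exception e)
        {
            Console.WriteLine("Input " + input + " failed: " + e.Message);
        }
    }));

// Producer: add inputs 1 to 4, then signal that nothing more will come.
for (var i = 1; i < 5; i++)
    queue.Add(i);
queue.CompleteAdding();
consumer.Wait();

foreach (var pair in results)
    Console.WriteLine("For Input value: " + pair.Key + ", Output is: " + pair.Value);
```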

Throughout this series of articles, we will consider several patterns, each addressing one or more of the issues mentioned above. So we begin with our first pattern: Pipes.

First Pattern: Pipes

The idea behind this implementation comes from the UNIX pipe "|" operator; see the Pipeline (Unix) article on Wikipedia for more information. Without going much into theory, let's look at the code itself:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public sealed class Pipes<TInput, TOutput>
    {
        private BlockingCollection<InputWrapper> _dataCollection = null; // Holds the data to process
        private CancellationTokenSource _cancelSource = null; // For task cancellation
        private Task _pipelineProcessor = null; // The task that runs the consumer loop
        private Func<TInput, TOutput> _currentPipe; // Current function of the "out-in" relation
        private int _maxConcurrency; // Maximum degree of parallelism (-1 = no limit)
        
        public Pipes(Func<TInput, TOutput> currentPipe,
                        int maxConcurrency = -1)
        {
            _currentPipe = currentPipe;
            _maxConcurrency = maxConcurrency;
        }

        // This function establishes the "out-in" relationship among pipeline functions.
        public Pipes<TInput, TNewPipeOutput> 
        Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
        {
            // Here we create new function as NewFunction(OldFunction(inputData))
            // REMEMBER : PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input)))) from above
            return new Pipes<TInput, TNewPipeOutput>
            (inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
        }

        // Adding values in pipeline
        public bool AddValue(TInput inputValue, Action<TInput, TOutput> callbackAction)
        {
            return _dataCollection.TryAdd(new InputWrapper
            {
                InputValue = inputValue,
                CallBackFunction = callbackAction
            });
        }

        // Starts the processing
        public void StartProcessing(Action<TInput, string, Exception> errorHandler,
                                    string operationCode = "My_Operation_Name")
        {
            _dataCollection = new BlockingCollection<InputWrapper>();
            _cancelSource = new CancellationTokenSource();

            var option = new ParallelOptions
            {
                // MaxDegreeOfParallelism must be -1 (no limit) or positive;
                // 0 would throw, so every non-positive value means "no limit".
                MaxDegreeOfParallelism = _maxConcurrency > 0 ? _maxConcurrency : -1,
                CancellationToken = _cancelSource.Token
            };

            _pipelineProcessor = Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                        option,
                        inputWrapper =>
                        {
                            // Checked outside the try/catch below, so that
                            // AbortProcessing ends the loop instead of being
                            // reported to errorHandler as an item error.
                            option.CancellationToken.ThrowIfCancellationRequested();
                            try
                            {
                                // Call the final action with the input and the
                                // outcome of the current (LAST) pipe in the chain
                                // (which is THIS _currentPipe!)
                                inputWrapper.CallBackFunction(inputWrapper.InputValue,
                                                 _currentPipe(inputWrapper.InputValue));
                            }
                            catch (Exception e)
                            {
                                errorHandler(inputWrapper.InputValue,
                                    "Error occurred inside " + operationCode + " pipeline.",
                                    e);
                            }
                        });
                }
                catch (OperationCanceledException)
                {
                    // Raised after AbortProcessing; unprocessed items are dropped.
                }
            });
        }

        public void StopProcessing(bool waitForProcessing)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _pipelineProcessor.Wait();
        }

        public void AbortProcessing()
        {
            _cancelSource.Cancel();
            _dataCollection.CompleteAdding();
        }

        // Wrapper class to hold the InputData and Final Action handle pair
        private class InputWrapper
        {
            internal TInput InputValue;
            internal Action<TInput, TOutput> CallBackFunction;
        }
    }
}

In the above implementation, all the MAGIC happens in a single line inside the following function:

public Pipes<TInput, TNewPipeOutput> 
Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
{
     return new Pipes<TInput, TNewPipeOutput>
     (inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
}

We are simply creating a new instance of the Pipes class with a new output type, TNewPipeOutput, whose function establishes the "out-in" relationship between the current pipe function and the new one. And that's it! Calling Pipe on this new instance establishes yet another "out-in" relationship! Now, let me re-implement the SomeCode function (from the example in the Introduction above) using Pipes.
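Because Pipe returns a new instance and never modifies the instance it is called on, every intermediate Pipes object stays usable, so a common prefix can be shared by several pipelines. The sketch below mimics that behaviour with plain delegates, building each step exactly as Pipe does (inputValue => newPipe(_currentPipe(inputValue))); the variable names are made up for this illustration.

```csharp
using System;

// Same logic as RandomizeIt and MultiplyIt in the examples.
Func<int, int> randomizeIt = p => p + 56;
Func<int, long> multiplyIt = p => (long)p * 200;

// Like new Pipes<int, int>(RandomizeIt): the first pipe.
Func<int, int> step1 = randomizeIt;
// Like step1.Pipe(MultiplyIt): a NEW function built as
// inputValue => newPipe(_currentPipe(inputValue)); step1 itself is unchanged.
Func<int, long> prefix = inputValue => multiplyIt(step1(inputValue));

// Two independent pipelines branching from the same prefix
// (like calling Pipe twice on the same Pipes<int, long> object).
Func<int, double> logBranch = inputValue => Math.Log(prefix(inputValue));
Func<int, string> textBranch = inputValue => "Value=" + prefix(inputValue);

Console.WriteLine(step1(1));      // 57: the first pipe still works on its own
Console.WriteLine(prefix(1));     // 11400
Console.WriteLine(textBranch(1)); // Value=11400
Console.WriteLine(logBranch(1));
```

With the real class, each branch is a separate Pipes instance, so each one gets its own BlockingCollection when StartProcessing is called on it.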

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            SomeErroredCode();
            Console.ReadLine();
        }
        static void SomeCode()
        {
            var myPipes = new Pipes<int, int>(RandomizeIt, -1)
                                 .Pipe(MultiplyIt) // Add Pipe function
                                 .Pipe(DoubleIt) // More pipe
                                 .Pipe(PowerIt); // And more...
            //Step 1. Start the pipeline
            myPipes.StartProcessing(HandleError, "SomeCode");
            //Step 2. Add Input values with Final Handler
            Parallel.For(1, 5, 
                input => myPipes.AddValue(input, ShowMyAnswer));
            //Step 3. Wait until processing is done!
            //(or call it without waiting)
            //IMPORTANT: Do make a call to this
            //function else the Loop will wait indefinitely!!!
            myPipes.StopProcessing(true);
        }
        static void SomeErroredCode()
        {
            var myPipes = new Pipes<int, int>(RandomizeIt, -1)
                                    .Pipe(MultiplyIt) //Add pipe...
                                    .Pipe(DoubleIt) //Another...
                                    .Pipe(PowerIt) // Another...
                                    // Notice how easily we can extend the pipeline with new
                                    // functions, without much code refactoring: one more pipe
                                    // is added without changing anything else!
                                    .Pipe(ThrowError);

            myPipes.StartProcessing(HandleError, "SomeErroredCode");
            Parallel.For(1, 5, input => myPipes.AddValue(input, ShowMyAnswer));
            myPipes.StopProcessing(true);
        }
        private static void ShowMyAnswer<TIn, TOut>(TIn input, TOut output)
        {
            Console.WriteLine("For Input value: " + input +
                ", Output is: " + output +
                ", Executed On Thread Id: " + 
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        private static decimal PowerIt(double p)
        {
            return (decimal)Math.Pow(p, 2.7);
        }
        private static double DoubleIt(long p)
        {
            return Math.Log((double)p);
        }
        private static long MultiplyIt(int p)
        {
            // Widen to long before multiplying so large inputs cannot overflow int.
            return (long)p * 200;
        }
        private static int RandomizeIt(int input)
        {
            return input + 56;
        }
        private static decimal ThrowError(decimal p)
        {
            throw new Exception("Test Exception");
        }
        private static void HandleError<Tin>(Tin i, string mess, Exception e)
        {
            Console.WriteLine("Error On Thread Id: " + 
            System.Threading.Thread.CurrentThread.ManagedThreadId +
                ", Input was: " + i +
                ", Error In: " + mess +
                ", Message: " + e.Message);
        }
    }
}

From the above example, we can observe the following:

  • Building a pipeline is very simple once the "out-in" pairs are ready to be piped.
  • Using this class takes just 3 steps:
    1. Call StartProcessing
    2. Add input values
    3. Call StopProcessing
  • Adding new pipes at the tail is extremely easy and requires no refactoring (however, removing or altering intermediate pipes may require some refactoring if an "out-in" relation changes).
  • All error handling is done in one place, and no "out-in" member needs its own try/catch.
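Step 3 matters because of how BlockingCollection.GetConsumingEnumerable behaves: the enumeration, and therefore the Parallel.ForEach inside StartProcessing, ends only after CompleteAdding has been called (which is what StopProcessing does) and the collection is empty. The small standalone sketch below shows that behaviour with a plain foreach, independent of the Pipes class; the 200 ms wait is an arbitrary value chosen for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>();

// Consumer: sums the items; the foreach blocks while the collection is empty
// and only ends after CompleteAdding has been called.
var consumer = Task.Run(() =>
{
    var sum = 0;
    foreach (var item in collection.GetConsumingEnumerable())
        sum += item;
    return sum;
});

collection.Add(1);
collection.Add(2);
collection.Add(3);

// All items can be consumed, yet the consumer keeps waiting for more.
var finishedEarly = consumer.Wait(200);
Console.WriteLine("Finished before CompleteAdding: " + finishedEarly); // False

collection.CompleteAdding();                  // what StopProcessing does first
Console.WriteLine("Sum: " + consumer.Result); // Sum: 6
```

If StopProcessing is never called, the task started in StartProcessing stays blocked in the same way, which is why the sample code above marks that call as IMPORTANT.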

This design can be further improved in the following ways:

  • An error handler with the signature HandleError<Tin, Tout>(Tin i, Tout o, string mess, Exception e) can be implemented to provide pipe-level error handling (similar to a try/catch around each member function). Such a handler can be passed to the Pipe() method, replacing the single-expression pipe lambda with a multi-line one. I leave this implementation to readers, because a similar implementation is used for River, which I will show you in Part II.
  • Other minor improvements can also be implemented (I have personally developed some of these variants to meet project requirements), such as:
    1. Stopping processing on error
    2. Notifying the caller of StopProcessing that an error occurred, by returning a bool
    3. Returning a collection of input/output pairs to the caller of StopProcessing instead of using an Action handler
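As a rough idea of the first improvement, here is a hypothetical PipeWithHandler helper (an illustrative name, not an API of the Pipes class) that composes two out-in members the way Pipe() does, but uses a multi-line lambda with a try/catch around the newly added stage. Its handler signature, (stage input, stage name, exception), is a simplification of the one described above. The handler reports which stage failed and with what input, then rethrows, so a pipeline-level handler such as the one passed to StartProcessing would still see the error.

```csharp
using System;

// Hypothetical helper: composes two out-in members like Pipe(), but with a
// multi-line lambda that guards the newly added stage.
static Func<TIn, TNext> PipeWithHandler<TIn, TOut, TNext>(
    Func<TIn, TOut> current,
    Func<TOut, TNext> newPipe,
    string stageName,
    Action<TOut, string, Exception> stageErrorHandler)
{
    return input =>
    {
        // Exceptions from earlier stages pass through without calling this handler.
        var intermediate = current(input);
        try
        {
            return newPipe(intermediate);
        }
        catch (Exception e)
        {
            // Report which stage failed and with what input, then rethrow so a
            // pipeline-level handler (such as StartProcessing's) still sees it.
            stageErrorHandler(intermediate, stageName, e);
            throw;
        }
    };
}

var lastError = "";
Action<decimal, string, Exception> report =
    (value, stage, e) => lastError = stage + " failed for " + value + ": " + e.Message;

Func<int, decimal> start = x => x * 2m;
Func<decimal, decimal> throwError = p => throw new InvalidOperationException("Test Exception");

var pipeline = PipeWithHandler(start, throwError, "ThrowError", report);
try
{
    pipeline(1);
}
catch (InvalidOperationException)
{
    Console.WriteLine(lastError); // ThrowError failed for 2: Test Exception
}
```

Rethrowing is one possible design choice; a variant could instead swallow the error and return a default value, at the cost of hiding failures from the pipeline-level handler.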

[Screenshot: output of the above sample code]

IMPORTANT

If you are building a pipeline that runs for the application's lifetime (as a singleton class or by other means), consider replacing the Parallel.ForEach inside the StartProcessing method with a non-parallel foreach (useful when input data arrives more slowly than it is processed). If that does NOT suit your requirements, consider the GetConsumingPartitioner extension for BlockingCollection, described in an excellent article by Stephen Toub that explains the risks of using BlockingCollection together with Parallel.ForEach.
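On .NET Framework 4.5 and later there is also a built-in option aimed at the same buffering problem: Partitioner.Create with EnumerablePartitionerOptions.NoBuffering makes Parallel.ForEach take items from the source one at a time instead of in chunks. This is a different technique from the extension in Stephen Toub's article, offered here only as an alternative; the sketch below assumes a degree of parallelism of 4 and simply counts 100 items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>();
var processed = 0;

// NoBuffering: each worker takes one item at a time from the consuming
// enumerable, so no item sits in a worker's private chunk while that
// worker blocks waiting for more input.
var partitioner = Partitioner.Create(collection.GetConsumingEnumerable(),
                                     EnumerablePartitionerOptions.NoBuffering);

var consumer = Task.Run(() =>
    Parallel.ForEach(partitioner,
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        item => Interlocked.Increment(ref processed)));

for (var i = 0; i < 100; i++)
    collection.Add(i);
collection.CompleteAdding();
consumer.Wait();

Console.WriteLine("Processed: " + processed); // Processed: 100
```

In the Pipes class, the corresponding change would be to pass such a partitioner, instead of _dataCollection.GetConsumingEnumerable(), to Parallel.ForEach inside StartProcessing.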

À Bientôt

I hope you liked this article and that it will help you in your development some day. Please feel free to leave your comments and to share other patterns you have developed or used. I sincerely apologize that I cannot present all the patterns in this article: preparing the code and samples and organizing the words to communicate them well takes a lot of time. I will do my best to post the other parts as soon as possible and keep you informed through announcements. So I would say "À bientôt" (see you soon)!

Pipes, River, Rails and Binder Patterns - Part II >>

History

This is V1 of the proposed implementation.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
