Introduction
First, let me state right up front that I know there is a Dataflow Framework that is part of the Task Parallel Library. I drew some inspiration from it while creating my own framework that I am going to present.
Background
I stumbled on the Dataflow Framework one day and decided to do a proof of concept. I did this to understand what it is, how to use it, and what its pros and cons are, in order to determine whether I might be able to use it in my current project.
The Dataflow Framework documentation states the following:
- "This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks."
- "These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available."
This pipeline can be thought of as an enhancement to the Builder Pattern. Individual steps are sequenced in order to create a pipeline which will produce an output or perform an overall operation.
As an example, the pipeline I've created using my own framework will:
- Input several lists of numbers that range from 1 to 200
- Transform each list of numbers into individual numbers
- Determine if the number is Even/Odd, convert it to a string and filter it to the appropriate output
- All the Even strings are batched together and all the Odd strings are batched together into a list of strings. After collecting 9 in the list, the list is then passed on
- The batched Even or Odd outputs are reformed into a single pipeline
- The final step in the process is to output the list of text as a single block
While this pipeline is simplistic and rather trivial, it shows what you can do with the framework. Both frameworks take care of moving the data around and multithreading each step, which allows you to focus on writing only the parts you really care about. What you choose to do in each of your steps is up to you.
I should point out that while I did draw inspiration from the original framework, the two are completely different. In some instances, I used exact or similar names for classes; at other times, completely different ones. So the real question is: why create a framework when one already exists?
- Size - There are two different NuGet packages to choose from. The second one is more recently supported, as indicated by the date; however, it comes with a hefty install and has replacements for some of the common DLLs.
  - Microsoft.Tpl.Dataflow (3.01M) v4.5.24, 12/10/2014
  - System.Threading.Tasks.Dataflow (12.4M) v4.8.0, 08/11/2017
- Company directive - Some companies and industries will not allow 3rd party code or NuGet packages, regardless of who made them, so you need to roll your own.
- Sometimes simpler is better.
- Satisfaction - I like taking things apart and putting them back together. This helps me understand how things work.
Using My Framework Code
Building Blocks
Base Class
ExecutionBlock
- This is an abstract class from which the other blocks are derived.
- Exceptions - storage for any exceptions generated while the block runs
- Execution State - an enumeration (Running, Done, Cancel, Error) tracking where the block is in its lifecycle
- Source - incoming data storage that the method can operate on
- Execution Task - the TPL Task running the block, which can be accessed for a variety of reasons
- Miscellaneous operations for completion, cancelling and waiting
- Parallel Options - options on how to parallelize your code, if requested
Each block has an incoming data collection. If you don't provide your own custom one, I will provide one for you. The internal collection is based on the BlockingCollection, which has several features around producer/consumer concurrency control, as well as features that limit the amount of data collected before blocking and that stop accepting any more data. I've provided a custom implementation (AnotherSource) which is still based on the BlockingCollection but shows everything that is needed to make it work. It is up to the reader to understand and create their own version if required.
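If you haven't used BlockingCollection before, here is a small stand-alone example (not part of the framework) showing the two features the blocks lean on: a bounded capacity that blocks producers when the buffer is full, and CompleteAdding to signal the end of the stream:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static void Main()
    {
        // Bounded to 5 items: Add blocks when the buffer is full.
        var buffer = new BlockingCollection<int>(boundedCapacity: 5);

        var producer = Task.Run(() =>
        {
            for (var i = 0; i < 20; i++)
                buffer.Add(i);            // blocks if the consumer falls behind
            buffer.CompleteAdding();      // no more data is coming
        });

        // GetConsumingEnumerable ends once the collection is marked
        // complete and drained - no manual polling is needed.
        foreach (var item in buffer.GetConsumingEnumerable())
            Console.WriteLine(item);

        producer.Wait();
    }
}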
Blocks
The following pre-defined blocks, for the most part, have a reference to the next step in the pipeline and a method/function that you provide. In addition, there is a section of code which defines how to get data from the source, pass it to the method, conditionally output it to the next step in the pipeline, and handle cancelling of the job or any exceptions that are generated. The next step in the pipeline will be referred to as the Target going forward.
TransformBlock
- This can transform data from one type to another, or it can act as a gate: if the data passes some validation routine, it goes on to the target.
- Function - Single input and single output - You define both (a rough usage sketch follows)
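As a usage sketch only (I'm assuming the constructor accepts the function directly, the same way the sample pipeline below passes MethodStep1 to TransformToManyBlock):

// Hypothetical usage - the constructor signature is an assumption.
var toText = new TransformBlock<int, string>(n => $"Value = {n}");
toText.LinkTo(nextBlock);  // nextBlock is whatever step comes next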
TransformToManyBlock
- This is the same as the Transform block, with the one difference that it will potentially output multiple objects. In my example, I have an input of List<int> and output (n) individual integers.
- Method - 2 inputs and no output - The inputs are your data type and a reference to the target (ITarget<T>), which is the next step in the pipeline (see the sketch below).
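MethodStep1 in the sample code below is the real example of this shape; another hypothetical method with the same two-input signature, splitting a line of text into fields, might look like this:

// Same shape as MethodStep1: one input in, any number of outputs
// pushed to the target. SplitLine itself is hypothetical.
private static void SplitLine(string line, ITarget<string> target)
{
    foreach (var field in line.Split(','))
        target?.TryAdd(field);
}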
FilterBlock
- Optionally transform incoming data and send it to the target. In my example, I send even numbers to one pipeline and odd to another. This block allows you to add as many filters as you would like. I've made the restriction that all of the targets must have the same parameter type to keep the implementation simple, and I made the conscious decision to pass all data through all filters. This was intentional, as another filter could have been added for prime numbers, in which case some numbers would be both odd and prime. This could be a performance issue if you have a lot of filters and only need to route each item to one location. If that is your case, you might want to create a ChainOfResponsibilityBlock and provide the logic you are looking for in the Execute method (see the sketch after this list).
- Pairs of Method and Reference to the target - Both the code and target can be different for each filter
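To give a feel for that alternative, here is a hypothetical sketch of the per-item logic such a ChainOfResponsibilityBlock might use; none of these member names exist in the framework:

// Hypothetical ChainOfResponsibilityBlock - unlike FilterBlock, each
// item stops at the FIRST filter that claims it instead of being
// offered to every filter. _filters, CanHandle, Method and Target
// are all assumed names.
foreach (var filter in _filters)
{
    if (!filter.CanHandle(item))
        continue;
    filter.Method(item, filter.Target);  // same Method/Target pairing idea
    break;                               // first match wins
}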
BatchInputBlock
- This is the opposite of TransformToManyBlock. It collects individual items into a list and then passes the list on. You get to decide the number of items to hold in the list. This could be used, for example, to collect data before you bulk load it into a database (a rough sketch of the idea follows).
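The idea inside the block is simple; here is a rough sketch of the batching loop (_batchSize is an assumed field name, while Source, Target and TryAdd are the same conventions the other blocks use):

// Sketch of the batching idea inside Execute (_batchSize is assumed).
var batch = new List<T>(_batchSize);
foreach (var item in Source.GetConsumingEnumerable())
{
    batch.Add(item);
    if (batch.Count < _batchSize)
        continue;
    Target.TryAdd(batch);             // hand the full batch downstream
    batch = new List<T>(_batchSize);  // start collecting the next one
}
if (batch.Count > 0)
    Target.TryAdd(batch);             // flush any partial batch at the end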
SourceJoinBlock
- In the event that you would like to bring two or more pipelines back into a single pipeline, you will need to use this class. It properly handles moving the data from all of the source blocks into the single target. You can think of this as a funnel. This is necessary in order to prevent the target from closing its input stream prematurely, which would lead to exceptions being thrown. A rough sketch of the completion rule follows this list.
- Source - (N)umber of pipelines that will be sending data.
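Here is that rough sketch; the member names are assumptions, not the framework's actual code:

// Sketch: keep moving data until every source block reports
// completion, and only then close the target's input.
while (_sources.Any(source => !source.IsCompleted))
{
    foreach (var source in _sources)
        while (source.TryTake(out var item))  // drain whatever is ready
            Target.TryAdd(item);
    Thread.Sleep(10);                         // avoid a hot spin
}
Target.CompleteAdding();  // safe now: no source can produce more data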
TerminatorBlock
- This is the block at the end of the pipeline. This block is special because the data will not be passed on to any more blocks.
Block Execution Example
This is an example of the Execute method contained within the blocks in my framework. While this section of code came from the TransformToManyBlock, each of the other blocks is equally simplistic. They contain:
- a loop to process the incoming data
- a way to identify when there is no more incoming data
- a parallel statement to process the data according to your needs
- a call to your custom code
- exception handling
- a statement that tells the next block there is no more data coming
protected override void Execute()
{
    State = ExecutionState.Running;
    while (State == ExecutionState.Running)
    {
        try
        {
            // No more incoming data and the source is closed - we are done.
            if (Source.Count == 0 && Source.IsCompleted)
            {
                State = ExecutionState.Done;
                continue;
            }
            // Process the incoming data according to your ParallelOptions.
            Parallel.ForEach(Source.GetConsumingEnumerable(), ParallelOptions, item =>
            {
                if (item == null)
                    return;
                // Call your custom code with the item and the next block.
                Method(item, Target);
            });
        }
        catch (OperationCanceledException)
        {
            State = ExecutionState.Cancel;
        }
        catch (Exception ex)
        {
            State = ExecutionState.Error;
            Exceptions.Add(ex.GetBaseException());
            ParallelOptions.Cancel();
        }
    }
    // Tell the next block that no more data is coming.
    Target.CompleteAdding();
}
Sample Pipeline
public SimplePipeline()
{
    // Create the blocks for each step of the pipeline.
    _blocks.Add(_step1 = new TransformToManyBlock<List<int>, int>(MethodStep1));
    _blocks.Add(_step2 = new FilterBlock<int, string>());
    _blocks.Add(_step3A = new BatchInputBlock<string>(9));
    _blocks.Add(_step3B = new BatchInputBlock<string>(9));
    _blocks.Add(_step4 = new SourceJoinBlock<List<string>>());
    _blocks.Add(_step5 = new ActionBlock<List<string>>(MethodStep6));

    // Share one cancellation source across all blocks and set the
    // degree of parallelism for each step.
    var cancellationSource = new CancellationTokenSource();
    _step1.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(6);
    _step2.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(3);
    _step3A.ParallelOptions.SetCancellationSource(cancellationSource);
    _step3B.ParallelOptions.SetCancellationSource(cancellationSource);
    _step4.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);
    _step5.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);

    // Wire the steps together; step 2 filters evens to 3A and odds to 3B.
    _step1.LinkTo(_step2);
    _step2.LinkTo(_step3A, FilterMethod2A);
    _step2.LinkTo(_step3B, FilterMethod2B);
    _step3A.LinkTo(_step4);
    _step3B.LinkTo(_step4);
    _step4.LinkTo(_step5);

    // Register both batch blocks as sources of the join so the funnel
    // knows when all of its inputs are complete.
    _step4.AddSource(_step3A);
    _step4.AddSource(_step3B);
}
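The constructor above only wires things up. Feeding the pipeline would look roughly like the following; Step1 and WaitForCompletion are stand-ins for whatever entry points you choose to expose, while TryAdd and CompleteAdding are the same operations used throughout the article:

// Hypothetical driver - the property and method names are assumptions.
var pipeline = new SimplePipeline();
foreach (var start in new[] { 1, 51, 101, 151 })
    pipeline.Step1.TryAdd(Enumerable.Range(start, 50).ToList());  // lists covering 1-200

pipeline.Step1.CompleteAdding();  // no more input; completion ripples
                                  // down the chain block by block
pipeline.WaitForCompletion();     // wait on each block's Execution Task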
Custom Code for Required Steps
// Step 1 - split each incoming list into individual numbers.
private static void MethodStep1(List<int> list, ITarget<int> target)
{
    list?.ForEach(i => target?.TryAdd(i));
}

// Step 2A - pass only even numbers on to the target, formatted as strings.
private static void FilterMethod2A(int value, ITarget<string> target)
{
    Task.Delay(50 + Random1.Next(100)).Wait();   // simulate some work
    if (value % 2 != 0)
        return;
    Task.Delay(50 + Random1.Next(100)).Wait();   // simulate more work
    target.TryAdd($"EVEN - {value}");
}

// Step 2B - pass only odd numbers on; the 13131313 value shows how an
// exception thrown in custom code surfaces through the block.
private static void FilterMethod2B(int value, ITarget<string> target)
{
    Task.Delay(50 + Random1.Next(100)).Wait();   // simulate some work
    if (value == 13131313)
    {
        Console.WriteLine($"Throwing Exception - {value} is unlucky\n");
        throw new System.IO.InvalidDataException($"{value} is unlucky");
    }
    if (value % 2 == 0)
        return;
    Task.Delay(50 + Random2.Next(100)).Wait();   // simulate more work
    target.TryAdd($" ODD - {value}");
}

// Final step - write each batch of strings out as a single block of text.
private static void MethodStep6(List<string> items)
{
    var len = items.Count;
    if (len <= 0)
        return;
    var sb = new StringBuilder(1000);
    sb.AppendLine("==============================");
    for (var i = 0; i < len; i++)
        sb.AppendLine($"{i + 1} - {items[i]}");
    Console.WriteLine(sb.ToString());
}
Closing Remarks
As I have stated before, this was an exercise in creating a proof of concept to understand how data flows from one step to the next. As such, I didn't do any performance testing with either framework. If you look at mine, there really isn't a lot of overhead; most of the issues would come from the way the custom code is written or from setting the parallelism inappropriately.
I've also included a sample application using the TPL Dataflow Framework so you can compare and contrast the differences between them. I found creating the pipeline with the TPL Dataflow Framework to be frustrating at times because:
- I couldn't always make it do exactly what I was interested in (i.e., taking a list of items and creating individual items)
- I had trouble creating the method calls
- JoinBlock seems odd in that I have to process both items at the same time instead of being called with item1 or item2 individually
Take a stroll through the code and feel free to ask questions, but know that I consider this the end and don't plan on adding to or enhancing the code, as my objectives have been met.
The code is made fully available to you to hack up or enhance in any way that you see fit.
Enjoy!