This piece is about managing and configuring Dataflow blocks to form data processing pipelines. There are suggestions for the best ways to link blocks together, how to configure them for parallel working, and how to enable the concurrent reading of, and exclusive writing to, shared variables.
Introduction
The Dataflow library consists of a series of thread-safe data buffers known as blocks that are designed to be linked together to form data processing pipelines. Each block processes data in its own timeframe and thread space. The blocks communicate with each other by sending and receiving data messages. In software modelling terms, the blocks are known as Agents; this sort of model has good separation of concerns and is easily expandable. The interaction between threads is managed by the exchange of messages between dataflow blocks, so there's no need to disappear down the async wormhole. The Dataflow library is available as the NuGet package System.Threading.Tasks.Dataflow.
Do They Work?
Before going any further, it's as well to test out one of the blocks to see if it passes muster. The ActionBlock<T> is a thread-safe data buffer that uses an Action<T> delegate to consume data messages, where the type T is the message type. ActionBlocks are designated as target blocks: they are designed to receive messages, not to be a source of messages. Messages posted to the block's input buffer are passed to the Action delegate as a parameter. The block's power derives from its ability to 'spin up' numerous worker threads to handle the messages in parallel. Here's one way of instantiating it:
var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3,
SingleProducerConstrained = true });
In this case, the message is a simple int that simulates work by causing the delegate's thread to sleep for a while. The ExecutionDataflowBlockOptions are set to allow three worker threads to run concurrently, and the SingleProducerConstrained flag informs the block that there is only one producer posting to its buffer, so the input does not have to be 'gated'. Using this setup, a test can be run to see how an ActionBlock parallel foreach loop compares to a conventional foreach loop.
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using BenchmarkDotNet.Attributes;

public class ForEachBenchmarks
{
    // 20 work items, each requiring 20 ms of simulated work
    private readonly int[] arr = Enumerable.Repeat(20, 20).ToArray();

    [Benchmark(Baseline = true)]
    public bool ForEach()
    {
        foreach (var item in arr)
        {
            Thread.Sleep(item);
        }
        return true;
    }

    [Benchmark]
    public bool ParallelForEach()
    {
        var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 3,
                SingleProducerConstrained = true
            });
        foreach (var item in arr)
        {
            block.Post(item);
        }
        // Stop accepting input and wait for all buffered items to be processed
        block.Complete();
        block.Completion.Wait();
        return true;
    }
}
The results show that the parallel foreach loop runs about 2.9 times more quickly than the standard loop. That figure makes sense: 20 items at 20 ms each take around 400 ms sequentially, while three workers can clear them in roughly seven rounds of 20 ms, about 140 ms. It's quite impressive considering that the pipeline is being built and closed down within the test method itself.
A More Practical Application
Honey the Codewitch recently posted an interesting article entitled A Thread Pooling and Task Queuing Demonstration Using Message Passing where concurrent worker threads were configured to report their progress to progress bars displayed in a Windows Forms window. This was achieved by direct instantiation and manipulation of threads. For those of us who don't have Honey's excellent thread marshalling skills, the same functionality can be achieved using a couple of ActionBlocks. The basic setup is the same as for the parallel foreach example; the only difference is that the action delegates now all post their progress to another shared ActionBlock that's configured to run on the UI thread. The Message type is expanded to take WorkerCommands.
public enum WorkerCommands
{
Stop,
Start,
Report
}
public struct Message
{
public int Value;
public int WorkerId;
public WorkerCommands Command;
}
The progress reporter block is configured to run its delegate on the user-interface thread:
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
_progressReporter = new ActionBlock<Message>(msg => { reportProgress(msg); },
new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler,
CancellationToken = _token });
The worker block's delegate captures the progress reporter and posts Messages to it.
int workersCount = Math.Max(Environment.ProcessorCount - 2, 1);
...
_workerBlock = new ActionBlock<Message>(msg => { DoWork(msg); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workersCount,
SingleProducerConstrained = true, CancellationToken = _token });
...
private void DoWork(Message msg)
{
int id = Thread.CurrentThread.ManagedThreadId;
_progressReporter.Post(new Message() {
Command = WorkerCommands.Start, Value = 0, WorkerId = id
});
try
{
for (var i = 1; i <= 50; ++i)
{
Thread.Sleep(msg.Value);
_progressReporter.Post(new Message() {
Command = WorkerCommands.Report, Value = i * 2, WorkerId = id }
);
_token.ThrowIfCancellationRequested();
}
_progressReporter.Post(new Message() {
Command = WorkerCommands.Stop, Value = 0, WorkerId = id });
}
catch (OperationCanceledException)
{
}
}
The download has an example WPF application that uses dataflow blocks to update ProgressBars.
A Bit of Nomenclature
Blocks that output messages are called producers or source blocks. Blocks that receive messages are known as consumers or target blocks. Blocks that can be both producers and consumers are propagator blocks. There are designated target blocks, the ActionBlock being one, but there are no designated source blocks as such; the term is used with reference to another block, so a propagator block can be referred to as the source block for a specific target block. More formally, source blocks implement the ISourceBlock<TOutput> interface and target blocks implement ITargetBlock<TInput>. Propagator blocks implement IPropagatorBlock<in TInput, out TOutput>, which combines both interfaces.
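As an illustrative sketch, the library's TransformBlock (not otherwise used in this article) is a propagator, so it can stand in for either role:
IPropagatorBlock<int, string> propagator =
    new TransformBlock<int, string>(n => n.ToString());

// Being a propagator, it is usable wherever a target or a source is expected
ITargetBlock<int> target = propagator;      // a target for int messages
ISourceBlock<string> source = propagator;   // a source of string messages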
Managing Dataflow Blocks
Once a block is instantiated, it remains active, constantly monitoring its input buffer and processing messages as they arrive. When it is no longer required, it should be closed down in a structured way so that data is not lost in the process. The way to achieve this is to call the block's Complete method. That method stops the block from accepting new messages but allows all stored messages in the buffer to be processed. The block's Completion property holds a Task that completes when the block has finished processing, so the correct way to close down a block is:
block.Complete();
block.Completion.Wait();
Linking Blocks Together
Linking blocks to form a data processing pipeline involves coupling the output of a source block to the input of a target block using the source block's LinkTo method.
sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });
Setting the DataflowLinkOptions PropagateCompletion flag results in the source block calling the target block's Complete method once its own completion has finished. This enables completion to ripple down the pipeline. So, to close the pipeline down, call Complete on the first block and wait for the end block to complete, as in the sketch below.
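Here's a minimal sketch of that pattern, using two hypothetical blocks, a TransformBlock linked to an ActionBlock:
var square = new TransformBlock<int, int>(n => n * n);
var print = new ActionBlock<int>(n => Console.WriteLine(n));
square.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

square.Post(3);           // 9 is printed once the message flows through
square.Complete();        // complete the first block...
print.Completion.Wait();  // ...and wait for the last block to finish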
Encapsulating Blocks to Facilitate Linking
In the previous example, an ActionBlock's delegate uses another ActionBlock to update the UI by posting directly to that block. That's a design fault: the workerBlock is too closely tied to the progressBlock and, in effect, the pipeline is being built internally within the workerBlock. The workerBlock should be posting its data to a propagator block, so that the propagator block can use its LinkTo method to link to the progressBlock. As the workerBlock and the propagator block are dependent upon each other, it's best to encapsulate the two blocks to form a single entity. This is done by calling the DataflowBlock.Encapsulate method, passing in the workerBlock as the target and the propagator block as the source. It's important to attach a continuation Task to the target's Completion Task. The continuation is configured to call the source's Complete method so that the encapsulated entity that's returned from the method completes properly. The factory method below instantiates the encapsulated propagator.
public static class MessagePropagatorFactory
{
public static IPropagatorBlock<Message, Message>
CreateMessagePropagator(ExecutionDataflowBlockOptions blockOptions,
CancellationToken token)
{
var source = new BufferBlock<Message>(new DataflowBlockOptions
{ CancellationToken = token
});
var target = new ActionBlock<Message>(msg =>
{
int id = Thread.CurrentThread.ManagedThreadId;
source.Post(new Message() {
Command = WorkerCommands.Start, Value = 0, WorkerId = id
});
try
{
for (var i = 1; i <= 50; ++i)
{
Thread.Sleep(msg.Value);
source.Post(new Message() {
Command = WorkerCommands.Report, Value = i * 2, WorkerId = id
});
token.ThrowIfCancellationRequested();
}
source.Post(new Message() {
Command = WorkerCommands.Stop, Value = 0, WorkerId = id
});
}
catch (OperationCanceledException) { }
},
blockOptions);
target.Completion.ContinueWith((t) => source.Complete());
return DataflowBlock.Encapsulate(target, source);
}
}
The factory method uses a BufferBlock as the source block. The BufferBlock is a propagator block; multiple blocks can be linked to its output, but the first linked block that accepts a message takes it away, and it's not offered to other consumers. Under the covers, there's a message-based handshaking protocol in place between the BufferBlock and any linked consumers. That protocol enables consumers to accept, reject or postpone any message that they are offered. That's one of the reasons why LinkTo should be used in place of DIY posting and async reading of messages. Rolling your own dataflow block is a herculean task; it's nearly always better to use the DataflowBlock.Encapsulate method or a wrapper class based on existing blocks. With the factory method in place, building the pipeline becomes something like this:
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
var progressOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 3,
    SingleProducerConstrained = true,
    CancellationToken = token
};
var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator(
    progressOptions, token);
var reporterOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
var progressReporter = new ActionBlock<Message>((msg) =>
    Console.WriteLine($"{msg.WorkerId} {msg.Command}"), reporterOptions);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
progressPropagator.LinkTo(progressReporter, linkOptions);
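Feeding the pipeline and closing it down then follows the shutdown pattern described earlier; a minimal sketch:
progressPropagator.Post(new Message { Value = 20 });  // one unit of work: 20 ms per step
progressPropagator.Complete();                        // complete the head of the pipeline
progressReporter.Completion.Wait();                   // completion propagates to the tail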
Broadcasting Messages
The BroadcastBlock sends the same data message to all linked target blocks. Well, it's not exactly the same message: the consumers receive a cloned version of the original. This is done to prevent problems arising from multiple threads working with the same message instance. The message cloning method is passed as a parameter to the BroadcastBlock's constructor. These sorts of blocks are used to split the pipeline into branches. With the use of a BroadcastBlock, the pipeline in the example can be modified to send messages to the original progressReporter as well as a stopCommandLogger, as illustrated below:
var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator(options, token);
var executionOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
var progressReporter = new ActionBlock<Message>((msg) =>
    Console.WriteLine($"{msg.WorkerId} {msg.Command}"), executionOptions);
var stopCommandLogger = new ActionBlock<Message>((msg) =>
    Console.WriteLine($"Logged {msg.Command}"), executionOptions);
// Message is a struct, so the identity function yields a copy, not a shared reference
var broadcastBlock = new BroadcastBlock<Message>(
    msg => msg, new DataflowBlockOptions { CancellationToken = token });
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
progressPropagator.LinkTo(broadcastBlock, linkOptions);
broadcastBlock.LinkTo(progressReporter, linkOptions);
broadcastBlock.LinkTo(stopCommandLogger, linkOptions, msg => msg.Command == WorkerCommands.Stop);
The BroadcastBlock has another important function: it always outputs the latest message that's posted to it, so it provides a live data feed. A bottleneck in the pipeline ahead of the block will not result in the block outputting stale messages. This can be important with time-sensitive data such as stock market prices.
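A minimal sketch of that behaviour, with a short pause to let the block process its backlog (the value read depends on timing, so this is illustrative rather than deterministic):
var feed = new BroadcastBlock<double>(p => p);   // price feed; identity clone is fine for a value type

feed.Post(101.5);   // a stale price
feed.Post(102.0);   // the latest price overwrites the held value

Thread.Sleep(100);                   // give the block time to process both posts
Console.WriteLine(feed.Receive());   // 102.0: only the latest value is on offer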
Concurrent Reads and Exclusive Writes
A situation can arise with parallel working where some threads need to read a shared variable while other threads want to write to it. In this sort of case, readers can be allowed to read the variable concurrently with other readers, but writers must update the variable exclusively so that it's not updated at a time when readers are in the middle of processing it. To facilitate this, the Dataflow library provides a ConcurrentExclusiveSchedulerPair that contains two schedulers. Readers are set to use the ConcurrentScheduler and writers are set to use the ExclusiveScheduler. For this to work well, there needs to be load balancing between concurrent threads so that the work is shared out equally between them. The best way to configure this is to use a BufferBlock as the source and for the linked ActionBlocks' delegates to return a Task; with BoundedCapacity set to 1, messages will not be offered to busy ActionBlocks. Here's the setup for readers; it's similar for writers, but they use the ExclusiveScheduler.
var readExWriteScheduler = new ConcurrentExclusiveSchedulerPair();
var blockOptions = new ExecutionDataflowBlockOptions
{
    CancellationToken = token,
    BoundedCapacity = 1,
    SingleProducerConstrained = true
};
var actionBlock = new ActionBlock<int>(async (i) =>
{
    await Task.Factory.StartNew(() =>
        readerWork(i),
        token,
        TaskCreationOptions.None,
        readExWriteScheduler.ConcurrentScheduler);
},
blockOptions);
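For completeness, here's a sketch of the corresponding writer block; it follows the same pattern but schedules its work on the ExclusiveScheduler, and assumes a hypothetical writerWork method analogous to readerWork:
var writerBlock = new ActionBlock<int>(async (i) =>
{
    // Exclusive scheduling: this work never overlaps with the concurrent reads
    await Task.Factory.StartNew(() =>
        writerWork(i),
        token,
        TaskCreationOptions.None,
        readExWriteScheduler.ExclusiveScheduler);
},
blockOptions);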
The download has a demo that uses the ConcurrentExclusiveSchedulerPair with three readers and one writer. Data is sent in batches to illustrate that the worker threads are employed on a 'while the buffer has data' basis; they are released when it's empty. The display shows the BufferBlock's load balancing ability.
Other Dataflow Blocks
There are many types of dataflow blocks that are not covered in this introduction. Most are concerned with increasing the entropy of the data - that’s the amount of information that the data contains. Single items go into the blocks and container loads come out. You can even have an input of pennies and an input of buns and get out a stream of tuples each containing a penny and a bun. Stephen Toub describes them all in this excellent guide.
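The pennies-and-buns case, for instance, is handled by the JoinBlock; a minimal sketch:
// A JoinBlock<T1, T2> pairs one item from each input into a Tuple<T1, T2>.
// Hypothetical pennies (int) and buns (string) illustrate the pairing.
var join = new JoinBlock<int, string>();
join.Target1.Post(1);               // post a penny
join.Target2.Post("currant bun");   // post a bun
var pair = join.Receive();          // Tuple<int, string>: (1, "currant bun")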
Conclusion
The Dataflow library is focused on employing parallel programming for the 'in house' processing of data. It does this by splitting the process up into a series of autonomous blocks that run in their own thread space and manage their own threads. They are easy to employ and simple to link together, but some caution is needed here as an increase in concurrency does not always equate to better performance. Some experimentation is recommended in order to find the best design. After all, if your application ends up with more threads than the Bayeux Tapestry, it's as well to make sure that they are being deployed efficiently.
History
- 11th September, 2020: Initial version