
An Introduction to Parallel Computation and the building of data pipelines with Dataflow Blocks

11 Sep 2020 | CPOL | 8 min read
How to take advantage of the multi-core architecture of modern processors by using Dataflow blocks
This piece is about managing and configuring Dataflow blocks to form data processing pipelines. There are suggestions for the best ways to link blocks together and to configure them for parallel working, as well as how to enable concurrent reads of, and exclusive writes to, shared variables.

Introduction

The Dataflow library consists of a series of thread-safe data buffers known as blocks that are designed to be linked together to form data processing pipelines. Each block processes data in its own timeframe and thread space. The blocks communicate with each other by sending and receiving data messages. In software modelling terms, the blocks are known as Agents; this sort of model has good separation of concerns and is easily expandable. The interaction between threads is managed by the exchange of messages between dataflow blocks, so there’s no need to disappear down the async wormhole. The Dataflow library is available as the NuGet package System.Threading.Tasks.Dataflow.

Do They Work?

Before going any further, it’s as well to test out one of the blocks to see if it passes muster. The ActionBlock<T> is a thread-safe data buffer that uses an Action<T> delegate to consume data messages. The type T is the type of the message. ActionBlocks are designated as target blocks: they are designed to receive messages and are not intended to be a source of messages. Messages posted to the block's input buffer are passed to the Action delegate as a parameter. The block’s power derives from its ability to ‘spin up’ numerous worker threads to handle the messages in parallel. Here’s one way of instantiating it.

C#
var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3,
            SingleProducerConstrained = true });

In this case, the message is a simple int that simulates work by causing the delegate’s thread to sleep for a while. ExecutionDataflowBlockOptions are set to allow three worker threads to run concurrently and the SingleProducerConstrained flag informs the block that there is only one producer posting to its buffer so that the input does not have to be ‘gated’. Using this setup, a test can be run to see how an ActionBlock parallel foreach loop compares to a conventional foreach loop.

C#
private readonly int[] arr = Enumerable.Repeat(20, 20).ToArray();

[Benchmark(Baseline = true)]
public bool ForEach()
{
    foreach (var item in arr)
    {
        Thread.Sleep(item);
    }
    //keep Benchmark happy - it likes a test method to return something
    return true;
}

[Benchmark]
public bool ParallelForEach()
{
    var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3,
            SingleProducerConstrained = true
        });
    foreach (var item in arr)
    {
        block.Post(item);
    }
    block.Complete();         //close the pipeline
    block.Completion.Wait();  //wait for it to complete
    return true;
}

Image 1

The results show that the parallel foreach loop is running 2.9 times more quickly than the standard loop. That’s quite impressive considering that the pipeline is being built and closed down within the test method itself.
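As a rough sanity check on that figure, the ideal speedup can be estimated by hand. The sketch below assumes that every item takes exactly the same time and that the cost of building the block is negligible; the SpeedupEstimate class is a hypothetical helper and is not part of the benchmark project.

```csharp
using System;
using System.Globalization;

public static class SpeedupEstimate
{
    // Ideal speedup for 'items' equal-length jobs shared among 'workers' threads.
    // Assumes no scheduling overhead; the name and signature are illustrative only.
    public static double Ideal(int items, int workers)
    {
        // The work proceeds in rounds of up to 'workers' items at a time.
        int rounds = (items + workers - 1) / workers;
        return (double)items / rounds;
    }

    public static void Main()
    {
        // 20 items on 3 workers need 7 rounds, so the ceiling is 20/7.
        double ideal = Ideal(20, 3);
        Console.WriteLine(ideal.ToString("F2", CultureInfo.InvariantCulture)); // prints 2.86
    }
}
```

The measured figure of 2.9 is in line with this estimate, which suggests that the block itself adds very little overhead in this test.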

A More Practical Application

Honey the Codewitch recently posted an interesting article entitled A Thread Pooling and Task Queuing Demonstration Using Message Passing where concurrent worker threads were configured to report their progress to progress bars displayed in a Windows Forms window. This was achieved by direct instantiation and manipulation of threads. For those of us who don’t have Honey’s excellent thread marshalling skills, the same functionality can be achieved using a couple of ActionBlocks. The basic setup is the same as for the parallel foreach example; the only difference is that the action delegates now all post their progress to another shared ActionBlock that’s configured to run its delegate on the UI thread. The Message type is expanded to take WorkerCommands.

C#
public enum WorkerCommands
  {
      Stop,
      Start,
      Report
  }

  public struct Message
  {
      public int Value;
      public int WorkerId;
      public WorkerCommands Command;
  }

The progress reporter block is configured to run its delegate on the user interface thread:

C#
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
 _progressReporter = new ActionBlock<Message>(msg => { reportProgress(msg); },
                     new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler, 
                     CancellationToken = _token }); 

The worker block's delegate captures the progress reporter and posts Messages to it.

C#
  int workersCount = Math.Max(Environment.ProcessorCount - 2, 1);
...
   _workerBlock = new ActionBlock<Message>(msg => { DoWork(msg); },
                  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workersCount,
                  SingleProducerConstrained = true, CancellationToken = _token });
....
private void DoWork(Message msg)
{
    //the same threads are reused while the workerBlock's buffer holds data.
    //The threads are released when it is empty
    int id = Thread.CurrentThread.ManagedThreadId;
    //send start command
    _progressReporter.Post(new Message()
    {
        Command = WorkerCommands.Start, Value = 0, WorkerId = id
    });
    try
    {
        for (var i = 1; i <= 50; ++i)
        {
            Thread.Sleep(msg.Value);
            //report progress, Value is the progress %
            _progressReporter.Post(new Message()
            {
                Command = WorkerCommands.Report, Value = i * 2, WorkerId = id
            });
            _token.ThrowIfCancellationRequested();
        }
        //send the stop command
        _progressReporter.Post(new Message()
        {
            Command = WorkerCommands.Stop, Value = 0, WorkerId = id
        });
    }
    catch (OperationCanceledException)
    {
        //cancellation was requested: abandon the work without sending a stop command
    }
}

The download has an example WPF application that uses dataflow blocks to update ProgressBars.

Image 2

A Bit of Nomenclature

Blocks that output messages are called producers or source blocks. Blocks that receive messages are known as consumers or target blocks. Blocks that can be both producers and consumers are propagator blocks. There are designated target blocks, the ActionBlock is one, but there are no source blocks as such. The term is used with reference to another block. So a propagator block can be referred to as the source block for a specific target block. More formally, source blocks implement the ISourceBlock<TOutput> interface and target blocks implement ITargetBlock<TInput>. Propagator blocks implement both interfaces as well as IPropagatorBlock<in TInput, out TOutput>.
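The interface relationships can be checked at run time. The following is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; the BlockRoles class and its helper methods are made up for illustration, while the interfaces and block types are the library's own.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class BlockRoles
{
    // Hypothetical helpers: they only test which Dataflow interfaces a block implements.
    public static bool IsTarget<T>(object block) => block is ITargetBlock<T>;
    public static bool IsSource<T>(object block) => block is ISourceBlock<T>;

    public static void Main()
    {
        var action = new ActionBlock<int>(n => { });
        var transform = new TransformBlock<int, string>(n => n.ToString());

        // An ActionBlock is a target only.
        Console.WriteLine(IsTarget<int>(action));      // True
        Console.WriteLine(IsSource<int>(action));      // False

        // A TransformBlock is a propagator: a target for int and a source of string.
        Console.WriteLine(IsTarget<int>(transform));   // True
        Console.WriteLine(IsSource<string>(transform)); // True
        Console.WriteLine(transform is IPropagatorBlock<int, string>); // True
    }
}
```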

Managing Dataflow Blocks

Once a block is instantiated, it remains active, monitoring its input buffer and processing messages as they arrive. When it is no longer required, it should be closed down in a structured way so that data is not lost in the process. The way to achieve this is to call the block’s Complete method. That method stops the block from accepting new messages, while the messages already stored in its buffer are still processed. Complete returns immediately and does not return a Task; instead, the block exposes a Completion property, a Task that finishes once all buffered messages have been handled. So the correct way to close down a block is:

C#
block.Complete();         //close the pipeline
block.Completion.Wait();  //wait for it to complete
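In asynchronous code, the Completion Task can be awaited rather than blocking on Wait. The sketch below is an illustration under that assumption, not code from the download; ShutdownDemo and RunAsync are hypothetical names. It also shows that a block declines anything posted after Complete has been called.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ShutdownDemo
{
    // Posts 'count' items, closes the block and reports what happened.
    public static async Task<(int Processed, bool LateAccepted)> RunAsync(int count)
    {
        int processed = 0;
        var block = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));

        for (int i = 0; i < count; i++)
        {
            block.Post(i);
        }

        block.Complete();                   // stop accepting new messages
        bool lateAccepted = block.Post(-1); // Post returns false once the block is closed
        await block.Completion;             // finishes when the buffered items are processed

        return (processed, lateAccepted);
    }

    public static async Task Main()
    {
        var result = await RunAsync(5);
        Console.WriteLine($"{result.Processed} {result.LateAccepted}"); // prints 5 False
    }
}
```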

Linking Blocks Together

Linking blocks to form a data processing pipeline involves coupling the output of a source block to the input of a target block using the source block’s LinkTo method.

C#
sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

Setting the DataflowLinkOptions PropagateCompletion flag means that, once the source block has completed (Complete has been called on it and it has finished processing its buffered messages), the target block’s Complete method is called automatically. This enables the completion to ripple down the pipeline. So, to close the pipeline down, call Complete on the first block and wait for the end block to complete.
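A minimal two-block pipeline shows the ripple effect. This is a sketch rather than code from the download: the LinkDemo class is hypothetical, and a TransformBlock is used as the source simply because it is a convenient propagator block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        // Source: squares each input. Target: collects the outputs.
        var square = new TransformBlock<int, int>(n => n * n);
        // The default MaxDegreeOfParallelism is 1, so the List is only touched by one thread at a time.
        var collect = new ActionBlock<int>(n => results.Add(n));

        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in new[] { 1, 2, 3 })
        {
            square.Post(n);
        }

        square.Complete();        // close the first block only
        await collect.Completion; // the end block completes once everything has flowed through
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync())); // prints 1,4,9
    }
}
```

Without PropagateCompletion, the await on collect.Completion would never finish, as nothing would call Complete on the end block.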

Encapsulating Blocks to Facilitate Linking

In the previous example, an ActionBlock’s delegate uses another ActionBlock to update the UI by posting directly to that block. That’s a design fault: the workerBlock is too closely tied to the progressBlock and, in effect, the pipeline is being built internally within the workerBlock. The workerBlock should be posting its data to a propagator block, so that the propagator block can use its LinkTo method to link to the progressBlock. As the workerBlock and the propagator block are dependent upon each other, it’s best to encapsulate the two blocks to form a single entity. This is done by calling the DataflowBlock.Encapsulate method, passing in the workerBlock as the target and the propagator block as the source. It's important to attach a continuation Task to the target's completion Task. The continuation Task is configured to call the source’s Complete method so that the encapsulated entity that's returned from the method completes properly. The factory method below instantiates the encapsulated propagator.

C#
public static class MessagePropagatorFactory
{
  public static IPropagatorBlock<Message, Message>
  CreateMessagePropagator(ExecutionDataflowBlockOptions blockOptions,
                         CancellationToken token)
  {
      //source is the output block. A BufferBlock can be both a source and a target
      var source = new BufferBlock<Message>(new DataflowBlockOptions
                                           { CancellationToken = token
      });
      //target is the input block
      var target = new ActionBlock<Message>(msg =>
      {
          //worker threads are released when the data buffer is  empty
          int id = Thread.CurrentThread.ManagedThreadId;
          //send start command
          source.Post(new Message() {
          Command = WorkerCommands.Start, Value = 0, WorkerId = id
          });
          try
          {
              for (var i = 1; i <= 50; ++i)
              {
                  Thread.Sleep(msg.Value);
                  //report progress, Value is the progress %
                  source.Post(new Message() {
                  Command = WorkerCommands.Report, Value = i * 2, WorkerId = id
                  });
                  token.ThrowIfCancellationRequested();
              }
              //send the stop command
              source.Post(new Message() {
              Command = WorkerCommands.Stop, Value = 0, WorkerId = id
              });
          }
          catch (OperationCanceledException){}
      },
       blockOptions);
      // When the target completes, it sets its Completion Task to a completed state
      // So, when that happens, call the source's Complete method.
      target.Completion.ContinueWith((t) => source.Complete());
      //the returned entity implements the IPropagatorBlock interface
      return DataflowBlock.Encapsulate(target, source);
  }
}

The factory method uses a BufferBlock as the source block. The BufferBlock is a propagator block. Multiple blocks can be linked to its output, but the first linked block that accepts a message takes it away; it’s not offered to other consumers. Under the covers, there’s a message-based handshaking protocol in place between the BufferBlock and any linked consumers. That protocol enables consumers to accept, reject or postpone any message that they are offered. That’s one of the reasons why LinkTo should be used in place of DIY posting and async reading of messages. Rolling your own dataflow block is a herculean task. It’s nearly always better to use the DataflowBlock.Encapsulate method or a wrapper class based on existing blocks. With the factory method in place, building the pipeline becomes something like this:

C#
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
var progressOptions= new ExecutionDataflowBlockOptions
 {
  MaxDegreeOfParallelism = 3,
  SingleProducerConstrained = true,
  CancellationToken = token
 };

 var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator
                          (progressOptions,token);

 var reporterOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };

 var progressReporter = new ActionBlock<Message>((msg) =>
         Console.WriteLine($"{msg.WorkerId} {msg.Command}"),reporterOptions);

 var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
 progressPropagator.LinkTo(progressReporter, linkOptions);
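Once built, the pipeline is driven by posting to the encapsulated propagator and closed by completing it. The sketch below is a cut-down, self-contained stand-in for the factory above: EncapsulateDemo, CreatePropagator and the string messages are hypothetical simplifications, with an int as the input and two strings as the output for each input.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateDemo
{
    // Simplified stand-in for the article's factory method (illustrative only).
    public static IPropagatorBlock<int, string> CreatePropagator()
    {
        var source = new BufferBlock<string>();
        var target = new ActionBlock<int>(n =>
        {
            source.Post($"start {n}");
            source.Post($"stop {n}");
        });
        // When the target has finished, close the source so the pair completes as one.
        target.Completion.ContinueWith(_ => source.Complete());
        return DataflowBlock.Encapsulate(target, source);
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var propagator = CreatePropagator();
        var reporter = new ActionBlock<string>(s => log.Add(s));
        propagator.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });

        propagator.Post(1);        // goes to the inner target block
        propagator.Post(2);
        propagator.Complete();     // completes the inner target block
        await reporter.Completion; // wait for the end of the pipeline
        return log;
    }

    public static async Task Main()
    {
        // prints start 1 | stop 1 | start 2 | stop 2
        Console.WriteLine(string.Join(" | ", await RunAsync()));
    }
}
```

The output order is fixed here only because the inner ActionBlock uses the default MaxDegreeOfParallelism of 1; with several workers, the messages from different inputs would interleave.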

Broadcasting Messages

The BroadcastBlock sends the same data message to all linked target blocks. Well, it's not exactly the same message; the consumers receive a cloned version of the original message. This is done to prevent problems arising from multiple threads working with the same message instance. The message cloning method is passed as a parameter to the BroadcastBlock’s constructor. These sorts of blocks are used to split the pipeline into branches. With the use of a BroadcastBlock, the pipeline in the example can be modified to send messages to the original progressReporter as well as a stopCommandLogger as illustrated below:

C#
var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator(progressOptions, token);

var executionOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
var progressReporter = new ActionBlock<Message>((msg) =>
    Console.WriteLine($"{msg.WorkerId} {msg.Command}"), executionOptions);

var stopCommandLogger = new ActionBlock<Message>((msg) =>
    Console.WriteLine($"Logged {msg.Command}"), executionOptions);

//msg=>msg is the cloning function, no sort of conversion is needed in this case
//as Message is a struct and is copied by value
var broadcastBlock = new BroadcastBlock<Message>
    (msg => msg, new DataflowBlockOptions { CancellationToken = token });

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
progressPropagator.LinkTo(broadcastBlock, linkOptions);
broadcastBlock.LinkTo(progressReporter, linkOptions);
//pass in a Predicate so that the logger gets only messages where the Command is Stop.
broadcastBlock.LinkTo(stopCommandLogger, linkOptions, msg => msg.Command == WorkerCommands.Stop);

The BroadcastBlock has another important function: it always offers the latest message that’s posted to it, so it provides a live data feed. Each new message overwrites the previous one, so a ‘bottleneck’ in the pipeline downstream of the block will not result in the block outputting stale messages; a target that is not ready to accept a message simply misses it. This can be important with time-sensitive data such as stock market prices.
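The branching behaviour can be tried out in isolation. The following is a minimal sketch with hypothetical names (BroadcastDemo, RunAsync) and plain int messages. Both targets are left unbounded, so they accept everything that they are offered; a target with a small BoundedCapacity could miss messages, as described above.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDemo
{
    public static async Task<(int All, int Evens)> RunAsync()
    {
        int all = 0, evens = 0;

        // n => n is the cloning function; an int is copied by value.
        var broadcast = new BroadcastBlock<int>(n => n);
        // Each ActionBlock runs its delegate on one thread at a time by default,
        // and each counter is only touched by its own block.
        var everything = new ActionBlock<int>(_ => all++);
        var evensOnly = new ActionBlock<int>(_ => evens++);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(everything, link);
        // The predicate filters what this branch is offered.
        broadcast.LinkTo(evensOnly, link, n => n % 2 == 0);

        for (int i = 1; i <= 10; i++)
        {
            broadcast.Post(i);
        }

        broadcast.Complete();
        await Task.WhenAll(everything.Completion, evensOnly.Completion);
        return (all, evens);
    }

    public static async Task Main()
    {
        var counts = await RunAsync();
        // With unbounded targets, the first branch sees all ten messages
        // and the second sees the five even ones.
        Console.WriteLine($"{counts.All} {counts.Evens}");
    }
}
```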

Concurrent Reads and Exclusive Writes

A situation can arise with parallel working where some threads need to read a shared variable while other threads want to write to it. In this sort of case, readers can be allowed to read the variable concurrently with other readers, but writers must update the variable exclusively so that it’s not updated at a time when readers are in the middle of processing it. To facilitate this, the Task Parallel Library provides a ConcurrentExclusiveSchedulerPair, in the System.Threading.Tasks namespace, that contains two schedulers. Readers are set to use the ConcurrentScheduler and writers are set to use the ExclusiveScheduler. For this to work well, there needs to be load balancing between concurrent threads so that the work is shared out equally between them. The best way to configure this is to use a BufferBlock as the source and for the linked ActionBlocks’ delegates to return a Task. With each ActionBlock’s BoundedCapacity set to 1, a busy ActionBlock will not accept further messages, so they go to a block that is free. Here’s the setup for readers; it’s similar for writers, but they use the ExclusiveScheduler.

C#
var readExWriteScheduler = new ConcurrentExclusiveSchedulerPair();
blockOptions = new ExecutionDataflowBlockOptions
{
    CancellationToken = token,
    //Set the buffer capacity to 1 or else the block will swallow everything that's offered
    BoundedCapacity = 1,
    //set this as it avoids having to 'gate' the input when there is only 1 active producer
    SingleProducerConstrained = true
};

var actionBlock = new ActionBlock<int>(async (i) =>
{
    //Use Task.Factory.StartNew() as the scheduler needs to be specified.
    //Do not await anything inside the delegate passed to StartNew.
    //If you do, the continuation will not run on the correct thread
    await Task.Factory.StartNew(() =>
        readerWork(i),
        token,
        TaskCreationOptions.None,
        readExWriteScheduler.ConcurrentScheduler);
},
blockOptions);
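For the writer side, a sketch along the same lines might look like this. It is an illustration rather than the code from the download: WriterDemo, RunAsync and the shared counter are assumptions made for the example. Two writer blocks are linked to one BufferBlock, and both run their updates through the ExclusiveScheduler so that the unsynchronized increment is never executed by two tasks at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WriterDemo
{
    public static async Task<int> RunAsync(int writes)
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        int shared = 0; // only updated by tasks run on pair.ExclusiveScheduler

        ActionBlock<int> CreateWriter() => new ActionBlock<int>(async delta =>
        {
            // StartNew is used so that the scheduler can be specified.
            await Task.Factory.StartNew(() => { shared += delta; },
                CancellationToken.None,
                TaskCreationOptions.None,
                pair.ExclusiveScheduler);
        },
        // A capacity of 1 stops a busy writer from hoarding messages.
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var source = new BufferBlock<int>();
        var writerA = CreateWriter();
        var writerB = CreateWriter();
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(writerA, link);
        source.LinkTo(writerB, link);

        for (int i = 0; i < writes; i++)
        {
            source.Post(1);
        }

        source.Complete();
        await Task.WhenAll(writerA.Completion, writerB.Completion);
        return shared;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(1000)); // prints 1000
    }
}
```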

The download has a demo that uses the ConcurrentExclusiveSchedulerPair with three readers and one writer. Data is sent in batches to illustrate that the worker threads are employed on a ‘while the buffer has data’ basis; they are released when it’s empty. The display shows the BufferBlock’s load balancing ability.

Image 3

Other Dataflow Blocks

There are many types of dataflow blocks that are not covered in this introduction. Most are concerned with increasing the entropy of the data - that’s the amount of information that the data contains. Single items go into the blocks and container loads come out. You can even have an input of pennies and an input of buns and get out a stream of tuples each containing a penny and a bun. Stephen Toub describes them all in this excellent guide.

Conclusion

The Dataflow library is focused on employing parallel programming for the ‘in house’ processing of data. It does this by splitting the process up into a series of autonomous blocks that run in their own thread space and manage their own threads. They are easy to employ and simple to link together, but some caution is needed here as an increase in concurrency does not always equate to better performance. Some experimentation is recommended in order to find the best design. After all, if your application ends up with more threads than the Bayeux Tapestry, it’s as well to make sure that they are being deployed efficiently.

History

  • 11th September, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)