Introduction
.NET TPL Dataflow is built upon existing .NET TPL (Task Parallel Library) and mainly for promoting actor-based programming. In an actor-based model, an actor communicates with other actors or with the outside world by sending and receiving messages. Dataflow seems to be useful when you are dealing with multiple operations which has to communicate with each other or when you are dealing with data that has to be processed exactly when it becomes available.
The Dataflow components we are going to use in our sample are ActionBlock<T>
, TransformBlock<T>
and BroadcastBlock<T>
. All blocks of TPL Dataflow implements IDataflowBlock
interface. Complete()
and Fault()
are the member methods of the IDataflowBlock
and Completion
is the member property. Complete
method is used for successful completion of the task. Fault
method gets called on exceptions and Completion
property returns a Task which will be executed by the Dataflow
block asynchronously.
ITargetBlock
and ISourceBlock
are the interfaces which demand consideration for study as Dataflow
blocks implement them. ITargetBlock
is for receiving messages and ISourceBlock
for sending the messages. ActionBlock
implements ITragetBlock
and is an execution block which calls Action
delegate when it receives a message. TransformBlock
implements both ITargetBlock
and ISourceBlock
and also an execution block. TransformBlock
is capable of both sending and receiving messages and hence accepts a Func
delegate. BroadcastBlock
also implements both ITargetBlock
and ISourceBlock
and is mainly used in broadcasting messages to multiple components where components are interested in most recent value sent.
Jump Start
Let us build a sample WPF application which demonstrates broadcasting of messages using TPL Dataflow
blocks using .NET 4.5 and Visual Studio 2012.
The requirement is building a simple Stock Index Display panel which displays stock index values. Display includes index name, current value, pt. change and % change. Broadcaster will broadcast index, current value and previous value for the stock index. Other display components like pt.change and % change should be calculated as transformations and should be displayed on user choice and when data is available.
I have designed the UI to look as below.
The flow can be as mentioned in the steps below:
- Build a broadcaster (using
BroadcastBlock
) for broadcasting stock indices randomly and asynchronously. - Build an
ActionBlock
to receive the broadcasted message and display in UI (Index and Current Value). - Build a
TransformBlock
to receive broadcasted message and then calculate the pt. change and % change and then send the result as message for UI. - Build an
ActionBlock
to receive message sent from TransformBlock
and display it in UI (Pt. change and % change).
Note: To work with TPL Dataflow
, you need to download and install Microsoft.Tpl.Dataflow
using NuGet. Once you create a new WPF project, open Package Manager Console and run the command Install-Package Microsoft.Tpl.Dataflow. Reference to System.Threading.Tasks.Dataflow
will be added to the project automatically. Dataflow components are found under the namespace System.Threading.Tasks.Dataflow
.
Let us code the application now.
1. Build the BroadCaster
Let's add a class StockIndexBroadCaster
which acts as a broadcaster of stock indices for our sample asynchronously. We are going to add a method BroadCastIndexData()
which returns output of type Task. Let us make the method async. As seen in the code, we have a Task.Delay
for 2 seconds which makes StockIndexBroadCaster
to broadcast messages for every 2 seconds.
Set the BroadcastBlock
of StockIndex
type through constructor and call the SendAsync
method of the BroadcastBlock
to broadcast messages asynchronously. SendAysnc
method returns output of type Task
and hence is awaitable.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace BroadCastingSample
{
public class StockIndexBroadCaster
{
private readonly BroadcastBlock<StockIndex> _stockBroadCaster;
public StockIndexBroadCaster(BroadcastBlock<StockIndex> broadcaster)
{
_stockBroadCaster = broadcaster;
}
public async Task BroadCastIndexData()
{
var random = new Random();
decimal previousval = 1000;
for (int i = 0; i < 100; i++)
{
await Task.Delay(2000);
var stockIndex = new StockIndex()
{
Index = "TestIndex" + (i + 1),
CurrentValue = random.Next(1000, 5000),
PreviousValue = previousval
};
previousval = stockIndex.PreviousValue;
await _stockBroadCaster.SendAsync(stockIndex);
}
}
}
}
Our StockIndex
class will look as below and will be a data carrier.
public class StockIndex
{
public string Index { get; set; }
public decimal CurrentValue { get; set; }
public decimal PreviousValue { get; set; }
}
In MainWindow.xaml.cs, using BroadcastBlock
as below.
private BroadcastBlock<StockIndex> _broadcast;
public MainWindow()
{
InitializeComponent();
}
private void Window_Loaded(object sender, RoutedEventArgs e)
{
LblCurrentValue.Content = string.Empty;
LblIndex.Content = string.Empty;
LblPercentChange.Content = string.Empty;
LblPtChange.Content = string.Empty;
_broadcast = new BroadcastBlock<StockIndex>(f => f); }
2. Consume messages of broadcaster
Make the button click asynchronous and create an ActionBlock
for updating UI. UpdateUiAtStart()
method returns ActionBlock
of StockIndex
which updates the UI as seen below. Link the broadcaster to the UI updater (ActionBlock
) using LinkTo
method of the BroadcastBlock
. Now start broadcasting.
private async void BtnStart_Click(object sender, RoutedEventArgs e)
{
ActionBlock<StockIndex> updateMeAtStart = UpdateUiAtStart();
_broadcast.LinkTo(updateMeAtStart);
var stockBroadCaster = new StockIndexBroadCaster(_broadcast);
await stockBroadCaster.BroadCastIndexData();
}
private ActionBlock<StockIndex> UpdateUiAtStart()
{
return new ActionBlock<StockIndex>(x =>
{
LblCurrentValue.Content = x.CurrentValue.ToString();
LblIndex.Content = x.Index;
},
new ExecutionDataflowBlockOptions()
{
TaskScheduler =
TaskScheduler.FromCurrentSynchronizationContext()
});
}
3. Build the TrasformBlock and update the UI
Create an ActionBlock
of Tuple<string, string>
for updating UI after transformations. UpdateUiAfterTransformation()
returns an ActionBlock
which updates the UI after receiving message from TransformBlock
.
Create a TransformBlock
which accepts Func
delegate as below and link broadcaster to changeCalculator
for receiving broadcasted messages. Now let us link changeCalculator
to updateChanges ActionBlock
for updating the UI after processing the calculations.
private void BtnStartTransofrmations_Click(object sender, RoutedEventArgs e)
{
ActionBlock<Tuple<string, string>> updateChanges = UpdateUiAfterTransformation();
var changeCalculator = new TransformBlock<StockIndex,
Tuple<string, string>>(f =>
{
decimal ptchange = f.CurrentValue - f.PreviousValue;
decimal percentchange =
(f.CurrentValue - f.PreviousValue) * 100 / f.CurrentValue;
return new Tuple<string, string>
(ptchange.ToString(), percentchange.ToString());
}
);
_broadcast.LinkTo(changeCalculator);
changeCalculator.LinkTo(updateChanges);
}
private ActionBlock<Tuple<string, string>> UpdateUiAfterTransformation()
{
return new ActionBlock<Tuple<string, string>>(x =>
{
LblPtChange.Content = x.Item1;
LblPercentChange.Content = x.Item2;
},
new ExecutionDataflowBlockOptions()
{
TaskScheduler =
TaskScheduler.FromCurrentSynchronizationContext()
});
}
Note: More about LinkTo method can be found in this MSDN article.
Conclusion
We are able to build a broadcasting model using TPL Dataflow
. It seems that building parallel dataflows using TPL Dataflow
does not seem to be complicated and you can keep the code tidy.
Points of Interest
Exception handling and customizing dataflow blocks.
History
-
18th January, 2014: Initial post