Introduction
This is the second part of my proposed series of articles on TPL. Last time I introduced Tasks, and covered this ground:
- Thread vs. Tasks
- Creating Tasks
- Trigger Methods/Properties
- Handling Exceptions
- Cancelling Tasks
- SynchronizationContext
This time we are going to be looking at how to use a TPL concept called Continuations. Which is how we might run something after a Task that does something with a return value from a Task.
Article Series Roadmap
This is article 2 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover:
- Starting Tasks / Trigger Operations / Exception Handling / Cancelling / UI Synchronization
- Continuations / Cancelling Chained Tasks (this article)
- Parallel For / Custom Partitioner / Aggregate Operations
- Parallel LINQ
- Pipelines
- Advanced Scenarios / v.Next for Tasks
I am aware that some folk will simply read this article and state that it is similar to what is currently available on MSDN, and I in part agree with that; however, there are several reasons I have chosen to still take on the task of writing up these articles, which are as follows:
- It will only really be the first couple of articles which show similar ideas to MSDN; after that, I feel the material I will get into will not be on MSDN, and will be the result of some TPL research on my behalf, which I will be outlining in the article(s), so you will benefit from my research which you can just read...aye, nice.
- There will be screenshots of live output here which is something MSDN does not have that much of, which may help some readers to reinforce the article(s) text.
- There may be some readers out here that have never even heard of Task Parallel Library so would not come across it in MSDN, you know the old story, you have to know what you are looking for in the first place thing.
- I enjoy threading articles, so like doing them, so I did them, will do them, have done them, and continue to do them.
All that said, if people having read this article truly think this is too similar to MSDN (which I still hope it won't be), let me know that as well, and I will try and adjust the upcoming articles to make amends.
Table of Contents
Anyway, what I am going to cover in this article is as follows:
Some more TPL background
This section is something I should really have talked about in the first article but I did not, so I am including it here instead. I hope to explain why the designers of TPL did things the way they did and how that benefits us all.
The default task scheduler
TPL relies on a scheduler to organise and run tasks. In .NET 4, the default (which you can swap out) task scheduler is tightly integrated with the ThreadPool
. As such, if you use the default task scheduler, the worker threads that run the Task
s are managed by the ThreadPool
, where generally there are at least as many worker threads as there are cores on the target PC. When there are more Task
s than there are worker threads, some Task
s must be queued, until a ThreadPool
worker thread becomes free to service the Task
.
This is a similar concept to the one deployed by the existing ThreadPool.QueueUserWorkItem(..)
. In fact, you could think of the default task scheduler as an improved ThreadPool
, where the worker items are simply Task
s. The default scheduler is capable of better performance than the standard ThreadPool
alone as the number of cores increases; we shall examine that below.
The standard ThreadPool
The ThreadPool
is basically a global First In First Out (FIFO) queue in which worker items are assigned to do the dequeued work.
This is OK until the number of cores increases, and then it becomes a bottleneck, as the queue can only be accesses by one worker thread at a time. When there are only a few large course grained parallel items to deal with, the synchronization cost of ensuring single access to this global queue is small, but when you have much fine grained parallelism going on (as you would with Task
s), the synchronization costs of working with this single global queue begins to become a bottle neck.
Tasks were always designed to scale to the number of cores available, and I read somewhere that .NET is capable of running efficiently with millions of tasks. To handle that, a different approach had to be taken from the centralized queue. I will talk about this more decentralized approach to scheduling below.
Decentralized local queues
The .NET Framework provides each worker thread from the ThreadPool
with its own local task queue. The local queues distribute the load and alleviate much of the need to use the single global queue. You can see below that there are as many local queues as there are worker threads as well as the single global queue, all of which operate concurrently.
The idea being that a worker thread may take from its local queue in a last in first out (LIFO) approach, where it might find work, or it may have to go back (and incur a heavier synchronization cost for doing so) to the single global queue.
There is one more trick that the TPL designers managed to get into play; if a worker thread local queue is empty, they could go back to the global queue for more, but what the TPL designers did was get it to steal work from its neighboring local queue in FIFO order.
Former MVP and now Microsoft employee Daniel Moth has an excellent post with some highly intuitive diagrams to illustrate all this on his blog post: http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx.
It is well worth reading that post.
Anyway, I am sorry about that slight divergence, I just felt I need to get that out there. OK, so now on to Continuations.
Continuation, what's that
Simply put, continuations allow Task
s to be chained together. While this does not sound like that much of a big deal by itself, what makes the continuation concept really shine is that you can have selective continuations; that is, you could have a continuation that only fires when a whole group of Task
s finish, or a continuation that only fires when one of many Task
s finish, or we could have a continuation that only fires when a Task
fails or is cancelled. Continuations afford us that level of freedom. And by using this freedom offered to us by TPL, we can achieve very fine grain control over many aspects of our parallel code, rather than just one monolithic chunk of threaded code.
In this article, I have purposely designed the task chains to be quite small, but you really can make these chains as small or as large as you see fit.
Simple Continuation
Demo code project: SimpleContinuation
I have not really got too much to say about this small code snippet/demo, apart from to say it is a continuation, and truth be known, that is probably all I need to say, as that really is pretty much all there is to creating and using a continuation. Dead easy, really.
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 2000);
try
{
taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
});
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
Console.ReadLine();
And here is a small screenshot, not very exciting I know, it does get better though.
WPF synchronization
Demo code project: WPFDispatcherSynchonizationContext
This is something that I covered in the previous article: UI Synchronization, and more specifically, Synchronization, WPF Synchronization. There is nothing I have changed to that code, but I have included it here again. You should read that article for the basis of what we are trying to solve. This article just included the code to show you that it is a very valid reason to use a TPL continuation, that is to marshal the thread back to a UI control's owning thread. As I say, this code snippet will not make much sense unless you go and read the relevant sections of the first article.
private void btnDoIt_Click(object sender, RoutedEventArgs e)
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task taskWithFactoryAndState1 =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 10000, token).ContinueWith(ant =>
{
lstBox.ItemsSource = ant.Result;
}, token, TaskContinuationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
}
And here is a screenshot of the demo running:
Continue "WhenAny"
Demo code project: ContinueWhenAny
One requirement when working with parallel programming might be to try and run a group of algorithms over a dataset and use the one that was the best performing. These algorithms could literally be anything from a custom experimental search. I did not have time to write a whole slew of custom experimental search algorithms so rather opted for something a little bit better known: "Sorting Algorithms". I am using examples from a past C# competition winner: http://www.codeproject.com/KB/recipes/SortVisualization.aspx, by Kanasz Robert.
The basic idea here is that I want to wait only until the first algorithm (i.e., hopefully the fastest one) achieves its goal.
I have squirreled away the algorithms into a little helper DLL called ContinueWhen.Common
in the attached VS2010 solution, but as far as getting the concept of only waiting for one of many running tasks for a continuation, this code should be easy enough to understand without needing to see the actual sorting algorithms.
static void Main(string[] args)
{
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
Task.Factory.ContinueWhenAny(
tasks,
(Task<SortingTaskResult> antecedent) =>
{
Console.WriteLine(antecedent.Result.ToString());
});
Console.ReadLine();
}
The above code shows us creating three Task
s, one for each sorting algorithm, and then a single continuation that waits for any (the first of the three Task
s to finish) of the Task
s. Here is what is produced when the listing above is run:
It can be seen that "Selection Sort" won the race, even though it was not the first Task
to be started; it won as it is a better algorithm than whatever other algorithm happened to have the use of an available CPU core at that time. I only have two CPU cores on the laptop I wrote this test code on, so chances are if I had four CPU cores, the third algorithm may have ended up winning, as on paper it is the better algorithm.
The other interesting thing to note is that because we are waiting for only one of the Task
s in a group (array) to finish, we are only able to use the Result
from the single Task
that we waited on, as shown in this screenshot.
Continue "WhenAll"
Demo code project: ContinueWhenAll
ContinueWhenAll
is an interesting one, I could think of a number of times this would be very useful. You have split up your parallel work, but must wait for all the parts to finish before moving to the next step, or to use the experimental algorithm idea again. We could also imagine that we might be quite interested in seeing the various characteristics about how our custom algorithms are performed, so must wait for them all to complete before continuing.
I have chosen to again use the sorting algorithm, as it is a simple concept, where by the idea is that we want to run various sorting algorithms over an unsorted list, and wait for all the different algorithms to complete before we can continue.
Here is the code that does that:
static void Main(string[] args)
{
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
Task.Factory.ContinueWhenAll(
tasks,
(antecedents) =>
{
foreach (Task<SortingTaskResult> task in antecedents)
{
Console.WriteLine(task.Result.ToString());
}
});
Console.ReadLine();
}
It can be seen that this time, the continuation only kicks in when all the three sorting Task
s are complete. Here are the results of running this snippet:
The other interesting thing to note is that because we are waiting for all Task
s in a group (array) to finish, we are able to use the Result
from all these Task
s which we waited on, as shown in this screenshot.
Using a Continuation for exception handling
Demo code project: UsingContinuationForExceptionHandling
When I was talking about the different ways of how to handle Task Exceptions in the first article, I also mentioned that there was another technique which I did not show at the time. Well, now is the time to show that other way. It is pretty simple really: we simply use continuations. The idea is that we have a continuation that is run if the antecedent Task
ran to completion, and another if the antecedent Task
was put into the Faulted state.
This is easily achieved using the TaskContinuationOptions
that we can supply when we create a Task continuation. Here is some example code to illustrate what I mean:
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
if (i > 100)
{
InvalidOperationException ex =
new InvalidOperationException("oh no its > 100");
ex.Source = "taskWithFactoryAndState";
throw ex;
}
}
return ints;
}, 2000);
taskWithFactoryAndState.ContinueWith((ant) =>
{
AggregateException aggEx = ant.Exception;
Console.WriteLine("OOOOPS : The Task exited with Exception(s)");
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}, TaskContinuationOptions.OnlyOnFaulted);
taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
And to show you what happens when we run this lot, here is a demo:
See how only one of the continuations ran, which is the one that should run "OnlyOnFaulted".
Using Continuation as a pipeline
Demo code project: UsingContinuationsAsPipelines
I did mention somewhere at the beginning of this article that you could use continuations to chain tasks together to make them as simple or as complex as you choose. I have not gone nuts or anything, but I have come up with a small example shown below, which is just slightly larger than the examples you have seen so far. What it does illustrate though is the fact that you can quite easily continue a continuation.
static void Main(string[] args)
{
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 10);
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, " +
"which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion)
.ContinueWith((ant) =>
{
List<int> parentResult = ant.Result;
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Continuation Task produced Square of {0}",
resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
}
It is certainly not rocket science this one. All that I am doing is creating an initial Task
that creates and returns a list of numbers. This List<int>
produced by the first Task
is then passed to a continuation where the original result from the original Task
(the antecedent) is printed, and where a new List<int>
is created by obtaining a square of the original Task
(the antecedent) value produced. The result of this continuation is then fed to yet another continuation that prints the results of the squaring continuation Task
(the antecedent to this continuation).
Each of the continuations assumes an ideal world, and will only run if the original Task
of the continuation runs to completion.
Here is a small demo of this one running:
Catching an exception in a Continuation antecedent
Demo code project: CatchExceptionInAntecedent
So we have now seen several examples of using Task
s/continuations, and we have seen that we can use continuations to run when things go to plan, and we can also run Task
s when the original Task
fails to complete its job, but sometimes, we might simply want to have an unspecified continuation that always happens, whether the original Task
completes successfully or not, and have the continuation decide what to do if something is not right about the status of the original Task
.
Here is an example of how we would check for an Exception
in the original Task in the continuation, where we are rethrowing the original Exception
that was provided by the original Task
. Since in this example the continuation rethrows an Exception
, we need to make sure the Exception
it will throw will be observed in some way (I talked about Exception
observing last time, when I talked about trigger methods/properties such as Wait()
/ Result
), as such I Wait()
on the continuation.
Here is the code:
try
{
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
ints.Add(i);
if (i == 5)
throw new InvalidOperationException(
"Don't like 5 its vulgar and dirty");
}
return ints;
}, 100);
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
if (ant.Status == TaskStatus.Faulted)
throw ant.Exception.InnerException;
Console.WriteLine("In Continuation, no problems in Antecedent");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, " +
"which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
});
taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
Console.WriteLine("Finished");
And here is a small demo screenshot of this running. See how we caught the original Exception
of the original Task
, and rethrew successfully (preserving the Exception
information) up to the point where the Exception
got caught by the try/catch
.
Cancelling a Continuation
Demo code project: CancellingContinuations
One of the most obvious things you may want to do with a continuation is cancel it, right? Well luckily, you have already seen all the tricks of the trade for doing this in the first article, remember the CancellationTokenSource
object we looked at last time?
There is not much more to it than that. We create a new CancellationTokenSource
, and pass the CancellationToken
from it to any TPL Task
s/continuations we want to be affected when the CancellationToken
is cancelled.
The same rules you saw in the first article still apply, we must be good and ensure that the Task
/continuation that expect, and make use of a CancellationToken
to throw an Exception
when cancel is requested (remember, this is vital to ensure the Task
transitions to the correct state).
Anyway, I am probably talking too much, when the code speaks for itself. The code itself is simple. We have an original Task
that creates a List<int>
which is then used inside a continuation, where the original Task
numbers are squared/printed and returned. However, 5 seconds after the original Task
is created, the CancellationToken
that was passed to the original Task
and the continuation is cancelled.
CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
try
{
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
tokenSource.Token.ThrowIfCancellationRequested();
ints.Add(i);
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
Thread.Sleep(1000); }
return ints;
}, 10000, tokenSource.Token);
Thread.Sleep(5000);
tokenSource.Cancel();
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
Console.WriteLine("In Continuation");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, which will " +
"be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, tokenSource.Token);
taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
finally
{
tokenSource.Dispose();
}
Console.WriteLine("Finished");
Console.ReadLine();
Here are the results:
Notice how the continuation did not even kick in at all, and we only created 5 items. That is due to the fact that the CancellationToken
cancel kicked in. It's that easy to cancel a continuation, you have to love this TPL stuff man.
That's it for now
I know this article did not have as much to say as the first, thing is continuations are surprisingly easy to get to grips with, so there was less to say. The next two articles are likely to be of about the same sort of size as this, but the ones after that will have some more meat.
That is all I wanted to say in this article. I hope you liked it and want more. If you did like this article, and would like more, could you spare some time to leave a comment and a vote? Many thanks.
Hopefully, see you at the next one, and the one after that, and the one after that, yes six in total. I better get busy.