Introduction
This is the third part of my proposed series of articles on TPL. Last time I introduced Continuations, and covered this ground:
- Some more TPL background
- Continuation, what's that
- Simple continuation
- WPF synchronization
- Continue "WhenAny"
- Continue "WhenAll"
- Using a Continuation for exception handling
- Using Continuation as pipeline
- Catching exception in Continuation antecedent
- Cancelling a Continuation
This time we are going to be looking at how to use Parallel For/Foreach loops. We shall also look at how to do the usual TPL-like things such as cancelling and dealing with exceptions, as well as how to break out of parallel loops and how to use Thread Local Storage within them.
Article Series Roadmap
This is article 3 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover:
- Starting Tasks / Trigger Operations / Exception Handling / Cancelling / UI Synchronization
- Continuations / Cancelling Chained Tasks
- Parallel For / Custom Partitioner / Aggregate Operations (this article)
- Parallel LINQ
- Pipelines
- Advanced Scenarios / v.Next for Tasks
Now, I am aware that some folk will simply read this article and state that it is similar to what is currently available on MSDN, and I partly agree with that; however, there are several reasons I have chosen to still take on the task of writing up these articles, which are as follows:
- It will really only be the first couple of articles that show similar ideas to MSDN; after that, I feel the material I will get into will not be on MSDN, and will be the result of some TPL research on my behalf, which I will be outlining in the article(s), so you will benefit from research you can just read...Aye, nice.
- There will be screenshots of live output here, which is something MSDN does not have that much of, and which may help some readers by reinforcing the article text.
- There may be some readers out there who have never even heard of the Task Parallel Library, and so would not come across it on MSDN; you know the old story, you have to know what you are looking for in the first place.
- I enjoy threading articles, so I like doing them; I did them, will do them, have done them, and will continue to do them.
All that said, if people, having read this article, truly think it is too similar to MSDN (which I still hope it won't be), let me know, and I will try to adjust the upcoming articles to make amends.
Table of Contents
What I am going to cover in this article is as follows:
- Parallel For/Foreach
- Creating a Simple Parallel For/Foreach
- Breaking and Stopping a Parallel Loop
- Handling Exceptions
- Cancelling a Parallel Loop
- Partitioning for Better Performance
- Thread Local Storage
Parallel For/Foreach
A lot of us probably write a lot of sequential code like this:
foreach(SomeObject x in ListOfSomeObjects)
{
x.DoSomething();
}
Where we are doing something to each of the items in some source, and there is no relationship at all between the objects in the source list; we simply want something to happen to every item in some source collection of these objects. The fact that we want to do something to all these objects, and the fact that they are not tightly coupled to each other, makes this sort of thing an ideal candidate for parallelism, and the designers of TPL thought so too, so they came up with the ability to create Parallel.For and Parallel.ForEach loops.
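To give you a feel for what that buys us, here is a minimal sketch of what the parallel equivalent of the sequential loop above might look like (SomeObject and ListOfSomeObjects are the same hypothetical names from the snippet):
// Hypothetical parallel equivalent of the sequential loop above.
// Each x.DoSomething() may now run on a different ThreadPool thread,
// so it must be safe to execute concurrently.
Parallel.ForEach(ListOfSomeObjects, x => x.DoSomething());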
The rest of this article will look at ways in which you can use Parallel.For and Parallel.ForEach loops in your own code. Obviously, since we are dealing with parallelism, there are a few added complications, but overall, it's still pretty easy to follow.
Creating a Simple Parallel For/Foreach
Demo solution project: SimpleParallel
Let's start by creating a dead simple Parallel.For and Parallel.ForEach loop; here is an example of each:
List<string> data = new List<string>()
{ "There","were","many","animals",
"at","the","zoo"};
Parallel.For(0, 10, (x) =>
{
Console.WriteLine(x);
});
Parallel.ForEach(data, (x) =>
{
Console.WriteLine(x);
});
Console.ReadLine();
It does not get any easier than that. To prove it works as expected, here are the results; the green is the Parallel.For and the brown is the Parallel.ForEach.
One thing to note is that Parallel.For and Parallel.ForEach loops do not guarantee any particular ordering, as can be seen from the figure above.
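If you do need ordered results, one simple approach (just a sketch of mine, not the only way) is to have each iteration write into its own index-addressed slot; the iterations can then run in any order while the output stays ordered:

int[] squares = new int[10];
Parallel.For(0, squares.Length, i =>
{
    // Iterations run in any order, but each writes only to its own slot,
    // so the array is correctly ordered once the loop completes.
    squares[i] = i * i;
});
Console.WriteLine(string.Join(",", squares)); // 0,1,4,9,16,25,36,49,64,81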
Breaking and Stopping a Parallel Loop
Demo solution project: BreakingAndStopping
We have all probably written some code like this in the past, where we do something in a serial loop and break out under some condition:
foreach(SomeClass x in ListofClasses)
{
if(x.isBest)
{
x.Save();
break;
}
}
Question is, can you also do this sort of thing when using Parallel.For and Parallel.ForEach loops? Well, yes you can; we just need to use a TPL class called ParallelLoopState, which we get hold of via one of the many overloads of TPL's Parallel.For and Parallel.ForEach.
By using ParallelLoopState, we can use two loop control methods: Stop() and Break().
ParallelLoopState.Stop()
Communicates that the System.Threading.Tasks.Parallel loop should cease execution at the system's earliest convenience.
ParallelLoopState.Break()
Communicates that the System.Threading.Tasks.Parallel loop should cease execution of iterations beyond the current iteration at the system's earliest convenience.
We can also get details about how the Parallel.For or Parallel.ForEach loop's work proceeded using the ParallelLoopResult struct, which both Parallel.For and Parallel.ForEach return. Using ParallelLoopResult to examine the status of a loop is shown in the following three demos:
For loop Stop()
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
if (x < 5)
Console.WriteLine(x);
else
state.Stop();
});
Console.WriteLine("For loop LowestBreak Iteration : {0}",
res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
Foreach loop Stop()
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
if (!x.Equals("zoo"))
Console.WriteLine(x);
else
state.Stop();
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
Foreach loop Break()
ParallelLoopResult res3 = Parallel.ForEach(data, (x, state) =>
{
if (x.Equals("zoo"))
{
Console.WriteLine(x);
state.Break();
}
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res3.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res3.IsCompleted);
And here is a screenshot of this running. See how the first two (the ParallelLoopState.Stop() ones) do not provide a ParallelLoopResult.LowestBreakIteration when queried. This is due to the fact that they used ParallelLoopState.Stop() and therefore did not actually break.
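For contrast, here is a minimal sketch (this one is not in the demo project) of a Parallel.For loop that uses Break(), where LowestBreakIteration does come back populated:

ParallelLoopResult res = Parallel.For(0, 10, (x, state) =>
{
    if (x >= 5)
        // Iterations below the break index still complete;
        // no new iterations above it are started
        state.Break();
    else
        Console.WriteLine(x);
});
// Break(), unlike Stop(), records the lowest iteration it was called from
// (typically 5 here, though scheduling may vary)
Console.WriteLine("LowestBreakIteration : {0}", res.LowestBreakIteration);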
Handling Exceptions
Demo solution project: HandlingExceptions
We covered general Task exception handling in article 1, and you can use any of those techniques; however, I find by far the easiest is to use the familiar construct that we all use in non-asynchronous code, i.e., try/catch, where we simply catch the TPL AggregateException. The pattern is something like this:
try
{
.....
.....
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
That's the basic pattern, so here is some actual exception handling for real Parallel.For and Parallel.ForEach loops:
Parallel.For loop exception handling
try
{
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
if (!state.IsExceptional)
{
if (x < 5)
Console.WriteLine(x);
else
throw new InvalidOperationException("Don't like nums > 5");
}
});
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
Parallel.Foreach loop exception handling
try
{
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
if (!x.Equals("zoo"))
Console.WriteLine(x);
else
throw new InvalidOperationException("Found Zoo throwing Exception");
});
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
And to show you how it is all running, here is a screenshot:
It can be seen that we caught two exceptions from the Parallel.For loop; this is because both iterations that threw the silly throw new InvalidOperationException("Don't like nums > 5") had already been scheduled, and therefore ran, so the AggregateException we caught contained two inner exceptions.
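As an aside, when an AggregateException may hold several inner exceptions like this, it also offers a Handle() method that lets you deal with the types you expect and re-throw the rest. Here is a minimal sketch of that approach (my own variation, not in the demo project):

try
{
    Parallel.For(0, 10, x =>
    {
        if (x > 5)
            throw new InvalidOperationException("Don't like nums > 5");
    });
}
catch (AggregateException aggEx)
{
    // Handle() marks an inner exception as handled when the predicate returns
    // true; any left unhandled are re-thrown wrapped in a new AggregateException
    aggEx.Handle(ex =>
    {
        Console.WriteLine("Caught exception '{0}'", ex.Message);
        return ex is InvalidOperationException;
    });
}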
Cancelling a Parallel Loop
Demo solution project: CancellingLoops
We covered general Task cancellation in article 1, and the idea is still pretty much the same one you will see crop up again and again when using TPL. How do you cancel a parallel operation? Use a CancellationToken, that's how.
From what we now know from the two preceding articles, we know that we should catch AggregateException, and that we should typically include a line like this within our parallel code:
tokenSource.Token.ThrowIfCancellationRequested();
Which is what we have been doing until now. Unfortunately, this is not what we need to do when working with Parallel.For and Parallel.ForEach loops, and it will lead to an unhandled OperationCanceledException, as shown in the figure below:
So what must we do? Well, it is quite simple in this particular case (i.e., cancellation). We must make sure to provide a catch handler specifically for OperationCanceledException, and it seems to make no odds whether we include the line:
tokenSource.Token.ThrowIfCancellationRequested();
or not. The Parallel.For and Parallel.ForEach loops always end up throwing an OperationCanceledException when their respective CancellationToken is cancelled. I guess it is down to personal preference whether you include that line, at the end of the day. Since it seems to make no difference as to whether an OperationCanceledException is thrown or not, I am choosing not to include that line in the Parallel.For and Parallel.ForEach loop demos.
Here is a full listing of a Parallel.For and a Parallel.ForEach loop, both of which get cancelled by the same CancellationToken after half a second (the cancelling Task sleeps for 500 ms before calling Cancel()).
List<string> data = new List<string>()
{ "There", "were", "many", "animals",
"at", "the", "zoo" };
CancellationTokenSource tokenSource = new CancellationTokenSource();
Task cancelTask = Task.Factory.StartNew(() =>
{
Thread.Sleep(500);
tokenSource.Cancel();
});
ParallelOptions options = new ParallelOptions()
{
CancellationToken = tokenSource.Token
};
try
{
ParallelLoopResult res1 = Parallel.For(0, 1000, options, (x, state) =>
{
if (x % 10 == 0)
Console.WriteLine(x);
Thread.Sleep(100);
});
Console.WriteLine("For loop LowestBreak Iteration : {0}",
res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
Console.WriteLine("Operation Cancelled");
}
try
{
ParallelLoopResult res2 = Parallel.ForEach(data,options, (x, state) =>
{
Console.WriteLine(x);
Thread.Sleep(100);
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
Console.WriteLine("Operation Cancelled");
}
Console.ReadLine();
And here is the output:
We do indeed see our cancellation messages, and nothing was scheduled from the Parallel.ForEach.
However, this looks weird, right?
I would draw your attention to this... Note how the for loop only printed 0 and 500 (OK, I designed it to only print when the index modulus 10 == 0, so there is no real way to tell exactly what went on), but this does show that ordering is certainly not guaranteed by Parallel.For loops, and you should not rely on any ordering at all; to do so would spell certain doom.
TPL will schedule Parallel.For delegates onto worker threads as and when it sees fit, as shown above. We may have had other indexes called, but all we really know is that indexes 0 and 500 were called before we cancelled. What about 100, 200, 300...? The order is not maintained, so please do not expect any ordering to be maintained by TPL; that will not happen.
That said, it does nicely demonstrate that the Parallel.ForEach work did not get scheduled at all, as we see no output from it above. This may vary depending on how many cores your PC has; mine only has 2, so that is the output I saw.
Partitioning for Better Performance
Demo solution project: OrderedPartitioner
I don't know how many of you have been eagle-eyed enough to notice that when we run a Task/Continuation or a Parallel.For/Parallel.ForEach loop, we are effectively queuing up a delegate of work that will eventually be run on a ThreadPool worker thread. The thing is that, especially in the case of Parallel.For/Parallel.ForEach loops with small delegate bodies, the amount of time taken to create/swap out these small delegate payloads can have an adverse effect on performance.
Question is, is there anything we can do about it? Well yes, TPL does allow us to create our own partitioning chunks, where the idea is that the overall Parallel.For/Parallel.ForEach loop workload is broken up into chunks of a size that we specify in our own code. We are effectively creating a custom partitioning algorithm. If we go with the default Partitioner, it uses a default partitioning algorithm that takes into account the number of cores amongst other things, and may not give the best results.
To demonstrate this, I have included three examples below:
- No partitioning at all
- Use the default TPL partitioning algorithm (my laptop has 2 cores)
- Use our own custom partitioning logic
Here is the full code listing to demonstrate these three scenarios. The idea is that a simple Task fills an array with some dummy data, and then each of the three scenarios outlined above is run (and timed) in turn. Each of the three scenarios performs the same job: it gets an item from the original dummy list, squares it, and writes that value to another results array. By making sure all three scenarios do exactly the same thing, we should be able to get a realistic comparison.
At the end of each block under test, we also verify that all the elements were hit by checking the result array for 0; 0 should only be present when the result data array is reinitialised at the start of each of the three scenarios. We are basically ensuring each element from the original list got hit by the scenario.
I am just using a standard threading synchronization primitive (a ManualResetEventSlim) to control the running of the three scenarios, only allowing one to run at a time; they run in the order in which they are declared in the source code file.
int[] inputData = new int[50000000];
int[] resultData = new int[50000000];
ManualResetEventSlim mre = new ManualResetEventSlim(false);
Random rand = new Random();
Object locker = new Object();
Task setupTask = Task.Factory.StartNew(() =>
{
Parallel.For(0, inputData.Length, (i) =>
{
lock (locker)
{
inputData[i] = rand.Next(2,10);
}
});
mre.Set();
});
mre.Wait();
mre.Reset();
Task timerTask1 = Task.Factory.StartNew(() =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
Parallel.ForEach(inputData, (int item, ParallelLoopState loopState, long index) =>
{
resultData[index] = item * item;
});
watch.Stop();
Console.WriteLine("time ellapsed for No partitioning at all version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
mre.Wait();
mre.Reset();
Parallel.For(0, inputData.Length, (i) =>
{
resultData[i] = 0;
});
Task timerTask2 = Task.Factory.StartNew(() =>
{
Stopwatch watch = new Stopwatch();
OrderablePartitioner<int> op = Partitioner.Create(inputData);
watch.Reset();
watch.Start();
Parallel.ForEach(op, (int item, ParallelLoopState loopState, long index) =>
{
resultData[index] = item * item;
});
watch.Stop();
Console.WriteLine("time ellapsed for default TPL partitioning algorithm version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
mre.Wait();
mre.Reset();
Parallel.For(0, inputData.Length, (i) =>
{
resultData[i] = 0;
});
Task timerTask3 = Task.Factory.StartNew(() =>
{
Stopwatch watch = new Stopwatch();
watch.Reset();
watch.Start();
OrderablePartitioner<Tuple<int, int>> chunkPart =
Partitioner.Create(0, inputData.Length, 5000);
Parallel.ForEach(chunkPart, chunkRange =>
{
for (int i = chunkRange.Item1; i < chunkRange.Item2; i++)
{
resultData[i] = inputData[i] * inputData[i];
}
});
watch.Stop();
Console.WriteLine("time ellapsed for custom Partitioner version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
mre.Wait();
mre.Reset();
Console.ReadLine();
And here is the result of running this code (again my laptop has 2 cores, so your results may differ):
I think the results speak for themselves. Using no partitioning is OK; with the default partitioner, things actually get worse (more than likely due to my laptop only having 2 cores, amongst other factors); but look what happens when we take full control over what partition size to use: that gave us the best performance of all.
So there you go, food for thought.
Thread Local Storage
Sometimes what we need to do inside a loop is accumulate some sort of running total, something like this in sequential code:
int count = 0;
foreach(SomeObject item in SomeObjectList)
{
    if(item.Contains(someKeyWord))
    {
        count++;
    }
}
So how would we do something like this using Parallel.For/Parallel.ForEach loops? The answer lies in using Thread Local Storage, which has been around for a while, but TPL puts a new spin on it.
Luckily, when constructing Parallel.For/Parallel.ForEach loops, we have all the relevant overloads we need to use Thread Local Storage; it's just a matter of understanding how to do so. To this end, I have provided two examples, which are as follows:
Demo solution project: ThreadLocalStorage
This example searches a source list of words for a particular search word, and keeps a count of how many times that word is found. It is important to know that when we access the count variable, we are effectively accessing a shared object that needs thread synchronization of some sort. I have chosen to use lock(..) (which is really using Monitor.Enter()/Monitor.Exit() behind the scenes), but you could use any threading synchronization primitive that you like.
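If you are curious, here is roughly what a lock(..) block expands to in .NET 4.0 (a sketch of the compiler's expansion, with a placeholder for the protected work):

object syncLock = new object();
bool lockTaken = false;
try
{
    Monitor.Enter(syncLock, ref lockTaken);
    // ... the protected work goes here ...
}
finally
{
    if (lockTaken)
        Monitor.Exit(syncLock);
}

Anyway, back to the demo; here is the full listing: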
int matches = 0;
object syncLock = new object();
string[] words
= new string[] { "the","other","day","I","was",
"speaking","to","a","man","about",
"this","and","that","cat","and","he","told","me",
"the","only","other","person","to","ask","him",
"about","the","crazy","cat","was","the","cats","owner"};
string searchWord = "cat";
Parallel.ForEach(
words,
() => 0,
(string item, ParallelLoopState loopState, int tlsValue) =>
{
if (item.ToLower().Equals(searchWord))
{
tlsValue++;
}
return tlsValue;
},
tlsValue =>
{
lock (syncLock)
{
matches += tlsValue;
}
});
Console.WriteLine("Matches for searchword '{0}' : {1}\r\n", searchWord, matches);
Console.WriteLine("Where the original word list was : \r\n\r\n{0}",
words.Aggregate((x, y) => x.ToString() + " " + y.ToString()));
Console.ReadLine();
The format of the loop is pretty much dictated by TPL if you want to use Thread Local Storage; it is just a pattern that you will have to learn and follow. This is what the method signature and comments look like within .NET 4.0 itself:
public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
It looks a bit hairy, but I think the code is somewhat easier to understand than this TPL metadata.
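To help map the metadata onto the actual lambdas, here is a minimal, self-contained sketch of my own (counting even numbers; the names are hypothetical) showing which lambda plays which role:

int[] source = Enumerable.Range(0, 100).ToArray();
int evens = 0;
object gate = new object();

Parallel.ForEach(
    source,
    // localInit : creates each thread-local starting value
    () => 0,
    // body : folds one item into the thread-local value and returns it
    (item, loopState, tlsValue) => (item % 2 == 0) ? tlsValue + 1 : tlsValue,
    // localFinally : merges each finished thread-local value into the shared total
    tlsValue => { lock (gate) { evens += tlsValue; } });

Console.WriteLine(evens); // 50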
Here is what the demo code looks like when run:
Demo solution project: ThreadLocalStorage2
This example initializes a source list with random double values, and then adds these input values up to form a single output value. Again, it does this using Thread Local Storage, and as such, we are again accessing a shared variable that needs thread synchronization of some sort; again, I have chosen to use lock(..). The other thing to note is that since the data type I am dealing with in this demo is a double, I must use the generic overload Parallel.For<TLocal>, where TLocal is double in this case.
double total = 0;
object syncLock = new object();
double[] values = new double[100];
Random rand = new Random();
Object locker = new Object();
ManualResetEventSlim mre = new ManualResetEventSlim();
Task initialiserTask = Task.Factory.StartNew(() =>
{
Parallel.For(0, values.Length, (idx) =>
{
lock (locker)
{
values[idx] = rand.NextDouble();
}
});
mre.Set();
});
mre.Wait();
Parallel.For<double>(
0,
values.Length,
() => 0,
(int index, ParallelLoopState loopState, double tlsValue) =>
{
tlsValue += values[index];
return tlsValue;
},
tlsValue =>
{
lock (syncLock)
{
total += tlsValue;
}
});
Console.WriteLine("Total: {0}", total);
Console.WriteLine("Press enter to finish");
Console.ReadLine();
And here is a screenshot of it all running:
That's it for Now
That is all I wanted to say in this article; I hope you liked it and want more. If you did, could you spare some time to leave a comment and a vote? Many thanks.
Hopefully, see you at the next one, and the one after that, and the one after that, yes 6 in total. I better get busy.