
Task Parallel Library: 3 of n

10 May 2011
A look into using the Task Parallel Library.

Introduction

This is the third part of my proposed series of articles on TPL. Last time I introduced Continuations, and covered this ground:

  • Some more TPL background
  • Continuation, what's that
  • Simple continuation
  • WPF synchronization
  • Continue "WhenAny"
  • Continue "WhenAll"
  • Using a Continuation for exception handling
  • Using Continuation as pipeline
  • Catching exception in Continuation antecedent
  • Cancelling a Continuation

This time we are going to be looking at how to use Parallel for/foreach loops. We shall also look at the usual TPL things such as cancelling and dealing with exceptions, at how to break out of parallel loops, and at how to use Thread Local Storage within loops.

Article Series Roadmap

This is article 3 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover:

  1. Starting Tasks / Trigger Operations / Exception Handling / Cancelling / UI Synchronization
  2. Continuations / Cancelling Chained Tasks
  3. Parallel For / Custom Partitioner / Aggregate Operations (this article)
  4. Parallel LINQ
  5. Pipelines
  6. Advanced Scenarios / v.Next for Tasks

Now, I am aware that some folk will simply read this article and state that it is similar to what is currently available on MSDN, and I in part agree with that; however, there are several reasons I have chosen to still take on the task of writing up these articles, which are as follows:

  • It will only really be the first couple of articles which show similar ideas to MSDN; after that, I feel the material I will get into will not be on MSDN, and will be the result of some TPL research on my behalf, which I will be outlining in the article(s), so you will benefit from my research which you can just read...Aye, nice.
  • There will be screenshots of live output here, which is something MSDN does not have that much of, which may help some readers to reinforce the article(s) text.
  • There may be some readers out here that have never even heard of the Task Parallel Library, so would not come across it in MSDN. You know the old story: you have to know what you are looking for in the first place.
  • I enjoy threading articles, so like doing them, so I did them, will do them, have done them, and continue to do them.

All that said, if people, having read this article, truly think this is too similar to MSDN (which I still hope it won't be), let me know that as well, and I will try to adjust the upcoming articles to make amends.

Parallel For/Foreach

A lot of us probably write a lot of sequential code like this:

foreach(SomeObject x in ListOfSomeObjects)
{
   x.DoSomething();
}

Here we are doing something to each of the items in some source collection, and there is no relationship at all between the objects in it; we simply want something to happen to every item. The fact that we want to do something to all these objects, and the fact that they are not tightly coupled to each other, makes this sort of thing an ideal candidate for parallelism. The designers of TPL thought so too, so they came up with Parallel.For and Parallel.ForEach loops.

The rest of this article will look at ways in which you can use Parallel.For and Parallel.ForEach loops in your own code. Obviously, since we are dealing with parallelism, there are a few added complications, but overall, it's still pretty easy to follow.

Creating a Simple Parallel For/Foreach

Demo solution project: SimpleParallel

Let's start by creating a dead simple Parallel.For and Parallel.Foreach loop; here is an example of each:

List<string> data = new List<string>() 
    { "There","were","many","animals",
      "at","the","zoo"};

//parallel for
Parallel.For(0, 10, (x) =>
{
    Console.WriteLine(x);
});

//parallel for each
Parallel.ForEach(data, (x) =>
{
    Console.WriteLine(x);
});

Console.ReadLine();

It does not get any easier than that. To prove it works as expected, here are the results; the green output is from Parallel.For and the brown output is from Parallel.ForEach.

One thing to note is that Parallel.For and Parallel.ForEach loops do not guarantee any ordering of iterations at all, as can be seen from the figure above.
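If the order of the results matters even though the order of execution does not, one common approach is to have each iteration write into its own slot of a pre-sized array. Here is a minimal sketch of that idea; the class and method names (OrderedResultsSketch, Squares) are made up for illustration and are not part of the demo solution.

```csharp
using System;
using System.Threading.Tasks;

public static class OrderedResultsSketch
{
    // Hypothetical helper: squares 0..count-1 in parallel, storing each
    // result at its own index so the final array does not depend on the
    // order in which the iterations actually ran.
    public static int[] Squares(int count)
    {
        int[] results = new int[count];
        Parallel.For(0, count, i =>
        {
            // each iteration touches only its own slot, so no lock is needed
            results[i] = i * i;
        });
        return results;
    }

    public static void Main()
    {
        // prints 0,1,4,9,16,25,36,49,64,81
        Console.WriteLine(string.Join(",", Squares(10)));
    }
}
```

The Console.WriteLine calls inside a parallel loop can still interleave in any order; it is only the contents of the results array that end up in index order.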

Breaking and Stopping a Parallel Loop

Demo solution project: BreakingAndStopping

We have all probably written some code like this in the past, where we do something in a serial loop and break out under some condition:

foreach(SomeClass x in ListofClasses)
{
   if(x.isBest)
   {
      x.Save();
      break;
   }
}

Question is, can you also do this sort of thing when using Parallel.For and Parallel.ForEach loops? Well, yes you can. We just need a TPL class called ParallelLoopState, which is passed to the loop body by several of the overloads of Parallel.For and Parallel.ForEach.

ParallelLoopState gives us two loop control methods: Stop() and Break().

ParallelLoopState.Stop()

Communicates that the System.Threading.Tasks.Parallel loop should cease execution at the system's earliest convenience.

ParallelLoopState.Break()

Communicates that the System.Threading.Tasks.Parallel loop should cease execution at the system's earliest convenience of iterations beyond the current iteration.

We can also get details about how the loop's work proceeded using the ParallelLoopResult struct, which is the return value of every Parallel.For and Parallel.ForEach call. The following three demos show Stop() and Break() in action, and use ParallelLoopResult to examine the status of each loop afterwards:

For loop Stop()

//parallel for stop
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
    if (x < 5)
        Console.WriteLine(x);
    else
        state.Stop();
});

Console.WriteLine("For loop LowestBreak Iteration : {0}", 
                  res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");

Foreach loop Stop()

//parallel foreach stop
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
    if (!x.Equals("zoo"))
        Console.WriteLine(x);
    else
        state.Stop();
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                  res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");

Foreach loop Break()

//parallel for each that actually breaks, rather than stops
ParallelLoopResult res3 = Parallel.ForEach(data, (x, state) =>
{
    if (x.Equals("zoo"))
    {
        Console.WriteLine(x);
        state.Break();
    }
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                  res3.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res3.IsCompleted);

And here is a screenshot of this running. See how the first two (the ParallelLoopState.Stop() ones) do not provide a ParallelLoopResult.LowestBreakIteration when queried (it is null). This is due to the fact that they used ParallelLoopState.Stop() and therefore did not actually break.
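The demos above cover Stop() for both loop types but only show Break() with Parallel.ForEach. For completeness, here is a small sketch of Break() inside a Parallel.For. The names BreakSketch and Run are invented for this example, and a ConcurrentBag is used purely to record which indexes ran.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BreakSketch
{
    // Hypothetical helper: records every index that ran and breaks at index 5.
    public static ParallelLoopResult Run(ConcurrentBag<int> seen)
    {
        return Parallel.For(0, 100, (i, state) =>
        {
            seen.Add(i);
            if (i == 5)
            {
                // iterations below 5 are still allowed to run;
                // iterations above 5 that have not started yet may be skipped
                state.Break();
            }
        });
    }

    public static void Main()
    {
        ConcurrentBag<int> seen = new ConcurrentBag<int>();
        ParallelLoopResult res = Run(seen);

        // Break() was only ever called from index 5, so that is the lowest break index
        Console.WriteLine("LowestBreakIteration : {0}", res.LowestBreakIteration);
        Console.WriteLine("IsCompleted : {0}", res.IsCompleted);
        Console.WriteLine("Iterations that ran : {0}", seen.Count);
    }
}
```

How many iterations above index 5 show up in the bag will differ from run to run and from machine to machine; the parts you can rely on are that indexes 0 to 5 all ran and that IsCompleted is false.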

Handling Exceptions

Demo solution project: HandlingExceptions

We covered general Task exception handling in article 1, and you can use any of those techniques; however, I find by far the easiest is to use the familiar construct that we all use in non-asynchronous code, i.e., try/catch, where we simply catch the TPL AggregateException. The pattern is something like this:

try
{
    .....
    .....

}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

That's the basic pattern, so here is some actual exception handling for real Parallel.For and Parallel.ForEach loops:

Parallel.For loop exception handling

//parallel for Exception handling
try
{
    ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
    {
        if (!state.IsExceptional)
        {
            if (x < 5)
                Console.WriteLine(x);
            else
                throw new InvalidOperationException("Don't like nums > 5");
        }
    });
    Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

Parallel.ForEach loop exception handling

//parallel foreach Exception handling
try
{
    ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
    {
        if (!x.Equals("zoo"))
            Console.WriteLine(x);
        else
            throw new InvalidOperationException("Found Zoo throwing Exception");
    });
    Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

And to show you how it is all running, here is a screenshot:

It can be seen that two exceptions were caught from the Parallel.For loop. Strictly speaking, the loop throws a single AggregateException, and the two messages come from iterating over its InnerExceptions collection. We got two because both iterations that hit the silly throw new InvalidOperationException("Don't like nums > 5") were already scheduled, and therefore ran, before the loop could shut down.
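To make the "one AggregateException, several inner exceptions" point a bit more concrete, here is a rough sketch that counts the inner exceptions and uses AggregateException.Handle() to mark the expected ones as handled. The class and method names (AggregateSketch, CountFailures) are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class AggregateSketch
{
    // Hypothetical helper: returns how many inner exceptions the loop collected.
    public static int CountFailures()
    {
        try
        {
            Parallel.For(0, 10, i =>
            {
                if (i >= 5)
                    throw new InvalidOperationException("Don't like " + i);
            });
            return 0; // not reached here, because index 5 and above always throw
        }
        catch (AggregateException aggEx)
        {
            // Handle() runs the predicate for every inner exception; any for
            // which it returns false are rethrown in a new AggregateException
            aggEx.Handle(ex => ex is InvalidOperationException);
            return aggEx.InnerExceptions.Count;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Inner exceptions caught : {0}", CountFailures());
    }
}
```

The count is at least 1, but the exact number depends on how many throwing iterations were already running when the loop began shutting down, so expect it to vary between runs.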

Cancelling a Parallel Loop

Demo solution project: CancellingLoops

We covered general Task cancellation in article 1, and the idea is still pretty much the same one you will see crop up again and again when using TPL. How do you cancel a parallel operation? Use a CancellationToken, that's how.

From what we know from the two preceding articles, we should catch AggregateException, and we should typically include a line within our parallel code something like this:

options.CancellationToken.ThrowIfCancellationRequested();

Which is what we have been doing until now. Unfortunately, this is not enough when working with Parallel.For and Parallel.ForEach loops. On cancellation, the loop throws an OperationCanceledException directly rather than wrapping it in an AggregateException, so catching only AggregateException will lead to an unhandled OperationCanceledException, as shown in the figure below:

So what must we do? Well, it is quite simple in this particular case (i.e., cancellation). We must make sure to provide a catch handler specifically for OperationCanceledException, and it seems to make no odds whether we use the line:

options.CancellationToken.ThrowIfCancellationRequested();

or not. The Parallel.For and Parallel.ForEach loops always end up throwing an OperationCanceledException when their respective CancellationToken is cancelled. I guess it is down to personal preference whether you include that line or not, at the end of the day. Since it seems to make no difference as to whether an OperationCanceledException is thrown or not, I am choosing not to include that line in the Parallel.For and Parallel.ForEach loop demos.

Here is a full listing of a Parallel.For and Parallel.ForEach loop, both of which get cancelled by the same CancellationToken, after 500 milliseconds.

List<string> data = new List<string>() 
    { "There", "were", "many", "animals", 
      "at", "the", "zoo" };

CancellationTokenSource tokenSource = new CancellationTokenSource();

Task cancelTask = Task.Factory.StartNew(() =>
{
    Thread.Sleep(500);
    tokenSource.Cancel();
});

ParallelOptions options = new ParallelOptions()
{
    CancellationToken = tokenSource.Token
};

//parallel for cancellation
try
{
    ParallelLoopResult res1 = Parallel.For(0, 1000, options, (x, state) =>
    {
        if (x % 10 == 0)
            Console.WriteLine(x);

        Thread.Sleep(100);
    });

    Console.WriteLine("For loop LowestBreak Iteration : {0}", 
                      res1.LowestBreakIteration);
    Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
    Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
    Console.WriteLine("Operation Cancelled");
}

//parallel foreach cancellation
try
{
    ParallelLoopResult res2 = Parallel.ForEach(data,options, (x, state) =>
    {
        Console.WriteLine(x);
        Thread.Sleep(100);
    });
    Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                      res2.LowestBreakIteration);
    Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
    Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
    Console.WriteLine("Operation Cancelled");
}

Console.ReadLine();

And here is the output:

We do indeed see our cancellation messages, and nothing was scheduled from the Parallel.ForEach.

However, this looks weird, right?

I would draw your attention to this: note how we only got the for loop to print 0 and 500. (OK, it was designed by me to only print if the index % 10 == 0, so there is no real way to tell what else went on.) This does show that ordering is certainly not guaranteed by Parallel.For loops. You should not rely on any ordering at all; to do so would spell certain doom.

TPL will schedule Parallel.For delegates onto worker threads as and when it sees fit, as shown above. We may have got other indexes called, but all we really know is that indexes 0 and 500 were called before we cancelled. What about 100, 200, 300...? The order is not maintained, so please do not expect any ordering to be maintained by TPL; that will not happen.

That said, it does nicely demonstrate that the Parallel.ForEach work did not get scheduled at all, as we see no output from it above. Both loops share the same CancellationToken, so if the Parallel.For only ended because of the cancellation, the token is already cancelled by the time Parallel.ForEach is called. This may vary dependent on how many cores your PC has; mine only has 2, so that is the output I saw.
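One likely reason the Parallel.ForEach prints nothing is that a parallel loop handed an already-cancelled token throws OperationCanceledException before running any iterations. The sketch below isolates that behaviour by cancelling the token up front; CancelSketch and LoopWasCanceled are names invented for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelSketch
{
    // Hypothetical helper: returns true if the loop reported cancellation.
    public static bool LoopWasCanceled()
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        ParallelOptions options = new ParallelOptions()
        {
            CancellationToken = tokenSource.Token
        };

        // cancel before the loop is even started
        tokenSource.Cancel();

        try
        {
            Parallel.For(0, 100, options, i =>
            {
                Console.WriteLine(i); // never expected to run
            });
            return false;
        }
        catch (OperationCanceledException)
        {
            // thrown directly by the loop, not wrapped in an AggregateException
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Loop cancelled : {0}", LoopWasCanceled());
    }
}
```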

Partitioning for Better Performance

Demo solution project: OrderedPartitioner

I don't know how many of you have been eagle-eyed enough to notice that when we run a Task/Continuation or a Parallel.For/Parallel.ForEach loop, we are effectively queuing up a delegate of work that will eventually be run on a ThreadPool worker thread. The thing with that is that, especially in the case of Parallel.For/Parallel.ForEach loops with small delegate bodies, the amount of time taken to create/swap out these small delegate payloads can have an adverse effect on performance.

Question is, is there anything we can do about it? Well yes, TPL does allow us to create our own partitioning chunks, where the idea is that the overall Parallel.For/Parallel.Foreach loop workload is broken up into chunks that are of a size that we specify in our own code. We are effectively creating a custom partitioning algorithm. If we go with the default Partitioner, it uses a default partitioning algorithm that takes into account the number of cores amongst other things, and may not give the best results.
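Before the full timing demo, it may help to see what a range partitioner actually hands out. The sketch below asks Partitioner.Create for ranges over 0 to 20 in chunks of 5 and simply lists them on a single thread; the RangeSketch class and Ranges method are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class RangeSketch
{
    // Hypothetical helper: materialises the ranges a range partitioner produces.
    public static List<Tuple<int, int>> Ranges(int fromInclusive, int toExclusive, int rangeSize)
    {
        OrderablePartitioner<Tuple<int, int>> partitioner =
            Partitioner.Create(fromInclusive, toExclusive, rangeSize);

        // enumerating the dynamic partitions from one thread walks every range
        return partitioner.GetDynamicPartitions().ToList();
    }

    public static void Main()
    {
        foreach (Tuple<int, int> range in Ranges(0, 20, 5))
        {
            // Item1 is the inclusive start, Item2 the exclusive end
            Console.WriteLine("[{0}, {1})", range.Item1, range.Item2);
        }
    }
}
```

Each tuple is one unit of work for Parallel.ForEach, so the loop body is invoked once per range rather than once per element, which is where the saving on small delegate bodies comes from.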

To demonstrate this, I have included three examples below:

  1. No partitioning at all
  2. Use the default TPL partitioning algorithm (my laptop has 2 cores)
  3. Use our own custom partitioning logic

Here is the full code listing to demonstrate these three scenarios. The idea is that I use a simple Task to fill an array with some dummy data, and then each of the three scenarios outlined above is run (and timed) in turn. Each of the three scenarios performs the same job: it gets an item from the original dummy list, squares it, and writes that value to another results array. By making sure all three scenarios do exactly the same thing, we should be able to get a realistic comparison.

At the end of the block under test, we also verify that all the elements were hit by checking the result array for 0. 0 should only be present when the result data array is reinitialised at the start of each of these three scenarios. We are basically ensuring each element from the original list got hit by the scenario.

I am just using the standard Threading synchronization primitives (a ManualResetEventSlim) to control the running of the three scenarios, only allowing one to run at a time, and they run in the order in which they are declared in the source code file.

// create the results array
int[] inputData = new int[50000000];
int[] resultData = new int[50000000];
ManualResetEventSlim mre = new ManualResetEventSlim(false);
Random rand = new Random();
Object locker = new Object();

//create some dummy data
Task setupTask = Task.Factory.StartNew(() =>
{
    Parallel.For(0, inputData.Length, (i) =>
    {
        lock (locker)
        {
            inputData[i] = rand.Next(2,10);
        }
    });
    mre.Set();
});



//***********************************************************************************
//
//   SCENARIO 1 : No partitioning at all
//
//***********************************************************************************
mre.Wait();
mre.Reset();

Task timerTask1 = Task.Factory.StartNew(() =>
{
    Stopwatch watch = new Stopwatch();
    watch.Start();
    Parallel.ForEach(inputData, (int item, ParallelLoopState loopState, long index) =>
    {
        resultData[index] = item * item; 
    });
    watch.Stop();
    Console.WriteLine("time elapsed for No partitioning at all version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
             "which proves each element was hit\r\n",
            (from x in resultData where x == 0 select x).Count());
    mre.Set();
});


//****************************************************************
//
//   SCENARIO 2 : Use the default TPL partitioning algorithm
//               (affected by your PCs # of cores)
//
//****************************************************************
mre.Wait();
mre.Reset();

//clear results
Parallel.For(0, inputData.Length, (i) =>
{
    resultData[i] = 0;
});

Task timerTask2 = Task.Factory.StartNew(() =>
{
    // create an orderable partitioner
    Stopwatch watch = new Stopwatch();
    OrderablePartitioner<int> op = Partitioner.Create(inputData);
    watch.Reset();
    watch.Start();
    Parallel.ForEach(op, (int item, ParallelLoopState loopState, long index) =>
    {
        resultData[index] = item * item;
    });
    watch.Stop();
    Console.WriteLine("time elapsed for default TPL partitioning algorithm version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
         "which proves each element was hit\r\n",
        (from x in resultData where x == 0 select x).Count());
    mre.Set();
});


//***********************************************************************************
//
//   SCENARIO 3 : Use our own custom partitioning logic
//
//***********************************************************************************
mre.Wait();
mre.Reset();

//clear results
Parallel.For(0, inputData.Length, (i) =>
{
    resultData[i] = 0;
});

Task timerTask3 = Task.Factory.StartNew(() =>
{
    // create an chunking orderable partitioner
    Stopwatch watch = new Stopwatch();
    watch.Reset();
    watch.Start();
    OrderablePartitioner<Tuple<int, int>> chunkPart = 
             Partitioner.Create(0, inputData.Length, 5000);
    Parallel.ForEach(chunkPart, chunkRange =>
    {
        for (int i = chunkRange.Item1; i < chunkRange.Item2; i++)
        {
            resultData[i] = inputData[i] * inputData[i];
        }
    });
    watch.Stop();
    Console.WriteLine("time elapsed for custom Partitioner version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
         "which proves each element was hit\r\n",
        (from x in resultData where x == 0 select x).Count());
    mre.Set();
});

mre.Wait();
mre.Reset();

Console.ReadLine();

And here is the result of running this code (again my laptop has 2 cores, so your results may differ):

I think the results speak for themselves. Using no partitioning is OK, but when we use the default partitioner, things get worse (more than likely due to my laptop only having 2 cores, amongst other factors). But look what happens when we take full control over what partition size to use: that gave us the best performance of all.

So there you go, food for thought.

Thread Local Storage

Sometimes what we need to do inside a loop is accumulate some sort of running total, something like this in sequential code:

int count = 0;

foreach (SomeObject item in SomeObjectList)
{
    if (item.Contains(someKeyWord))
    {
        count++;
    }
}

So how would we do something like this using Parallel.For/Parallel.ForEach loops? The answer lies in using Thread Local Storage, which has been around for a while, but TPL puts a new spin on it.

Luckily, when constructing Parallel.For/Parallel.ForEach loops, we have all the relevant overloads we need to use Thread Local Storage; it's just a matter of understanding how to do so. To this end, I have provided two examples, which are as follows:

Demo solution project: ThreadLocalStorage

This example searches a source list of words for a particular search word, and keeps a count of how many times that word is found. It is important to know that when we access the matches variable, we are effectively accessing a shared object that needs thread synchronization of some sort. I have chosen to use lock(..) (which is really using Monitor.Enter()/Monitor.Exit() behind the scenes), but you could use any threading synchronization primitive that you like.

int matches = 0;
object syncLock = new object();

string[] words
    = new string[] { "the","other","day","I","was",
    "speaking","to","a","man","about",
    "this","and","that","cat","and","he","told","me",
    "the","only","other","person","to","ask","him",
    "about","the","crazy","cat","was","the","cats","owner"};

string searchWord = "cat";

//Add up all words that match the searchWord
Parallel.ForEach(
    //source
    words,  
    //local init
    () => 0, 
    //body
    (string item, ParallelLoopState loopState, int tlsValue) => 
    {
        if (item.ToLower().Equals(searchWord))
        {
            tlsValue++;
        }
        return tlsValue;
    },
    //local finally
    tlsValue =>
    {
        lock (syncLock)
        {
            matches += tlsValue;
        }
    });

Console.WriteLine("Matches for searchword '{0}' : {1}\r\n", searchWord, matches);
Console.WriteLine("Where the original word list was : \r\n\r\n{0}",
    words.Aggregate((x, y) => x.ToString() + " " + y.ToString()));
Console.ReadLine();

The format of the loop is pretty much dictated by TPL if you want to use ThreadLocalStorage, it is just a pattern that you will have to learn and follow. This is what the method signature and comments look like within .NET 4.0 itself:

//
// Summary:
//     Executes a for each operation on an System.Collections.IEnumerable{TSource}
//     in which iterations may run in parallel.
//
// Parameters:
//   source:
//     An enumerable data source.
//
//   localInit:
//     The function delegate that returns the initial state of the local data for
//     each thread.
//
//   body:
//     The delegate that is invoked once per iteration.
//
//   localFinally:
//     The delegate that performs a final action on the local state of each thread.
//
// Type parameters:
//   TSource:
//     The type of the data in the source.
//
//   TLocal:
//     The type of the thread-local data.
//
// Returns:
//     A System.Threading.Tasks.ParallelLoopResult structure that contains information
//     on what portion of the loop completed.
//
// Exceptions:
//   System.ArgumentNullException:
//     The exception that is thrown when the source argument is null.-or-The exception
//     that is thrown when the body argument is null.-or-The exception that is thrown
//     when the localInit argument is null.-or-The exception that is thrown when
//     the localFinally argument is null.
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, 
       Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, 
       TLocal> body, Action<TLocal> localFinally);

It looks a bit hairy, but I think the code is somehow easier to understand than this TPL metadata.
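Since the shared total in the word count example is a plain int, one alternative to lock(..) in the local finally delegate is Interlocked.Add. This is only a sketch of that swap, not what the demo project does; TlsCountSketch and CountMatches are hypothetical names, and the comparison uses StringComparison.OrdinalIgnoreCase in place of ToLower().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TlsCountSketch
{
    // Hypothetical helper: counts exact (case-insensitive) matches of searchWord.
    public static int CountMatches(string[] words, string searchWord)
    {
        int matches = 0;

        Parallel.ForEach(
            //source
            words,
            //local init: each worker starts its own running total at 0
            () => 0,
            //body: only touches the thread-local value, so no locking here
            (string item, ParallelLoopState loopState, int tlsValue) =>
                item.Equals(searchWord, StringComparison.OrdinalIgnoreCase)
                    ? tlsValue + 1
                    : tlsValue,
            //local finally: merge each worker's total into the shared one atomically
            tlsValue => Interlocked.Add(ref matches, tlsValue));

        return matches;
    }

    public static void Main()
    {
        string[] words = { "the", "crazy", "cat", "was", "the", "cats", "owner", "Cat" };
        // prints 2 ("cat" and "Cat" match, "cats" does not)
        Console.WriteLine(CountMatches(words, "cat"));
    }
}
```

Interlocked.Add has no overload for double, so this swap would not carry over directly to a running total of doubles; a lock is the simpler choice there.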

Here is what this code looks like when run:

Demo solution project: ThreadLocalStorage2

This example initializes a source list with random double values, and then adds these input values up to form a single output value. Again, it does this using Thread Local Storage, and as such, we are again accessing a shared variable that needs thread synchronization of some sort. Again, I have chosen to use lock(..). The other thing to note is that since the data type I am dealing with in this demo is a double, I must supply double as the generic type argument of Parallel.For<TLocal>.

double total = 0;
object syncLock = new object();

double[] values = new double[100];
Random rand = new Random();
Object locker = new Object();
ManualResetEventSlim mre = new ManualResetEventSlim();

//initialise some random values, and signal when done
Task initialiserTask = Task.Factory.StartNew(() =>
    {
        Parallel.For(0, values.Length, (idx) =>
            {
                lock (locker)
                {
                    values[idx] = rand.NextDouble();
                }
            });
        mre.Set();
    });

//wait until all initialised
mre.Wait();

//now use Thread Local Storage and Sum them together
Parallel.For<double>(
    0,
    values.Length,
    //local init
    () => 0,
    //body
    (int index, ParallelLoopState loopState, double tlsValue) =>
    {
        tlsValue += values[index];
        return tlsValue;
    },
    //local finally
    tlsValue =>
    {
        lock (syncLock)
        {
            total += tlsValue;
        }
    });

Console.WriteLine("Total: {0}", total);

// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

And here is a screenshot of it all running:

That's it for Now

That is all I wanted to say in this article. I hope you liked it, and want more. If you did like this article, and would like more, could you spare some time to leave a comment and a vote? Many thanks.

Hopefully, see you at the next one, and the one after that, and the one after that, yes 6 in total. I better get busy.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
