Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / DevOps

Easy to Use Parallel foreach, that can be Magnitudes Faster than the .NET Version

5.00/5 (19 votes)
10 Jul 2018CPOL1 min read 33.4K  
Parallel foreach loop implementation for nested loops

Introduction

I'd like to share a quick solution for a parallel foreach loop, which can potentially increase the performance of certain applications currently using the built in version introduced in .NET 4.0. This custom version is suited to be used in nested loops, where the outer one needs to be processed sequentially. The number of threads has to be set manually, this could provide additional control over the threads.

Background

Recently, I've run into a problem, where I had a collection of items which could be independently processed, but I also had an outer loop in which I had to sequentially post-process the said collection. Repeatedly creating the inner Parallel.Foreach resulted in such an overhead, that the performance was much worse than using a non parallel loop. As a solution, I've created a class, that makes it possible to instantiate many of the necessary objects outside of the loops, thus lightening the administrative overhead.

This console application is a demo to demonstrate the potential performance increase.

I used a prime number finder algorithm (FindPrimeNumber(int n)) as a calculation heavy function.

C#
static void Main(string[] args)
{
    int numberOfThreads = 7;
    int numberOfIterations = 10000;
    int startOfRange = 80;

    List<int> numbers = new List<int>();
    numbers = Enumerable.Range(startOfRange, 7).ToList();

    Console.WriteLine($"Parallel processing has started...");

    var stopwatch = Stopwatch.StartNew();
    stopwatch.Start();

    ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber);

    for (int j = 0; j < numberOfIterations; j++)
    {
        pp.ForEach(numbers);
    }

    stopwatch.Stop();
    Console.WriteLine($"Job's (custom) done over {stopwatch.ElapsedMilliseconds} ms");

    Console.WriteLine($"Process started: Built-in ForEach");

    stopwatch.Reset();
    stopwatch.Start();

    for (int j = 0; j < numberOfIterations; j++)
    {
        Parallel.ForEach(numbers, (currentNumber) =>
        {
            FindPrimeNumber(currentNumber);
        });
    }

    Console.WriteLine($"Job's (built-in) done over {stopwatch.ElapsedMilliseconds} ms");
    Console.WriteLine($"Process started: sequential");
    stopwatch.Reset();
    stopwatch.Start();

    for (int j = 0; j < numberOfIterations; j++)
    {
        for (int i = 0; i < numbers.Count; i++)
        {
            FindPrimeNumber(numbers[i]);
        }
    }

    Console.WriteLine($"Job's (sequential) done over {stopwatch.ElapsedMilliseconds} ms");
    Console.ReadKey();
}

static void FindPrimeNumber(int n)
{
    int count = 0;
    long a = 2;
    while (count < n)
    {
        long b = 2;
        int prime = 1;
        while (b * b <= a)
        {
            if (a % b == 0)
            {
                prime = 0;
                break;
            }
            b++;
        }
        if (prime > 0)
        {
            count++;
        }
        a++;
    }
}

This demo application produced the following output:

Parallel processing has started...
Job's (custom) done over 994 ms
Process started: Built-in ForEach
Job's (built-in) done over 3159 ms
Process started: sequential
Job's (sequential) done over 2262 ms

Using the Code

The class itself is very simple:

C#
public class ParallelProcessor<T>
{
    SlicedList<T>[] listSlices;
    int numberOfThreads;
    Action<T> action;
    ManualResetEvent[] manualResetEvents;

    public ParallelProcessor(int NumberOfThreads, Action<T> Action)
    {
        this.numberOfThreads = NumberOfThreads;
        this.listSlices = new SlicedList<T>[numberOfThreads];
        this.action = Action;
        this.manualResetEvents = new ManualResetEvent[numberOfThreads];

        for (int i = 0; i < numberOfThreads; i++)
        {
            listSlices[i] = new SlicedList<T>();
            manualResetEvents[i] = new ManualResetEvent(false);
            listSlices[i].indexes = new LinkedList<int>();
            listSlices[i].manualResetEvent = manualResetEvents[i];
        }
    }

    public void ForEach(List<T> Items)
    {
        prepareListSlices(Items);
        for (int i = 0; i < numberOfThreads; i++)
        {
            manualResetEvents[i].Reset();
            ThreadPool.QueueUserWorkItem(new WaitCallback(
                DoWork), listSlices[i]);
        }
        WaitHandle.WaitAll(manualResetEvents);
    }

    private void prepareListSlices(List<T> items)
    {
        for (int i = 0; i < numberOfThreads; i++)
        {
            listSlices[i].items = items;
            listSlices[i].indexes.Clear();
        }
        for (int i = 0; i < items.Count; i++)
        {
            listSlices[i % numberOfThreads].indexes.AddLast(i);
        }
    }

    private void DoWork(object o)
    {
        SlicedList<T> slicedList = (SlicedList<T>)o;

        foreach (int i in slicedList.indexes)
        {
            action(slicedList.items[i]);
        }
        slicedList.manualResetEvent.Set();
    }
}

public class SlicedList<T>
{
    public List<T> items;
    public LinkedList<int> indexes;
    public ManualResetEvent manualResetEvent;
}

To use the function, you need to create an instance of the class, and call the ForEach() method with the collection as a parameter, as you can see in the demo app as well:

C++
ParallelProcessor<int> pp = new ParallelProcessor<int>(numberOfThreads, FindPrimeNumber);
pp.ForEach(numbers);

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)