Parallel Computing Concepts via C# 4.0

4 Nov 2010, CPOL
An article that presents the basics of Parallel Computing in .NET 4.0

Introduction

.NET 4.0 has kept in step with the computing industry's quest for density: there has always been a drive to accomplish more work in less time. Parallel LINQ (PLINQ), the Parallel class, the task parallelism constructs, and the concurrent collections are new to Framework 4.0 and are collectively known as PFX (Parallel Framework). The Parallel class together with the task parallelism constructs is called the Task Parallel Library, or TPL. These additions are necessary because CPU clock speeds have stagnated and manufacturers have shifted their focus to increasing core counts. This is a problem for us as programmers because standard single-threaded code does not automatically run faster on those extra cores. This article therefore looks at parallel programming with the C# 4.0 language and the .NET 4.0 runtime.

One point should be clarified first: a multithreaded application can run code concurrently on a single core. The threads take turns on that core, so the code executes concurrently but not in parallel. Only when the threads execute simultaneously on different cores can we say that we have achieved concurrency through parallel programming.

Data parallelism uses the input data of an operation as the means of partitioning the work into smaller pieces. The data is divided among the available hardware processors, and the same, mostly independent, code is then run against each partition. Leveraging multiple cores is easy for most server applications, where each thread can independently handle a separate client request, but it is harder on the desktop, because it typically requires that you take your computationally intensive code and do the following:

  • Partition it into small chunks
  • Execute those chunks in parallel via multithreading
  • Collate the results as they become available, in a thread-safe manner
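
The three steps above can be sketched in a few lines. This is only a minimal illustration, not code from the PFX itself: the class name ChunkedSum and the method SumOfSquares are hypothetical, while Partitioner.Create and Parallel.ForEach are the .NET 4.0 library calls.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, named for illustration only.
static class ChunkedSum
{
    // Sums the squares of 'values' by partitioning the index range,
    // processing the chunks in parallel, and collating under a lock.
    public static long SumOfSquares(int[] values)
    {
        // Partitioner.Create rejects an empty range, so handle it up front.
        if (values.Length == 0) return 0;

        long total = 0;
        object gate = new object();

        // 1. Partition: split [0, Length) into contiguous index ranges.
        var ranges = Partitioner.Create(0, values.Length);

        // 2. Execute: each range is handed to some worker thread.
        Parallel.ForEach(ranges, range =>
        {
            long partial = 0;
            for (int i = range.Item1; i < range.Item2; i++)
                partial += (long)values[i] * values[i];

            // 3. Collate: merge this chunk's result in a thread-safe manner.
            lock (gate) total += partial;
        });

        return total;
    }
}
```

Calling ChunkedSum.SumOfSquares(new[] { 1, 2, 3, 4 }) returns 30. The lock is taken once per chunk rather than once per element, which keeps contention low.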

The PFX libraries have been designed specifically to help in these scenarios. Programming to leverage multi-cores or multiple processors is called parallel programming. This is a subset of the broader concept of multithreading. There are two strategies for partitioning work among threads: data parallelism (already mentioned, but repeated here) and task parallelism. When a set of tasks must be performed on many data values, we can parallelize by having each thread perform the (same) set of tasks on a subset of values. This is called data parallelism because we are partitioning the data between threads. In contrast, with task parallelism we partition the tasks; in other words, we have each thread perform a different task. In general, data parallelism is easier and scales better to highly parallel hardware, because it reduces or eliminates shared data (thereby reducing contention and thread-safety issues). Also, data parallelism leverages the fact that there are often more data values than discrete tasks, increasing the parallelism potential.
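
The difference between the two strategies may be easier to see side by side. The sketch below is an assumption-laden illustration: the class ParallelismKinds and its methods are hypothetical names, and only Parallel.For and Parallel.Invoke come from the TPL.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical class, named for illustration only.
static class ParallelismKinds
{
    // Data parallelism: the same operation is applied to every element,
    // and the runtime divides the index range among worker threads.
    public static double[] SquareRoots(double[] input)
    {
        var output = new double[input.Length];
        Parallel.For(0, input.Length, i =>
        {
            output[i] = Math.Sqrt(input[i]); // each iteration writes only its own slot
        });
        return output;
    }

    // Task parallelism: two different operations run side by side.
    // Assumes 'input' is non-empty (Min and Max throw on an empty sequence).
    public static Tuple<double, double> MinAndMax(double[] input)
    {
        double min = 0, max = 0;
        Parallel.Invoke(
            () => { min = input.Min(); },   // task 1
            () => { max = input.Max(); });  // task 2
        // Parallel.Invoke returns only after both delegates have finished.
        return Tuple.Create(min, max);
    }
}
```

SquareRoots scales with the number of elements, whereas MinAndMax can never use more than two threads, which reflects the point above that there are usually more data values than discrete tasks.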

OK. So How Do We Parallelize Loops and use a Parallel Extension?

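The Wavefront algorithm below is a Parallel Extension available from the PFX. A wavefront processes a grid in which each cell depends on the cell above it and the cell to its left, so cells that do not depend on one another can run in parallel. The file compiles as a DLL on the command line, or builds as a class library in Visual Studio 2010, and then serves as a reference for a program that demonstrates the time differences between sequential and parallel code: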

C#
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Threading.Algorithms
{
    public static partial class ParallelAlgorithms
    {
        public static void Wavefront(
            int numRows, int numColumns,
            int numBlocksPerRow, int numBlocksPerColumn,
            Action<int, int, int, int> processBlock)
        {
            // Validate parameters
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (numBlocksPerRow <= 0 || numBlocksPerRow > numRows)
                throw new ArgumentOutOfRangeException("numBlocksPerRow");
            if (numBlocksPerColumn <= 0 || numBlocksPerColumn > numColumns)
                throw new ArgumentOutOfRangeException("numBlocksPerColumn");
            if (processBlock == null)
                throw new ArgumentNullException("processBlock");

            // Compute the size of each block
            int rowBlockSize = numRows / numBlocksPerRow;
            int columnBlockSize = numColumns / numBlocksPerColumn;

            Wavefront(numBlocksPerRow, numBlocksPerColumn, (row, column) =>
            {
                int start_i = row * rowBlockSize;
                int end_i = row < numBlocksPerRow - 1 ?
                    start_i + rowBlockSize : numRows;

                int start_j = column * columnBlockSize;
                int end_j = column < numBlocksPerColumn - 1 ?
                    start_j + columnBlockSize : numColumns;

                processBlock(start_i, end_i, start_j, end_j);
            });
        }

        public static void Wavefront(
            int numRows, int numColumns,
            Action<int, int> processRowColumnCell)
        {
            // Validate parameters
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (processRowColumnCell == null)
                throw new ArgumentNullException("processRowColumnCell");

            Task[] prevTaskRow = new Task[numColumns];
            Task prevTaskInCurrentRow = null;
            var dependencies = new Task[2];

            // Create a task for each cell
            for (int row = 0; row < numRows; row++)
            {
                prevTaskInCurrentRow = null;
                for (int column = 0; column < numColumns; column++)
                {
                    // In-scope locals for being captured in the task closures
                    int j = row, i = column;

                    // Create a task with the appropriate dependencies.
                    Task curTask;
                    if (row == 0 && column == 0)
                    {
                        // Upper-left task kicks everything off, having no dependencies
                        curTask = Task.Factory.StartNew(() => processRowColumnCell(j, i));
                    }
                    else if (row == 0 || column == 0)
                    {
                        // Tasks in the first row or column depend on a single neighbor
                        var antecedent = column == 0
                            ? prevTaskRow[0] : prevTaskInCurrentRow;
                        curTask = antecedent.ContinueWith(p =>
                        {
                            p.Wait(); // Necessary only to propagate exceptions
                            processRowColumnCell(j, i);
                        });
                    }
                    else // row > 0 && column > 0
                    {
                        // All other tasks depend on both the tasks above and to the left
                        dependencies[0] = prevTaskInCurrentRow;
                        dependencies[1] = prevTaskRow[column];
                        curTask = Task.Factory.ContinueWhenAll(dependencies, ps =>
                        {
                            Task.WaitAll(ps); // Necessary only to propagate exceptions
                            processRowColumnCell(j, i);
                        });
                    }

                    // Keep track of the task just created for future iterations
                    prevTaskRow[column] = prevTaskInCurrentRow = curTask;
                }
            }

            // Wait for the last task to be done.
            prevTaskInCurrentRow.Wait();
        }
    }
}
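
The dependency pattern used for the interior cells, where a task starts only after the task above it and the task to its left have finished, can be seen in isolation in the following sketch. The class name DependencyDemo and the log strings are hypothetical; Task.Factory.StartNew and Task.Factory.ContinueWhenAll are the same TPL calls that Wavefront uses.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, named for illustration only.
static class DependencyDemo
{
    // Runs two independent tasks and a third that depends on both,
    // returning the order in which they logged their completion.
    public static string[] Run()
    {
        var log = new ConcurrentQueue<string>();

        // Two independent predecessors; they may finish in either order.
        Task above = Task.Factory.StartNew(() => log.Enqueue("above"));
        Task left = Task.Factory.StartNew(() => log.Enqueue("left"));

        // The dependent task is scheduled only when both predecessors are done.
        Task cell = Task.Factory.ContinueWhenAll(new[] { above, left }, ps =>
        {
            Task.WaitAll(ps); // propagates any exception from the predecessors
            log.Enqueue("cell");
        });

        cell.Wait();
        return log.ToArray();
    }
}
```

The first two entries of the returned array can appear in either order, but "cell" is always last, which is the guarantee the wavefront relies on.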

To compile:

csc /target:library /out:Algorithms_Wavefront.dll Algorithms_Wavefront.cs 

We will use this DLL as an added reference in Visual Studio 2010, or by using the ‘/reference:’ switch on the command line. Notice the Action delegates (Action<int, int> and Action<int, int, int, int>): each represents a method that takes the listed parameters and returns no value, which is how the caller passes in the work to perform on each cell or block. Now let’s take a look at the next program, which references the algorithm class externally.

C#
using System;
using System.Diagnostics;
using System.Text;

// Compares a serial and a wavefront-parallel edit-distance computation.
class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random();
            Stopwatch sw = new Stopwatch();
            int result;
            while (true)
            {
                string s1 = GenerateRandomString(rand);
                string s2 = GenerateRandomString(rand);

                sw.Restart();
                result = SerialTimeLength(s1, s2);
                sw.Stop();
                Console.WriteLine("Serial  :\t{0}\t{1}", result, sw.Elapsed);

                sw.Restart();
                result = ParallelTimeLength(s1, s2);
                sw.Stop();
                Console.WriteLine("Parallel:\t{0}\t{1}", result, sw.Elapsed);

                Console.WriteLine("----------------------------------------------------");
                GC.Collect();
            }
        }

        private static string GenerateRandomString(Random rand)
        {
            const int LEN = 10000;
            StringBuilder sb = new StringBuilder(LEN);
            for (int i = 0; i < LEN; i++) sb.Append((char)('a' + rand.Next(0, 26)));
            return sb.ToString();
        }

        private static int SerialTimeLength(string s1, string s2)
        {
            int[,] dist = new int[s1.Length + 1, s2.Length + 1];
            for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
            for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;

            for (int i = 1; i <= s1.Length; i++)
            {
                for (int j = 1; j <= s2.Length; j++)
                {
                    dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                        dist[i - 1, j - 1] :
                        1 + Math.Min(dist[i - 1, j],
                            Math.Min(dist[i, j - 1],
                                     dist[i - 1, j - 1]));
                }
            }
            return dist[s1.Length, s2.Length];
        }

        private static int ParallelTimeLength(string s1, string s2)
        {
            int[,] dist = new int[s1.Length + 1, s2.Length + 1];
            for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
            for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;
            int numBlocks = Environment.ProcessorCount * 4;

            System.Threading.Algorithms.ParallelAlgorithms.Wavefront(
                s1.Length, s2.Length, numBlocks, numBlocks,
                (start_i, end_i, start_j, end_j) =>
            {
                for (int i = start_i+1; i <= end_i; i++)
                {
                    for (int j = start_j+1; j <= end_j; j++)
                    {
                        dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                            dist[i - 1, j - 1] :
                            1 + Math.Min(dist[i - 1, j],
                                Math.Min(dist[i, j - 1],
                                         dist[i - 1, j - 1]));
                    }
                }
            });

            return dist[s1.Length, s2.Length];
        }
    }

To compile:

csc /reference:Algorithms_Wavefront.dll Program.cs

Notice the time differences between sequentially executed code and code that parallelizes its loops:

Serial  :       8811    00:00:06.3117762
Parallel:       8811    00:00:03.6879674
-------------------------------------------------------
Serial  :       8791    00:00:06.3353283
Parallel:       8791    00:00:03.6033089
-------------------------------------------------------
Serial  :       8813    00:00:06.1330880
Parallel:       8813    00:00:03.5224918
-------------------------------------------------------
Serial  :       8787    00:00:06.2184788
Parallel:       8787    00:00:03.7236997
-------------------------------------------------------
Serial  :       8810    00:00:06.1260435
Parallel:       8810    00:00:03.5696177
-------------------------------------------------------
Serial  :       8800    00:00:06.1468799
Parallel:       8800    00:00:03.5442969
-------------------------------------------------------
Serial  :       8807    00:00:06.3541186
Parallel:       8807    00:00:03.5775287
-------------------------------------------------------
Serial  :       8819    00:00:06.2730393
Parallel:       8819    00:00:03.6345988
-------------------------------------------------------
Serial  :       8801    00:00:06.2168016
Parallel:       8801    00:00:03.6277170
-------------------------------------------------------
Serial  :       8762    00:00:06.1840874
Parallel:       8762    00:00:03.5964011
-------------------------------------------------------
Serial  :       8810    00:00:06.1708841
Parallel:       8810    00:00:03.8727436
-------------------------------------------------------
Serial  :       8799    00:00:06.1792801
Parallel:       8799    00:00:03.6226900
-------------------------------------------------------
Serial  :       8822    00:00:06.1833951
Parallel:       8822    00:00:03.7146820
-------------------------------------------------------
Serial  :       8794    00:00:06.1674580
Parallel:       8794    00:00:03.6571650
-------------------------------------------------------
Serial  :       8803    00:00:06.2745721
Parallel:       8803    00:00:04.0783518
-------------------------------------------------------
Serial  :       8787    00:00:06.5583867
Parallel:       8787    00:00:04.9902464
-------------------------------------------------------
Serial  :       8803    00:00:06.5382290
Parallel:       8803    00:00:04.3523305
-------------------------------------------------------
Serial  :       8792    00:00:06.2359404
Parallel:       8792    00:00:03.6123527
-------------------------------------------------------
Serial  :       8810    00:00:06.2226213
Parallel:       8810    00:00:03.6718996
-------------------------------------------------------
Serial  :       8816    00:00:06.1712133
Parallel:       8816    00:00:03.6129905
and so on …

What’s an Example of a Computation?

The number PI is a transcendental number, and therefore also irrational. This means that its decimal expansion goes on forever without ever settling into a repeating pattern. Mathematicians have calculated PI to millions of places and have found no pattern.

$$\pi = \int_0^1 \frac{4}{1 + x^2}\,dx$$

The code shown below is an implementation of the numerical integration midpoint rectangle rule to solve the integral just shown. To compute an approximation of the area under the curve, we must compute the area of some number of rectangles (num_rects) by finding the midpoint (mid) of each rectangle and computing the height of that rectangle (height), which is simply the function value at that midpoint. We add together the heights of all the rectangles (sum) and, once computed, we multiply the sum of the heights by the width of the rectangles (width) to determine the desired approximation of the total area (area) and the value of pi:
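
The procedure just described can also be written as a single formula. Here N stands for num_rects and w for width; x_i is shorthand introduced for this note (it is not a name from the code) for the midpoint mid of rectangle i:

```latex
\pi \;\approx\; w \sum_{i=0}^{N-1} \frac{4}{1 + x_i^{2}},
\qquad x_i = \left(i + \tfrac{1}{2}\right) w,
\qquad w = \frac{1}{N}
```

Each term of the sum is one value of height, and the factor w in front is the final multiplication by the rectangle width.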

C++
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
static long num_rects = 100000;

int main(int argc, char *argv[])
{
    int i;
    double mid, height, width, sum = 0.0;
    double area;

    width = 1.0 / (double) num_rects;       /* width of each rectangle */
    for (i = 0; i < num_rects; i++) {
        mid = (i + 0.5) * width;            /* midpoint of rectangle i */
        height = 4.0 / (1.0 + mid * mid);   /* function value at the midpoint */
        sum += height;
    }
    area = width * sum;
    printf("Computed pi = %f\n", area);
    return 0;
}
Computed pi = 3.141593

So if we were to take this code and calculate PI in managed C#, both serially and in parallel, would the computation times be close to each other? Look again at the basic C code, which I have not threaded. The two work variables, mid and height, are assigned values in each iteration, so each thread needs its own local copy to use while executing the iterations assigned to it. The iteration variable, i, is also updated in each iteration, and each thread needs a local copy of it to avoid interfering with iterations running in other threads. The sum variable is updated in each iteration too, but because its value is used outside the loop, we can’t simply give each thread a local copy that is discarded when the thread finishes: the per-thread partial sums must be combined into a single total in a thread-safe way.
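
As a rough illustration of that reasoning, here is a hand-threaded sketch in C#. It is an assumption-based example rather than the approach the article measures: the class ManualPi, the method Compute, and the strided assignment of iterations to workers are all hypothetical choices made for this note.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical class, named for illustration only.
static class ManualPi
{
    // Approximates pi with the midpoint rule, giving every worker its own
    // copies of i, mid and the running sum, then combining the partial sums.
    public static double Compute(int numRects, int numThreads)
    {
        if (numRects <= 0) throw new ArgumentOutOfRangeException("numRects");
        if (numThreads <= 0) throw new ArgumentOutOfRangeException("numThreads");

        double width = 1.0 / numRects;
        var partial = new double[numThreads]; // one result slot per worker
        var workers = new Task[numThreads];

        for (int t = 0; t < numThreads; t++)
        {
            int id = t; // private copy for the closure
            workers[t] = Task.Factory.StartNew(() =>
            {
                double localSum = 0.0; // thread-local replacement for sum
                for (int i = id; i < numRects; i += numThreads)
                {
                    double mid = (i + 0.5) * width; // local to this iteration
                    localSum += 4.0 / (1.0 + mid * mid);
                }
                partial[id] = localSum; // each worker writes only its own slot
            });
        }

        Task.WaitAll(workers);

        // Combine the partial sums on a single thread, so no lock is needed.
        double sum = 0.0;
        foreach (double p in partial) sum += p;
        return width * sum;
    }
}
```

Because the partial sums are added together only after Task.WaitAll returns, the shared total is never touched by two threads at once.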

Here is a file that computes PI five different ways. The methods are ordered from the slowest to the fastest. The first computation uses a method called SerialLinqPi(), which serves as the baseline against which the faster versions are measured:

C#
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    const int num_steps = 100000000;
    
    static void Main(string[] args)
    {
        while (true)
        {
            Time(() => SerialLinqPi());
            Time(() => ParallelLinqPi());
            Time(() => SerialPi());
            Time(() => ParallelPi());
            Time(() => ParallelPartitionerPi());

            Console.WriteLine("----");
            Console.ReadLine();
        }
    }
    
    static void Time<T>(Func<T> work)
    {
        var sw = Stopwatch.StartNew();
        var result = work();
        Console.WriteLine(sw.Elapsed + ": " + result);
    }
    
    static double SerialLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in Enumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }
    
    static double ParallelLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in ParallelEnumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }
    
    static double SerialPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        for (int i = 0; i < num_steps; i++)
        {
            double x = (i + 0.5) * step;
            sum = sum + 4.0 / (1.0 + x * x);
        }
        return step * sum;
    }
    
    static double ParallelPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }

    static double ParallelPartitionerPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0,
            (range, state, local) =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                double x = (i + 0.5) * step;
                local += 4.0 / (1.0 + x * x);
            }
            return local;
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }
}

As you would probably expect at this point, the computation times drop according to how we divide the data and parallelize the loops. The results are listed in the order in which Main calls the methods:

00:00:05.6078739: 3.14159265359043
00:00:04.0519734: 3.14159265358991
00:00:01.4407659: 3.14159265359043
00:00:01.2224946: 3.14159265358996
00:00:00.8162013: 3.14159265358965

One might assume that the simplest form of static decomposition, dividing the loop's iteration count by the number of processors and giving each thread one contiguous series of iterations, would be good enough. This is not always the best route, because it can use the processors inefficiently: if some iterations cost more than others, or a core is busy with other work, the threads that finish early sit idle while the slowest one completes. The different decomposition approaches should be examined carefully in order to capitalize on data parallelism.
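
To make static decomposition concrete, the following sketch computes the contiguous chunk that each worker would receive. The class StaticDecomposition and the method Split are hypothetical names; the arithmetic mirrors the block-size calculation in the Wavefront code earlier in the article, where the last block absorbs the remainder.

```csharp
using System;

// Hypothetical helper, named for illustration only.
static class StaticDecomposition
{
    // Returns the [start, end) bounds of one contiguous chunk per worker.
    public static Tuple<int, int>[] Split(int iterations, int workers)
    {
        if (iterations < 0) throw new ArgumentOutOfRangeException("iterations");
        if (workers <= 0) throw new ArgumentOutOfRangeException("workers");

        int size = iterations / workers; // per-worker iteration count
        var chunks = new Tuple<int, int>[workers];
        for (int w = 0; w < workers; w++)
        {
            int start = w * size;
            // The last worker also takes whatever the division left over.
            int end = (w < workers - 1) ? start + size : iterations;
            chunks[w] = Tuple.Create(start, end);
        }
        return chunks;
    }
}
```

Split(10, 4) yields the ranges [0, 2), [2, 4), [4, 6) and [6, 10): the last worker gets twice as many iterations as the others, so even with uniform iteration costs the other three would finish early and wait.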

References

  • Patterns of Parallel Programming, by Microsoft’s Stephen Toub

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)