Introduction
.NET 4.0 keeps pace with the computing industry's push toward higher core density: the perennial goal is to get more work done in less time. Parallel LINQ (PLINQ), the Parallel class, the task parallelism constructs, and the concurrent collections are new in Framework 4.0 and are collectively known as PFX (Parallel Framework). The Parallel class together with the task parallelism constructs is called the Task Parallel Library, or TPL.
These additions are necessary because CPU clock speeds have stagnated and manufacturers have shifted their focus to increasing core counts. That is a problem for us as programmers, because standard single-threaded code does not automatically run faster on those extra cores. This article therefore looks at parallel programming with the C# 4.0 language and the .NET 4.0 runtime.
One point should be clarified first: a multithreaded application can run code concurrently on a single core. Such code executes concurrently, but not in parallel, because the single core interleaves the threads. Only when the threads execute simultaneously on different cores can we say that we have achieved concurrency through parallel programming.
Data parallelism partitions the input data of an operation into smaller pieces and divides those pieces among the available hardware processors. The partitioning step is normally followed by replicating and executing some mostly independent program across the partitions. Leveraging multiple cores is therefore easy for most server applications, where each thread can independently handle a separate client request, but it is harder on the desktop, because it typically requires that you take your computationally intensive code and do the following:
- Partition it into small chunks
- Execute those chunks in parallel via multithreading
- Collate the results as they become available, in a thread-safe manner
The PFX libraries have been designed specifically to help in these scenarios. Programming to leverage multiple cores or multiple processors is called parallel programming; it is a subset of the broader concept of multithreading.
There are two strategies for partitioning work among threads: data parallelism (introduced above) and task parallelism. When a set of tasks must be performed on many data values, we can parallelize by having each thread perform the same set of tasks on a subset of the values. This is called data parallelism because we are partitioning the data between threads. In contrast, with task parallelism we partition the tasks; in other words, we have each thread perform a different task.
In general, data parallelism is easier and scales better to highly parallel hardware, because it reduces or eliminates shared data (thereby reducing contention and thread-safety issues). Data parallelism also leverages the fact that there are often more data values than discrete tasks, which increases the potential for parallelism.
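As a rough illustration of the two strategies, here is a minimal sketch using the Parallel class. The class name ParallelismKinds and its methods are hypothetical names chosen for this example; only Parallel.For and Parallel.Invoke come from the framework.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, written only to contrast the two strategies.
static class ParallelismKinds
{
    // Data parallelism: the same operation (squaring) is applied to many values.
    public static long[] SquareAll(int[] values)
    {
        var squares = new long[values.Length];
        // Each iteration writes only its own slot, so no locking is needed.
        Parallel.For(0, values.Length, i => squares[i] = (long)values[i] * values[i]);
        return squares;
    }

    // Task parallelism: two different operations run side by side.
    // Assumes a non-empty array.
    public static Tuple<int, int> MinAndMax(int[] values)
    {
        int min = 0, max = 0;
        // Parallel.Invoke returns only after both delegates have finished.
        Parallel.Invoke(
            () => { min = FindMin(values); },
            () => { max = FindMax(values); });
        return Tuple.Create(min, max);
    }

    static int FindMin(int[] values)
    {
        int best = values[0];
        foreach (int v in values) if (v < best) best = v;
        return best;
    }

    static int FindMax(int[] values)
    {
        int best = values[0];
        foreach (int v in values) if (v > best) best = v;
        return best;
    }
}
```

Calling ParallelismKinds.SquareAll(new[] { 3, -1, 4 }) splits one job across the data, whereas MinAndMax runs two distinct jobs over the same data. For arrays this small the overhead of parallelizing would outweigh any gain; the sketch is only meant to show the shape of each approach.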
OK. So How Do We Parallelize Loops and Use a Parallel Extension?
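The algorithm below, Wavefront, is a parallel extension built on top of the PFX task APIs. Each cell is processed only after the cell to its left and the cell above it have completed, so the work sweeps across the matrix as a diagonal wavefront. The file compiles to a DLL on the command line, or builds as a class library in Visual Studio 2010, and is then referenced by a program that demonstrates the time difference between sequential and parallel code: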
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace System.Threading.Algorithms
{
    public static partial class ParallelAlgorithms
    {
        // Block-level wavefront: groups cells into blocks so that each task
        // does a larger amount of work, then runs the cell-level wavefront
        // over the blocks.
        public static void Wavefront(
            int numRows, int numColumns,
            int numBlocksPerRow, int numBlocksPerColumn,
            Action<int, int, int, int> processBlock)
        {
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (numBlocksPerRow <= 0 || numBlocksPerRow > numRows)
                throw new ArgumentOutOfRangeException("numBlocksPerRow");
            if (numBlocksPerColumn <= 0 || numBlocksPerColumn > numColumns)
                throw new ArgumentOutOfRangeException("numBlocksPerColumn");
            if (processBlock == null)
                throw new ArgumentNullException("processBlock");
            int rowBlockSize = numRows / numBlocksPerRow;
            int columnBlockSize = numColumns / numBlocksPerColumn;
            Wavefront(numBlocksPerRow, numBlocksPerColumn, (row, column) =>
            {
                // The last block in each direction absorbs any remainder.
                int start_i = row * rowBlockSize;
                int end_i = row < numBlocksPerRow - 1 ?
                    start_i + rowBlockSize : numRows;
                int start_j = column * columnBlockSize;
                int end_j = column < numBlocksPerColumn - 1 ?
                    start_j + columnBlockSize : numColumns;
                processBlock(start_i, end_i, start_j, end_j);
            });
        }
        // Cell-level wavefront: a cell runs only after its left and upper
        // neighbors have finished.
        public static void Wavefront(int numRows, int numColumns,
            Action<int, int> processRowColumnCell)
        {
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (processRowColumnCell == null)
                throw new ArgumentNullException("processRowColumnCell");
            // Tasks of the previous row, indexed by column.
            Task[] prevTaskRow = new Task[numColumns];
            Task prevTaskInCurrentRow = null;
            var dependencies = new Task[2];
            for (int row = 0; row < numRows; row++)
            {
                prevTaskInCurrentRow = null;
                for (int column = 0; column < numColumns; column++)
                {
                    // Local copies so that each lambda captures its own values.
                    int j = row, i = column;
                    Task curTask;
                    if (row == 0 && column == 0)
                    {
                        // The top-left cell has no dependencies.
                        curTask = Task.Factory.StartNew(() => processRowColumnCell(j, i));
                    }
                    else if (row == 0 || column == 0)
                    {
                        // First row or first column: a single antecedent.
                        var antecedent = column == 0
                            ? prevTaskRow[0] : prevTaskInCurrentRow;
                        curTask = antecedent.ContinueWith(p =>
                        {
                            p.Wait(); // rethrows any exception from the antecedent
                            processRowColumnCell(j, i);
                        });
                    }
                    else
                    {
                        // Interior cell: wait for both the left and the upper neighbor.
                        dependencies[0] = prevTaskInCurrentRow;
                        dependencies[1] = prevTaskRow[column];
                        curTask = Task.Factory.ContinueWhenAll(dependencies, ps =>
                        {
                            Task.WaitAll(ps);
                            processRowColumnCell(j, i);
                        });
                    }
                    prevTaskRow[column] = prevTaskInCurrentRow = curTask;
                }
            }
            // The bottom-right cell finishes last, so waiting on it waits for all.
            prevTaskInCurrentRow.Wait();
        }
    }
}
To compile:
csc /target:library /out:Algorithms_Wavefront.dll Algorithms_Wavefront.cs
We will use this DLL as an added reference in Visual Studio 2010 or via the ‘/reference:’ switch on the command line. Notice the Action<T1, T2> and Action<T1, T2, T3, T4> delegates in the method signatures: an Action delegate represents a method that takes the given parameters and returns no value, so the caller passes in the work to perform on each cell or block. Now let’s take a look at the next program, which references the algorithm class externally.
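To make the role of the Action delegate concrete, here is a small serial sketch with the same callback shape as the cell-level Wavefront overload. ActionDemo, ForEachCell, and Trace are hypothetical names used only for this illustration; they are not part of the DLL above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical example class: shows how a caller supplies per-cell work
// through an Action<int, int> delegate.
static class ActionDemo
{
    // Calls visit(row, column) for every cell, in row-major order.
    public static void ForEachCell(int rows, int columns, Action<int, int> visit)
    {
        if (visit == null) throw new ArgumentNullException("visit");
        for (int row = 0; row < rows; row++)
            for (int column = 0; column < columns; column++)
                visit(row, column);
    }

    // Passes a lambda as the Action and records the order of the calls.
    public static List<string> Trace(int rows, int columns)
    {
        var log = new List<string>();
        ForEachCell(rows, columns, (r, c) => log.Add(r + "," + c));
        return log;
    }
}
```

The Wavefront method accepts its per-cell work in the same way; the difference is that it schedules the calls as tasks with dependencies instead of running them in a simple nested loop.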
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static void Main(string[] args)
    {
        Random rand = new Random();
        Stopwatch sw = new Stopwatch();
        int result;
        // Runs until the process is stopped, timing both versions on fresh input.
        while (true)
        {
            string s1 = GenerateRandomString(rand);
            string s2 = GenerateRandomString(rand);
            sw.Restart();
            result = SerialTimeLength(s1, s2);
            sw.Stop();
            Console.WriteLine("Serial :\t{0}\t{1}", result, sw.Elapsed);
            sw.Restart();
            result = ParallelTimeLength(s1, s2);
            sw.Stop();
            Console.WriteLine("Parallel:\t{0}\t{1}", result, sw.Elapsed);
            Console.WriteLine("----------------------------------------------------");
            GC.Collect();
        }
    }
    // Builds a random string of lowercase letters.
    private static string GenerateRandomString(Random rand)
    {
        const int LEN = 10000;
        StringBuilder sb = new StringBuilder(LEN);
        for (int i = 0; i < LEN; i++) sb.Append((char)('a' + rand.Next(0, 26)));
        return sb.ToString();
    }
    // Edit distance between s1 and s2, filled in row by row.
    private static int SerialTimeLength(string s1, string s2)
    {
        int[,] dist = new int[s1.Length + 1, s2.Length + 1];
        for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
        for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;
        for (int i = 1; i <= s1.Length; i++)
        {
            for (int j = 1; j <= s2.Length; j++)
            {
                dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                    dist[i - 1, j - 1] :
                    1 + Math.Min(dist[i - 1, j],
                        Math.Min(dist[i, j - 1],
                            dist[i - 1, j - 1]));
            }
        }
        return dist[s1.Length, s2.Length];
    }
    // Same computation, with the matrix processed block by block as a wavefront.
    private static int ParallelTimeLength(string s1, string s2)
    {
        int[,] dist = new int[s1.Length + 1, s2.Length + 1];
        for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
        for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;
        int numBlocks = Environment.ProcessorCount * 4;
        System.Threading.Algorithms.ParallelAlgorithms.Wavefront(s1.Length, s2.Length,
            numBlocks, numBlocks, (start_i, end_i, start_j, end_j) =>
            {
                // Block bounds are 0-based; the dist matrix is offset by one.
                for (int i = start_i + 1; i <= end_i; i++)
                {
                    for (int j = start_j + 1; j <= end_j; j++)
                    {
                        dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                            dist[i - 1, j - 1] :
                            1 + Math.Min(dist[i - 1, j],
                                Math.Min(dist[i, j - 1],
                                    dist[i - 1, j - 1]));
                    }
                }
            });
        return dist[s1.Length, s2.Length];
    }
}
To compile, referencing the DLL built earlier:
csc /reference:Algorithms_Wavefront.dll Program.cs
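Notice the time differences between the sequentially executed code and the code that parallelizes its loops. Each line shows the computed edit distance followed by the elapsed time; in most runs the serial version takes about 6.2 seconds and the parallel version about 3.6 seconds, a speedup of roughly 1.7x: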
Serial : 8811 00:00:06.3117762
Parallel: 8811 00:00:03.6879674
-------------------------------------------------------
Serial : 8791 00:00:06.3353283
Parallel: 8791 00:00:03.6033089
-------------------------------------------------------
Serial : 8813 00:00:06.1330880
Parallel: 8813 00:00:03.5224918
-------------------------------------------------------
Serial : 8787 00:00:06.2184788
Parallel: 8787 00:00:03.7236997
-------------------------------------------------------
Serial : 8810 00:00:06.1260435
Parallel: 8810 00:00:03.5696177
-------------------------------------------------------
Serial : 8800 00:00:06.1468799
Parallel: 8800 00:00:03.5442969
-------------------------------------------------------
Serial : 8807 00:00:06.3541186
Parallel: 8807 00:00:03.5775287
-------------------------------------------------------
Serial : 8819 00:00:06.2730393
Parallel: 8819 00:00:03.6345988
-------------------------------------------------------
Serial : 8801 00:00:06.2168016
Parallel: 8801 00:00:03.6277170
-------------------------------------------------------
Serial : 8762 00:00:06.1840874
Parallel: 8762 00:00:03.5964011
-------------------------------------------------------
Serial : 8810 00:00:06.1708841
Parallel: 8810 00:00:03.8727436
-------------------------------------------------------
Serial : 8799 00:00:06.1792801
Parallel: 8799 00:00:03.6226900
-------------------------------------------------------
Serial : 8822 00:00:06.1833951
Parallel: 8822 00:00:03.7146820
-------------------------------------------------------
Serial : 8794 00:00:06.1674580
Parallel: 8794 00:00:03.6571650
-------------------------------------------------------
Serial : 8803 00:00:06.2745721
Parallel: 8803 00:00:04.0783518
-------------------------------------------------------
Serial : 8787 00:00:06.5583867
Parallel: 8787 00:00:04.9902464
-------------------------------------------------------
Serial : 8803 00:00:06.5382290
Parallel: 8803 00:00:04.3523305
-------------------------------------------------------
Serial : 8792 00:00:06.2359404
Parallel: 8792 00:00:03.6123527
-------------------------------------------------------
Serial : 8810 00:00:06.2226213
Parallel: 8810 00:00:03.6718996
-------------------------------------------------------
Serial : 8816 00:00:06.1712133
Parallel: 8816 00:00:03.6129905
and so on …
What’s an Example of a Computation?
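The number PI is a transcendental number, which means that it is not the root of any non-zero polynomial with rational coefficients. It is also irrational, so its decimal expansion is infinite and never settles into a repeating pattern. Mathematicians have calculated PI to millions of places and have found no pattern.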
The code shown below implements the midpoint rectangle rule of numerical integration to approximate the integral of 4/(1 + x*x) over the interval from 0 to 1, whose exact value is pi. To compute an approximation of the area under the curve, we must compute the area of some number of rectangles (num_rects) by finding the midpoint (mid) of each rectangle and computing the height of that rectangle (height), which is simply the function value at that midpoint. We add together the heights of all the rectangles (sum) and, once computed, we multiply the sum of the heights by the width of the rectangles (width) to determine the desired approximation of the total area (area) and the value of pi:
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
static long num_rects = 100000;
int main(int argc, char *argv[])
{
    int i;
    double mid, height, width, sum = 0.0;
    double area;
    width = 1.0 / (double) num_rects;
    for (i = 0; i < num_rects; i++) {
        mid = (i + 0.5) * width;            /* midpoint of rectangle i */
        height = 4.0 / (1.0 + mid * mid);   /* function value at the midpoint */
        sum += height;
    }
    area = width * sum;
    printf("Computed pi = %f\n", area);
    return 0;
}
Computed pi = 3.141593
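The midpoint rule used by the C program above can be written compactly as follows. The symbols N, w, and m_i are shorthand introduced here for num_rects, width, and mid; the integral equals pi because an antiderivative of 4/(1+x^2) is 4 arctan(x), and 4 arctan(1) = pi.

```latex
\pi \;=\; \int_{0}^{1} \frac{4}{1+x^{2}}\,dx
\;\approx\; w \sum_{i=0}^{N-1} \frac{4}{1+m_i^{2}},
\qquad w = \frac{1}{N}, \quad m_i = \left(i + \tfrac{1}{2}\right) w
```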
So if we were to take this code and calculate PI in managed C#, both serially and in parallel, would the computation times be close to each other? Look again at the basic C code, which I have not threaded. The two work variables, mid and height, are assigned values in each iteration. If each thread had a local copy, that copy could be used during the execution of the iterations assigned to that thread. The iteration variable, i, is also updated in each iteration, so each thread needs a local copy to avoid interfering with the iterations running in other threads. The sum variable is updated in each iteration too, but since its value is used outside of the loop, we can’t have each thread work with a local copy that would simply be discarded when the thread was done; the per-thread partial sums have to be combined into the shared total.
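The reasoning above can be sketched by hand in C# with one task per worker. This is only an illustrative sketch: PartialSums, ComputePi, and the workers parameter are hypothetical names, and the contiguous slicing is the simplest possible split rather than a recommended one.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical example: each worker keeps private copies of i, mid, and its
// partial sum, and the partial sums are combined once all workers are done.
static class PartialSums
{
    public static double ComputePi(int numRects, int workers)
    {
        double width = 1.0 / numRects;
        var partials = new Task<double>[workers];
        for (int w = 0; w < workers; w++)
        {
            // Contiguous slice [start, end) for this worker; declared inside
            // the loop so that each lambda captures its own values.
            int start = (int)((long)numRects * w / workers);
            int end = (int)((long)numRects * (w + 1) / workers);
            partials[w] = Task.Factory.StartNew(() =>
            {
                double localSum = 0.0;              // private to this task
                for (int i = start; i < end; i++)   // private loop variable
                {
                    double mid = (i + 0.5) * width; // private work variable
                    localSum += 4.0 / (1.0 + mid * mid);
                }
                return localSum;
            });
        }
        // Combine the partial sums; Result blocks until each task completes.
        double sum = 0.0;
        foreach (var t in partials) sum += t.Result;
        return width * sum;
    }
}
```

Because every task returns its partial sum instead of updating a shared variable, no lock is needed here; the only synchronization is waiting on each task's Result.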
Here is a program that computes PI in five different ways. In the output, the five timings run from the longest to the shortest. The first computation uses a method called SerialLinqPi(). We start with that method as a benchmark for how the computation can be made faster:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
class Program
{
    const int num_steps = 100000000;
    static void Main(string[] args)
    {
        while (true)
        {
            Time(() => SerialLinqPi());
            Time(() => ParallelLinqPi());
            Time(() => SerialPi());
            Time(() => ParallelPi());
            Time(() => ParallelPartitionerPi());
            Console.WriteLine("----");
            Console.ReadLine();
        }
    }
    // Runs the given function and prints the elapsed time and the result.
    static void Time<T>(Func<T> work)
    {
        var sw = Stopwatch.StartNew();
        var result = work();
        Console.WriteLine(sw.Elapsed + ": " + result);
    }
    static double SerialLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in Enumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }
    static double ParallelLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in ParallelEnumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }
    static double SerialPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        for (int i = 0; i < num_steps; i++)
        {
            double x = (i + 0.5) * step;
            sum = sum + 4.0 / (1.0 + x * x);
        }
        return step * sum;
    }
    static double ParallelPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        // Each worker accumulates into its own local value; the lock is taken
        // only once per worker, when its local total is added to sum.
        Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }
    static double ParallelPartitionerPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        // The partitioner hands out index ranges, so the delegate is invoked
        // once per range rather than once per iteration.
        Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0,
            (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            }, local => { lock (monitor) sum += local; });
        return step * sum;
    }
}
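As you would probably expect at this point, the computation time drops from one method to the next, depending on how we divvy up the data and parallelize the loops. The lines appear in the order SerialLinqPi, ParallelLinqPi, SerialPi, ParallelPi, ParallelPartitionerPi. Note that the plain serial loop (about 1.4 seconds) already beats both LINQ versions, and that the partitioner version is the fastest at about 0.8 seconds: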
00:00:05.6078739: 3.14159265359043
00:00:04.0519734: 3.14159265358991
00:00:01.4407659: 3.14159265359043
00:00:01.2224946: 3.14159265358996
00:00:00.8162013: 3.14159265358965
One might assume that the simplest form of static decomposition would be to divide the size of the loop by the number of processors to get a per-thread iteration count, and to have each thread process a series of contiguous iterations. This is not actually the strongest route to take, because it can lead to an inefficient use of the processors: if some iterations take longer than others, or some cores are busy with other work, the threads that finish early sit idle while the rest catch up. The different decomposition approaches should be examined carefully in order to capitalize on data parallelism.
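One alternative to a single fixed slice per thread is to cut the loop into many smaller ranges and let the worker threads pick them up as they become free. The sketch below does this with the Partitioner.Create overload that takes an explicit range size. ChunkedPi, ComputePi, and the rangeSize parameter are hypothetical names for this example, and a good range size is an assumption that would need to be measured on the target machine.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical example: decomposes the loop into ranges of a chosen size
// instead of one contiguous slice per processor.
static class ChunkedPi
{
    public static double ComputePi(int numSteps, int rangeSize)
    {
        double step = 1.0 / numSteps;
        double sum = 0.0;
        object gate = new object();
        Parallel.ForEach(
            // Splits [0, numSteps) into ranges of at most rangeSize iterations.
            Partitioner.Create(0, numSteps, rangeSize),
            () => 0.0,                              // per-worker running total
            (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            },
            // Runs once per worker: fold its total into the shared sum.
            local => { lock (gate) sum += local; });
        return step * sum;
    }
}
```

Smaller ranges tend to balance the load better but add scheduling overhead, while larger ranges do the opposite, so ChunkedPi.ComputePi(100000000, 100000) is only a starting point for experimentation.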
References
- Patterns of Parallel Programming, by Microsoft’s Stephen Toub