Introduction
Although BackgroundWorker makes developing multi-threaded desktop applications quite easy, a developer practically has to write their own code to accomplish the same task in an ASP.NET application.
This process includes dividing the input data into smaller parts, creating the threads, assigning jobs to the threads, and collecting the results from them.
To provide a quick and easy way of parallelizing time-consuming operations, I created a small library that runs your own methods in a multi-threaded way. The idea is to write a method that performs the task for a single item and pass it to the library, which runs it on multiple threads instead of one.
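The first manual step listed above, dividing the input into smaller parts, can be sketched on its own. This is a minimal illustration rather than the library's code; the `ChunkSketch` class and `Partition` method are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkSketch
{
    // Splits the index range [0, itemCount) into at most workerCount
    // contiguous half-open ranges (start inclusive, end exclusive).
    // Hypothetical helper, not part of the library described in this article.
    public static List<Tuple<int, int>> Partition(int itemCount, int workerCount)
    {
        if (itemCount < 0) throw new ArgumentOutOfRangeException("itemCount");
        if (workerCount < 1) throw new ArgumentOutOfRangeException("workerCount");

        var ranges = new List<Tuple<int, int>>();

        // Ceiling division, so every item lands in exactly one chunk.
        int chunkSize = (itemCount + workerCount - 1) / workerCount;

        for (int start = 0; start < itemCount; start += chunkSize)
        {
            int end = Math.Min(start + chunkSize, itemCount);
            ranges.Add(Tuple.Create(start, end));
        }
        return ranges;
    }
}
```

Each returned range can then be handed to one worker thread, which is the same idea the library applies internally.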
Background
The project mainly combines two of my previous projects. One was a template engine, similar to DotLiquid, for producing customizable report output. The other sped up the wage calculation for 3600+ employees using dynamic formulas based on Gold Parser.
I took the multi-threaded wage calculation engine and generalized it with code from the template engine.
Using the code
Let's start with code that is designed to run single-threaded.
// The employee list is kept in the session between postbacks.
private IList<Employee> m_employees
{
    get
    {
        return Session["Employees"] as IList<Employee>;
    }
    set
    {
        Session["Employees"] = value;
    }
}

protected void Page_Load(object sender, EventArgs e)
{
    if (!IsPostBack)
    {
        IList<Employee> eList = DAL.SelectHQL<Employee>("select p from Personel p", 5000);
        m_employees = eList;
    }
}
protected void btRunSingleThreaded_Click(object sender, EventArgs e)
{
    try
    {
        // Process the employees one after another on the request thread.
        object[] results = new object[m_employees.Count];
        int counter = 0;
        foreach (Employee emp in m_employees)
        {
            results[counter] = GetLastPosition(emp);
            counter++;
        }
    }
    catch (Exception)
    {
        // Rethrow without resetting the stack trace.
        throw;
    }
}
private Position GetLastPosition(Employee emp)
{
    try
    {
        // Fetch all positions of the employee, ordered by sort number.
        string hql = @"
            from
                Position as pos
                inner join fetch pos.Sort as srt
            where
                pos.Employee.ID = ?
            order by
                srt.No asc";
        Position lastPosition = DAL.SelectHQL<Position>(hql, 0, emp.ID).LastOrDefault();
        return lastPosition;
    }
    catch (Exception)
    {
        // Rethrow without resetting the stack trace.
        throw;
    }
}
Here, DAL stands for the Data Access Layer, a class from our company framework.
To make the previous GetLastPosition() method work in a multi-threaded fashion, we just use the ParallelOperationsLibrary class.
protected void btRunMultiThreaded_Click(object sender, EventArgs e)
{
    try
    {
        // Run GetLastPosition for every employee on up to 50 threads.
        ParallelOperationsLibrary lib = new ParallelOperationsLibrary();
        object[] lastPositions = lib.RunParallel(m_employees, 50, new Func<Employee, Position>(this.GetLastPosition));
    }
    catch (Exception)
    {
        // Rethrow without resetting the stack trace.
        throw;
    }
}
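Because the CreateThreads() listing further down stores a caught exception in the result slot of the item that failed, the array returned by RunParallel can mix results and Exception objects. A caller may want to separate the two. The following is a minimal sketch under that assumption; `ResultSketch` and `Split` are hypothetical names, not part of the library.

```csharp
using System;
using System.Collections.Generic;

public static class ResultSketch
{
    // Separates successful results from the exceptions stored in their place.
    // Null results (for example, an employee without any position) are kept.
    // Hypothetical helper, not part of the library described in this article.
    public static List<T> Split<T>(object[] results, out List<Exception> failures)
        where T : class
    {
        var values = new List<T>();
        failures = new List<Exception>();

        foreach (object result in results)
        {
            var ex = result as Exception;
            if (ex != null)
            {
                failures.Add(ex);
            }
            else
            {
                values.Add((T)result);
            }
        }
        return values;
    }
}
```

With the handler above, this could be called as `ResultSketch.Split<Position>(lastPositions, out failures)` before the positions are used.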
To better understand how the whole process works, let's examine the library itself.
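public class ParallelOperationsLibrary
{
    // Instance field (not static), so concurrent requests do not overwrite each other's events.
    private ManualResetEvent[] ThreadEvents; // one completion signal per worker
    private Delegate ProcMethod;             // the method to run for every item
    private object[] ProcArgs;               // extra arguments passed to every call
    private List<object> ProcList;           // the input items
    private object[] resultSet;              // one result (or exception) per item

    public ParallelOperationsLibrary()
    {
    }
}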
The main entry point is the RunParallel method, which separates the input into smaller blocks, queues one worker per block, waits for the workers, and collects the results. The number of threads is limited to 64 because WaitHandle.WaitAll accepts at most 64 handles; passing more makes it throw an exception.
public object[] RunParallel(
    object list,
    int threadCount,
    Delegate method,
    params object[] args)
{
    try
    {
        // WaitHandle.WaitAll accepts at most 64 handles, so clamp to 1..64.
        threadCount = Math.Max(1, Math.Min(threadCount, 64));

        // Copy the input into an indexable list of objects.
        List<object> oList = new List<object>();
        if (list is IEnumerable)
        {
            oList = ((IEnumerable)list).Cast<object>().ToList();
        }

        resultSet = new object[oList.Count];
        ProcMethod = method;
        ProcArgs = args;
        ProcList = oList;
        ThreadEvents = new ManualResetEvent[threadCount];

        // Each worker gets a contiguous block of at most partCount items.
        int partCount = (int)Math.Ceiling((decimal)oList.Count / threadCount);
        for (int i = 0; i < threadCount; i++)
        {
            int startIndex = Math.Min(partCount * i, oList.Count);
            int endIndex = Math.Min(startIndex + partCount, oList.Count);
            ThreadEvents[i] = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(new WaitCallback(this.CreateThreads),
                new object[] { i, startIndex, endIndex });
        }

        // Block until every worker has signaled completion.
        WaitHandle.WaitAll(ThreadEvents);
        return resultSet;
    }
    catch (Exception ex)
    {
        throw new Exception("An error occurred while running " +
            "the parallel processing library.", ex);
    }
}
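The 64-thread cap comes from waiting on one handle per worker with WaitHandle.WaitAll. As an alternative that the library does not use, a single CountdownEvent (available since .NET 4.0) can track any number of work items. The sketch below illustrates that swapped-in technique; `CountdownSketch` and `RunAll` are hypothetical names.

```csharp
using System;
using System.Threading;

public static class CountdownSketch
{
    // Runs work(index) once for every index in [0, workItemCount) on the
    // thread pool and blocks until all of them have finished.
    // Returns one slot per index: null on success, the exception on failure.
    // Hypothetical helper, not part of the library described in this article.
    public static Exception[] RunAll(int workItemCount, Action<int> work)
    {
        var errors = new Exception[Math.Max(workItemCount, 0)];
        if (workItemCount <= 0) return errors;

        using (var done = new CountdownEvent(workItemCount))
        {
            for (int i = 0; i < workItemCount; i++)
            {
                int index = i; // copy, so each callback sees its own index
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { work(index); }
                    catch (Exception ex) { errors[index] = ex; }
                    finally { done.Signal(); } // count down even on failure
                });
            }

            // Wait for the counter to reach zero; no 64-handle limit applies.
            done.Wait();
        }
        return errors;
    }
}
```

Queuing more work items than there are pool threads is fine here, since the pool simply processes them as threads become free.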
After the input data is divided into smaller parts and the workers are queued, each worker processes its chunk and calls the original method that does the real work through the DynamicInvoke() method. This takes place within the CreateThreads() method.
private void CreateThreads(object state)
{
    // state carries { worker index, start index (inclusive), end index (exclusive) }.
    object[] _state = (object[])state;
    int index = (int)_state[0];
    int start = (int)_state[1];
    int end = (int)_state[2];
    try
    {
        for (int i = start; i < end; i++)
        {
            try
            {
                // Argument list: the current item first, then the extra arguments.
                object[] newArgs = new object[ProcArgs.Length + 1];
                newArgs[0] = ProcList[i];
                for (int j = 0; j < ProcArgs.Length; j++)
                {
                    newArgs[j + 1] = ProcArgs[j];
                }
                resultSet[i] = ProcMethod.DynamicInvoke(newArgs);
            }
            catch (Exception ex)
            {
                // Store the failure in place of the result so the other items still run.
                resultSet[i] = ex;
            }
        }
    }
    finally
    {
        // Always signal completion; otherwise WaitAll in RunParallel would block forever.
        ThreadEvents[index].Set();
    }
}
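The argument handling in CreateThreads() can be tried in isolation. The sketch below prepends the current item to the extra arguments and calls the delegate through DynamicInvoke(). Unlike the library, it unwraps the TargetInvocationException that DynamicInvoke() raises when the target method throws; that unwrapping is a variation introduced here, and `InvokeSketch` and `InvokeWithItem` are hypothetical names.

```csharp
using System;
using System.Reflection;

public static class InvokeSketch
{
    // Calls method(item, extraArgs...) through late binding and returns either
    // the method's result or the exception that the target method threw.
    // Hypothetical helper, not part of the library described in this article.
    public static object InvokeWithItem(Delegate method, object item, params object[] extraArgs)
    {
        // The item becomes the first argument, followed by the extra arguments.
        object[] args = new object[extraArgs.Length + 1];
        args[0] = item;
        Array.Copy(extraArgs, 0, args, 1, extraArgs.Length);

        try
        {
            return method.DynamicInvoke(args);
        }
        catch (TargetInvocationException ex)
        {
            // DynamicInvoke wraps exceptions thrown by the target method.
            return ex.InnerException ?? ex;
        }
    }
}
```

For example, `InvokeSketch.InvokeWithItem(new Func<int, int, int>((a, b) => a + b), 2, 3)` calls the delegate with the item 2 and the extra argument 3.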
Conclusion
To test the performance of the library, I ran several tests, varying the size of the input data set and the number of threads. The results show that the multi-threaded mechanism works and that the gain grows with the amount of data to process, from about 13% for 1000 employees to about 72% for 5000.
The performance gain can be seen in the table below:
# of Employees | Single-threaded (sec) | Multi-threaded, 10 threads (sec) | Multi-threaded, 50 threads (sec) | Performance gain (%)
1000 | 6.391 | 5.216 | 6.039 | 12.86
2000 | 22.419 | 14.604 | 13.376 | 40.33
5000 | 205.272 | 78.415 | 56.814 | 72.32
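The article does not state how the gain column is computed. As an assumption, reading it as the relative time saved against the single-threaded run reproduces the 2000- and 5000-employee figures when the 50-thread time is used. The 1000-employee figure is not reproduced by either thread count under this reading.

```latex
\text{gain} = \frac{T_{\text{single}} - T_{\text{multi}}}{T_{\text{single}}} \times 100

% 2000 employees, 50 threads:
\frac{22.419 - 13.376}{22.419} \times 100 \approx 40.3

% 5000 employees, 50 threads:
\frac{205.272 - 56.814}{205.272} \times 100 \approx 72.3
```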