Introduction
I frequently come across the need to process large amounts of repetitive data (hundreds of thousands or even millions of items), such as records from a database table or a collection of objects: apply some business transformation, then save the results (e.g. rewrite them to the database, generate a report, etc.).
In a simple database example you could just write an SQL statement that does an UPDATE of the affected records based on certain selection criteria (as specified by the WHERE clause). This is a pretty limited way of doing things, however, and if you get into complex situations you might end up using cursors, which are a whole other can of worms. So if you start performing the update in code, you are automatically obliged to get a DataReader, read the records one by one, apply your business logic, and then rewrite the results. For various reasons, this is very time consuming.
A non-database related example could be a collection holding several million objects that all need to be manipulated somehow. You will undoubtedly use a foreach() loop and handle the objects one by one, but this again is an inefficient use of CPU power. You can do better.
This article describes a very efficient and scalable mechanism for processing such large amounts of data. It uses multiple concurrent threads to handle small batches of work, and can take advantage of multiple core CPUs by creating more threads depending on how many cores there are.
The attached demo creates 5,000,000 customer objects that hold basic information such as an ID and a name, updates a discount percentage for each, and reverses the customer name 50 times (just so we have something time consuming to do). It was run on a quad-core machine for a benchmark and produced the following processing times:
- Single thread execution (i.e. running on only one CPU the whole time): 5 minutes and 20 seconds.
- Multiple thread execution (i.e. running two threads per CPU, for a total of eight concurrent threads): 2 minutes and 40 seconds.
You pick the winner.
Batch Processing the Old Fashioned Way
As stated in the introduction, you could process a batch of objects (or records) by using a foreach() loop and handling the objects one at a time. The advantages to this method are that the code is easier to understand and you don't have to deal with threading. That having been said, threaded code can be perfectly understood if it's well documented (which it should be!), and if you're processing large amounts of data you should already be familiar with threading and using it by now. If you're not, then get on it.
A simple one-batch example:
```csharp
class CustomerMaintenance
{
    List<Customer> mCustomerList;

    public CustomerMaintenance(List<Customer> customerList)
    {
        mCustomerList = customerList;
    }

    public void Update()
    {
        foreach (Customer customerToUpdate in mCustomerList)
        {
            customerToUpdate.Discount *= 1.10;
        }
    }
}
```
In this example, a list of Customer objects is passed down to a CustomerMaintenance class. The Update() method is then executed, and it just applies some changes to every single object in the list. This is a very simple example; the foreach() loop could very well contain much more complex code, and it is that update code which will slow you down if you're doing something complex.
Batch Processing with Concurrent Threads
The biggest flaw in the example above is that it does not take advantage of multiple core CPUs, and is most definitely not scalable. If you compare the execution time between a single core and a multiple core machine running this code you might find a faster time on the latter, but it will not be a big difference. Moreover, the difference would most probably be due to a faster CPU, more RAM, or some other such factor. The biggest weapon in your arsenal - the extra CPUs - will just remain unused and untapped.
So here’s how you do it. Processing your data in concurrent threads involves several steps:
Step 1 - Assigning Key Values to Each Object
The first thing we need is the ability to refer to each object by a key value. When we launch the thread workers, we will not be passing them a batch containing all the objects to be handled. Rather, all those objects will remain in one collection visible to all the thread workers, and each worker will be working on distinct objects within this list, which they will access by a key.
Coming up with these keys is slightly different when working with objects as opposed to database records, but not hard to do. If you're dealing with a list of objects, you can either choose a property that will be unique amongst all those objects (such as the customer ID), or if you do not have such a property, you can simply use an incremental counter (which does not necessarily need to be a property of the objects).
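If you go the incremental-counter route, a minimal sketch might look like the following (the Customer type and the loader class are illustrative, not taken from the demo):

```csharp
using System.Collections.Generic;

class Customer
{
    public string Name;
    public double Discount;
}

class KeyedLoader
{
    // Assigns an incremental counter as the key; the counter is not a
    // property of the objects, just a way to address them in the pool.
    public static SortedList<long, Customer> Load(IEnumerable<Customer> customers)
    {
        SortedList<long, Customer> keyed = new SortedList<long, Customer>();
        long nextKey = 0;
        foreach (Customer customer in customers)
        {
            keyed.Add(nextKey++, customer);
        }
        return keyed;
    }
}
```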
To work with objects, simply declare a SortedList&lt;key, object&gt; where the key is that unique property and the object is the type of object you will be working with. Load the SortedList&lt;key, object&gt; with all the keys and values, and you're set to go. This is the shared collection of data that all worker threads will be updating. Next, create a List&lt;key&gt; that contains all the keys from that shared collection; this becomes the pool from which all threads will receive their individual batches of work.
For example:
```csharp
SortedList<long, Customer> mCustomerList = new SortedList<long, Customer>();
List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);
```
If, however, you're dealing with database records, you will need to use a key field in your SELECT statement. Assuming you are going to issue an UPDATE on all customer records in a customer table, you should first issue a SELECT statement retrieving all the customer IDs that will be affected, and then store those IDs in a List&lt;key&gt;. Just like with the object example above, this list then becomes the pool from which all threads will receive their individual batches of work. Contrary to the previous example, though, the individual threads will be updating records directly in the database, which is the equivalent of the shared collection of data used above (i.e. mCustomerList).
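To sketch the key-loading step in a self-contained way, the snippet below builds the list of keys from a query result; a DataTable stands in for the real SELECT so the example can run without a database (the CustomerID column name is an assumption, not from the demo):

```csharp
using System.Collections.Generic;
using System.Data;

class CustomerKeyLoader
{
    // In the real application the rows would come from a SELECT against
    // the customer table; a DataTable stands in for that result set here.
    public static List<long> LoadCustomerIDs(DataTable queryResult)
    {
        List<long> customerIDs = new List<long>();
        foreach (DataRow row in queryResult.Rows)
        {
            customerIDs.Add((long)row["CustomerID"]);
        }
        return customerIDs;
    }
}
```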
Step 2 - Preparing a Semaphore
A semaphore (you will need to reference System.Threading for this one) is a very simple yet crucial element. It controls how many thread workers are running at any given time; when one has finished its work and exited, the semaphore lets us know that we can launch another worker. Semaphores are quite configurable, and you can easily specify how many concurrent requests they allow.
For example:
```csharp
Semaphore mSemaphore = new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse);
```
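To see the two constructor arguments (initial and maximum slot count) in action, here is a small self-contained sketch; the Thread.Sleep stands in for real batch work, and the peak counter simply proves that no more than four workers ever run at once:

```csharp
using System;
using System.Threading;

class SemaphoreDemo
{
    const int numberOfThreadsToUse = 4;
    static Semaphore mSemaphore =
        new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse);
    static int mCurrent = 0;
    static int mPeak = 0;
    static object mLock = new object();

    // Dispatches the given number of batches and returns the peak
    // number of workers that were running simultaneously.
    public static int Run(int batches)
    {
        for (int i = 0; i < batches; i++)
        {
            mSemaphore.WaitOne();               // reserve a slot (blocks when all are taken)
            ThreadPool.QueueUserWorkItem(delegate
            {
                lock (mLock) { mCurrent++; mPeak = Math.Max(mPeak, mCurrent); }
                Thread.Sleep(10);               // stand-in for batch work
                lock (mLock) { mCurrent--; }
                mSemaphore.Release();           // free the slot
            });
        }
        // Drain: reacquire every slot so we know all workers have finished,
        // then release them so the semaphore can be reused.
        for (int i = 0; i < numberOfThreadsToUse; i++) mSemaphore.WaitOne();
        mSemaphore.Release(numberOfThreadsToUse);
        return mPeak;
    }
}
```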
Step 3 - Looping Through the List of Keys and Dispatching Work in Batches
Now comes the fun part. We loop while the list of keys created in step 1 contains data, and do the following:
- Wait until the semaphore has a free resource.
- Reserve a resource in the semaphore.
- Copy a predetermined number of items from keys list to a work list that will be passed down to the thread worker.
- Remove those same keys from the keys list so they are not dispatched for processing a second time.
- Launch the thread worker, passing it the work list. The thread will apply the business rules and modifications to the objects in the shared main list that are indexed by the key values passed down to it.
- When the thread worker is done, it releases the semaphore resource (thereby enabling the launch of another thread worker) and exits.
For example:
```csharp
private void UpdateAllCustomersInConcurrentBatches()
{
    // Query WMI for the number of CPUs (requires a reference to System.Management).
    ManagementObjectSearcher managementObjectSearcher =
        new ManagementObjectSearcher("select * from Win32_Processor");
    ManagementObjectCollection managementObjectCollection =
        managementObjectSearcher.Get();
    int numberOfCpus = managementObjectCollection.Count;
    int numberOfThreadsToUse = numberOfCpus * mMaxNumberOfThreadsPerCpu;
    int batchSize = 5000;

    // The pool of keys from which each worker receives its batch.
    List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);

    while (allCustomerIDs.Count > 0)
    {
        // Carve off the next batch and remove it from the pool so it is
        // never dispatched a second time.
        List<long> customerIDsToProcess = allCustomerIDs.GetRange(0,
            System.Math.Min(batchSize, allCustomerIDs.Count));
        allCustomerIDs.RemoveRange(0, customerIDsToProcess.Count);

        // Wait for a free slot, then hand the batch to a thread pool worker.
        mSemaphore.WaitOne();
        ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateAllCustomersInSubBatch),
            customerIDsToProcess);
    }

    // Drain: reacquire every slot so we know all workers have finished,
    // then release them again so the semaphore can be reused.
    for (int ctr = 0; ctr < numberOfThreadsToUse; ctr++)
    {
        mSemaphore.WaitOne();
    }
    mSemaphore.Release(numberOfThreadsToUse);
}
```
Step 4 - Processing a Batch of Records in a Thread Worker
The method to process the records is launched in step 3 and receives the list of keys to work with. It then uses a foreach() loop to go through them and, using each key, accesses a Customer object in the shared collection and applies the appropriate business rules and changes to it.
Similarly, if you are working with database records you would use this key value to issue a SELECT statement for one record in the table, fetch it, update it, and write it back (or maybe just issue an UPDATE statement).
For example:
```csharp
private void UpdateAllCustomersInSubBatch(object state)
{
    try
    {
        List<long> customerIDsToProcess = state as List<long>;
        foreach (long customerID in customerIDsToProcess)
        {
            // Work on a copy, then write the results back under the lock.
            Customer tempCustomer = mCustomerList[customerID];
            ApplyBusinessRulesToCustomerObject(ref tempCustomer);
            lock (mLock)
            {
                mCustomerList[customerID].Discount = tempCustomer.Discount;
                mCustomerList[customerID].Name = tempCustomer.Name;
            }
        }
    }
    catch (Exception ex)
    {
        // Exceptions cannot cross the thread boundary, so collect them here.
        lock (mLock)
        {
            mExceptions.Add(ex);
        }
    }
    finally
    {
        // Free the semaphore slot so the dispatcher can launch another worker.
        mSemaphore.Release();
    }
}
```
A Note about Exceptions
An important thing to note about threads is that you have to be careful with exceptions. In a single-threaded application, exceptions propagate up the call stack, so you can trap and handle them at any point along the way.
In a multithreaded application, however, exceptions will only go up as high as the first method to execute in that thread. If you do not have any try/catch blocks to handle the exceptions before (or on) that point, you'll get an “unhandled exception” error, and the application will stop when the exception is raised.
To get around this, the demo uses a List&lt;Exception&gt; collection to store any and all exceptions raised during the threads' execution. When we're done looping through the main list of object keys, we can check this collection for content, and if there are any exceptions they can be sent back to the form, logged, etc. Additionally, we could alter the main dispatching loop (described in step 3) to stop dispatching new batches of work if the exception collection's count is greater than zero.
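A minimal sketch of that pattern follows; the field names mirror the demo's mExceptions and mLock, but the wrapper class and method names are illustrative:

```csharp
using System;
using System.Collections.Generic;

class ExceptionCollector
{
    static object mLock = new object();
    static List<Exception> mExceptions = new List<Exception>();

    // Called from inside a worker's catch block.
    public static void RecordFromWorker(Exception ex)
    {
        lock (mLock) { mExceptions.Add(ex); }   // thread-safe append
    }

    // The dispatching loop can poll this to stop handing out new batches.
    public static bool ShouldStopDispatching()
    {
        lock (mLock) { return mExceptions.Count > 0; }
    }
}
```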
A Note about Thread Safety and Locking
It’s also important to note that since multiple threads will be accessing and modifying the same objects (e.g. the shared object list and the exception collection), we need to protect against corruption and ensure that only one thread at a time has access to the shared resources. To do this, we use the lock statement, which blocks all other attempts to enter the enclosed code until the lock is released. This ensures that no two threads will execute that piece of code at the same time.
In Conclusion
Splitting up massive amounts of repetitive work into smaller batches and processing them in parallel is simply a must-have when it comes to handling large amounts of data. The benchmark in the introduction shows how you can easily save 50% of your execution time, although the savings will largely be determined by the exact type of transformations you need to make to the data, and how the code is implemented. Also, keep in mind that the savings will keep increasing if you run your application on machines with more CPUs.
This is a time saver that cannot be ignored, and I hope it helps someone out there. If it helps you, then vote for it.
History
- December 13, 2007 - Initial post