Leveraging Threads to Speed up Batch Processing

17 Feb 2008 · CPOL · 9 min read
Speed up your batch processing by splitting it into smaller tasks and executing them in concurrent threads

Introduction

I frequently need to process large numbers (hundreds of thousands or even millions) of repetitive pieces of data, such as records from a database table or a collection of objects: apply some business transformation to each one, and then save the results (e.g. rewrite them to the database, generate a report, etc.).

In a simple database example you could just write an SQL statement that does an UPDATE of the affected records based on certain selection criteria (as specified by the WHERE clause). This is a pretty limited way of doing things, however, and if you start getting into complex situations you might end up using cursors which are a whole other can of worms. So if you start performing the update in code, you are automatically obliged to get a DataReader, read the records one by one, apply your business logic, and then rewrite the results. For various reasons, this is very time consuming.

A non-database related example could be a collection holding several million objects that all need to be manipulated somehow. You will undoubtedly use a foreach() loop and handle the objects one by one, but this again is an inefficient use of CPU power. You can do better.

This article describes an efficient and scalable mechanism for processing such large amounts of data. It uses multiple concurrent threads to handle small batches of work, and takes advantage of multi-core CPUs by creating more threads when more cores are available.

The attached demo creates 5,000,000 customer objects that hold basic information such as an ID and a name, updates a discount percentage for each, and reverses the customer name 50 times (just so we have something time consuming to do). It was run on a quad-core machine for a benchmark and produced the following processing times:

  • Single thread execution (i.e. running on only one CPU the whole time): 5 minutes and 20 seconds.
  • Multiple thread execution (i.e. running two threads per CPU, for a total of eight concurrent threads): 2 minutes and 40 seconds.

You pick the winner.

Batch Processing the Old Fashioned Way

As stated in the introduction, you could process a batch of objects (or records) by using a foreach() loop and handling the objects in that loop one at a time. The advantages of this method are that the code is easier to understand and you don't have to deal with threading. That being said, threaded code can be perfectly understandable if it’s well documented (which it should be!), and if you're processing large amounts of data you should already be familiar with threading and should be using it by now. If you're not, then get on it.

A simple one-batch example:

C#
class CustomerMaintenance
{
    // a collection of Customer objects that we need to process
    List<Customer> mCustomerList;
    // Constructor. User passes in the collection of Customer
    // objects to process.
    public CustomerMaintenance (List<Customer> customerList)
    {
        mCustomerList = customerList;
    }
    // Update method applies all the business transformations
    // to the Customer objects, one at a time.
    public void Update()
    {
        foreach (Customer customerToUpdate in mCustomerList)
        {
            customerToUpdate.Discount *= 1.10;
        }
    }
}

In this example, a list of Customer objects is passed down to a CustomerMaintenance class.

The Update() method is then executed, and it just applies some changes to every single object in the list. This is a very simple example since the foreach() loop could very well contain much more complex code. It is that update code which will slow you down if you're doing something complex.

Batch Processing with Concurrent Threads

The biggest flaw in the example above is that it does not take advantage of multiple core CPUs, and is most definitely not scalable. If you compare the execution time between a single core and a multiple core machine running this code you might find a faster time on the latter, but it will not be a big difference. Moreover, the difference would most probably be due to a faster CPU, more RAM, or some other such factor. The biggest weapon in your arsenal - the extra CPUs - will just remain unused and untapped.

So here’s how you do it. Processing your data in concurrent threads involves several steps:

Step 1 - Assigning Key Values to Each Object

The first thing we need is the ability to refer to each object by a key value. When we launch the thread workers, we will not be passing them a batch containing all the objects to be handled. Rather, all those objects will remain in one collection visible to all the thread workers, and each worker will be working on distinct objects within this list, which they will access by a key.

Coming up with these keys is slightly different when working with objects as opposed to database records, but not hard to do. If you're dealing with a list of objects, you can either choose a property that will be unique amongst all those objects (such as the customer ID), or if you do not have such a property, you can simply use an incremental counter (which does not necessarily need to be a property of the objects).

To work with objects, simply declare a SortedList<key, object> where the key is that unique property and the object is the type of object you will be working with. Load the SortedList<key, object> with all the keys and values, and you're set to go. This is the shared collection of data that all worker threads will be updating. Next, create a List<key> that contains all the keys from that shared collection, and this becomes the pool from which all threads will receive their individual batches of work.

For example:

C#
// Shared collection, containing all the objects to update
SortedList<long, Customer> mCustomerList = new SortedList<long, Customer>();
// code to populate the sorted list with the Customer objects goes here
// Create a list of keys containing all key values to the shared
// collection. This becomes a sort of index.
List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);
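
When the objects have no naturally unique property, the incremental-counter option mentioned above might look something like the sketch below. The `KeyedCollectionBuilder` class and its `BuildWithCounterKeys` method are hypothetical names used for illustration; they are not part of the demo.

```csharp
using System.Collections.Generic;

public static class KeyedCollectionBuilder
{
    // Hypothetical helper: assigns an incrementing counter (0, 1, 2, ...) as
    // the key for each item, for objects that have no unique property.
    public static SortedList<long, T> BuildWithCounterKeys<T>(IEnumerable<T> items)
    {
        SortedList<long, T> keyed = new SortedList<long, T>();
        long nextKey = 0;
        foreach (T item in items)
        {
            // Keys are added in ascending order, so each Add appends at the end.
            keyed.Add(nextKey++, item);
        }
        return keyed;
    }
}
```

The list of keys is then built exactly as shown above, with `new List<long>(keyed.Keys)`.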

If, however, you're dealing with database records, you will need to use a key field in your SELECT statement. Assuming you are going to issue an UPDATE on all customer records in a customer table, you should first issue a SELECT statement retrieving all the customer IDs that will be affected, and then store those IDs in a List<key>. Just like with the object example above, this list then becomes the pool from which all threads will receive their individual batches of work. Contrary to the previous example, though, the individual threads will be updating records directly in the database, which is the equivalent of the shared collection of data used above (i.e. mCustomerList).
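
As a rough sketch of the database case, the helper below drains a data reader into the key list. `CustomerKeyLoader` and `ReadKeys` are hypothetical names, and the sketch assumes the caller has already executed a query whose first column is the key, something like `SELECT CustomerID FROM Customers WHERE ...` (table and column names are assumptions too).

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class CustomerKeyLoader
{
    // Hypothetical helper: reads the first column of every row as a key
    // and returns the keys as the pool of work for the thread workers.
    public static List<long> ReadKeys(IDataReader reader)
    {
        List<long> keys = new List<long>();
        while (reader.Read())
        {
            // Convert.ToInt64 accepts both int and bigint key columns.
            keys.Add(Convert.ToInt64(reader.GetValue(0)));
        }
        return keys;
    }
}
```

Taking an `IDataReader` rather than a provider-specific reader keeps the helper independent of the database in use.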

Step 2 - Preparing a Semaphore

A semaphore (you will need to reference System.Threading for this one) is a very simple yet crucial element. It will control how many running thread workers we currently have, and when one has finished its work and exited, the semaphore will let us know that we can launch another worker. Semaphores are quite configurable, and you can easily specify how many requests it can handle.

For example:

C#
// Create a semaphore with numberOfThreadsToUse slots, all of them initially free
// (constructor arguments: initial count, maximum count).
Semaphore mSemaphore = new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse);
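
To see the semaphore doing its job in isolation, here is a minimal sketch that queues several short work items and records how many were ever running at the same time. `SemaphoreDemo` and `RunAndMeasurePeak` are hypothetical names, and `Thread.Sleep` merely stands in for real work.

```csharp
using System;
using System.Threading;

public static class SemaphoreDemo
{
    // Runs workItemCount short work items, never more than maxConcurrent at
    // once, and returns the highest number of workers seen running together.
    public static int RunAndMeasurePeak(int maxConcurrent, int workItemCount)
    {
        int running = 0;
        int peak = 0;
        object peakLock = new object();

        using (Semaphore semaphore = new Semaphore(maxConcurrent, maxConcurrent))
        {
            for (int i = 0; i < workItemCount; i++)
            {
                semaphore.WaitOne();   // blocks until a slot is free, then takes it
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    try
                    {
                        int now = Interlocked.Increment(ref running);
                        lock (peakLock) { if (now > peak) peak = now; }
                        Thread.Sleep(20);   // stand-in for real work
                    }
                    finally
                    {
                        Interlocked.Decrement(ref running);
                        semaphore.Release();   // hand the slot back
                    }
                });
            }

            // Re-acquire every slot to be sure all workers have finished.
            for (int i = 0; i < maxConcurrent; i++) semaphore.WaitOne();
        }
        return peak;
    }
}
```

Calling `SemaphoreDemo.RunAndMeasurePeak(2, 8)` should never report a peak above 2, however many work items are queued.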

Step 3 - Looping Through the List of Keys and Dispatching Work in Batches

Now comes the fun part. We loop while the list of keys created in step 1 contains data, and do the following:

  • Wait until the semaphore has a free resource.
  • Reserve a resource in the semaphore.
  • Copy a predetermined number of items from the keys list to a work list that will be passed down to the thread worker.
  • Remove those same keys from the keys list so they are not dispatched for processing a second time.
  • Launch the thread worker, passing it the work list. The thread will apply the business rules and modifications to the objects in the shared main list that are indexed by the key values passed down to it.
  • When the thread worker is done, it releases the semaphore resource (thereby enabling the launch of another thread worker) and exits.

For example:

C#
private void UpdateAllCustomersInConcurrentBatches()
{
    // Work out how many threads to run. Environment.ProcessorCount reports the
    // number of logical processors; counting Win32_Processor instances through
    // WMI can instead return the number of physical sockets on current Windows
    // versions (i.e. 1 for a single quad-core CPU).
    int numberOfCpus = Environment.ProcessorCount;
    int numberOfThreadsToUse = numberOfCpus * mMaxNumberOfThreadsPerCpu;
    int batchSize = 5000;

    // create the semaphore described in step 2, one slot per thread
    mSemaphore = new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse);
    
     // get a list of all the key values to process
    List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);
    while (allCustomerIDs.Count > 0)
    { 
        // make a list of customer IDs to process in the next batch
        int countToTake = System.Math.Min(batchSize, allCustomerIDs.Count);
        List<long> customerIDsToProcess = allCustomerIDs.GetRange(0, countToTake);
        // remove those customer IDs from the master list so they are not processed again
        allCustomerIDs.RemoveRange(0, countToTake);
    
        // wait for the semaphore to let us launch another thread
        mSemaphore.WaitOne();
    
        // launch a thread worker and give it the list of customer IDs to process
        ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateAllCustomersInSubBatch), 
            customerIDsToProcess);
    }
    
    // ensure all threads have exited by waiting until we can get all 
    // the semaphore requests
    for (int ctr = 0; ctr < numberOfThreadsToUse; ctr++)
    {
       mSemaphore.WaitOne();
    }
    mSemaphore.Release(numberOfThreadsToUse);
}
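
One thing to be aware of in the loop above: `RemoveRange(0, n)` on a `List<T>` shifts every remaining element forward, so with millions of keys each batch pays for a large copy. A possible alternative, sketched here under the assumption that the master key list does not need to shrink, is to walk it by index. `BatchSplitter` is a hypothetical helper, not part of the demo.

```csharp
using System;
using System.Collections.Generic;

public static class BatchSplitter
{
    // Hypothetical helper: cuts the key list into consecutive batches of at
    // most batchSize keys, leaving the source list untouched.
    public static List<List<long>> Split(List<long> keys, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException("batchSize");

        List<List<long>> batches = new List<List<long>>();
        for (int start = 0; start < keys.Count; start += batchSize)
        {
            // The last batch may be shorter than batchSize.
            int count = Math.Min(batchSize, keys.Count - start);
            batches.Add(keys.GetRange(start, count));
        }
        return batches;
    }
}
```

Each batch is an independent copy of its keys, so it can be handed to a thread worker just like `customerIDsToProcess`.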

Step 4 - Processing a Batch of Records in a Thread Worker

The method to process the records will be launched in step 3, and will receive the list of keys to work with. It will then use a foreach() loop to go through them, and using each key in the loop, access a Customer object in the shared collection and apply the appropriate business rules and changes to it.

Similarly, if you are working with database records you would use this key value to issue a SELECT statement for one record in the table, fetch it, update it, and write it back (or maybe just issue an UPDATE statement).

For example:

C#
private void UpdateAllCustomersInSubBatch(object state)
{
    try
    {
        List<long> customerIDsToProcess = state as List<long>;
        foreach (long customerID in customerIDsToProcess)
        {
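            // Customer is a class, so tempCustomer refers to the same object
            // that is stored in the shared list. A local variable is used
            // because a foreach iteration variable cannot be passed by reference.
            Customer tempCustomer = mCustomerList[customerID];
            ApplyBusinessRulesToCustomerObject(ref tempCustomer);
            // Write the results back under the lock. This matters if the method
            // above assigned a new Customer instance to tempCustomer.
            lock (mLock)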
            {
                mCustomerList[customerID].Discount = tempCustomer.Discount;
                mCustomerList[customerID].Name = tempCustomer.Name;
            }
        }
    }
    catch (Exception ex)
    {
        lock (mLock)
        {
            // An exception was raised. This thread has no access to the UI, 
            // so store the exception in
            // mExceptions and get out.
            mExceptions.Add(ex);
        }
    }
    finally
    {
        // The work in this thread is complete. Release the semaphore request 
        // so that it can be reused to 
        // launch another thread worker for the next batch.
        mSemaphore.Release();
    }
}
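
Putting steps 1 to 4 together, a compact and self-contained version could look like the sketch below. It is not the demo's code: the `Customer` class is a minimal stand-in, `ConcurrentBatchUpdater` and its members are hypothetical names, and the business rule is reduced to the 10% discount increase used earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Minimal stand-in for the demo's Customer class (an assumption).
public class Customer
{
    public long Id;
    public string Name;
    public double Discount;
}

public class ConcurrentBatchUpdater
{
    private readonly SortedList<long, Customer> mCustomerList;
    private readonly List<Exception> mExceptions = new List<Exception>();
    private readonly object mLock = new object();
    private Semaphore mSemaphore;

    public ConcurrentBatchUpdater(SortedList<long, Customer> customers)
    {
        mCustomerList = customers;
    }

    // Exceptions caught by the workers; read this after UpdateAll returns.
    public List<Exception> Exceptions { get { return mExceptions; } }

    public void UpdateAll(int threadsPerCpu, int batchSize)
    {
        int threads = Environment.ProcessorCount * threadsPerCpu;
        mSemaphore = new Semaphore(threads, threads);
        List<long> allIds = new List<long>(mCustomerList.Keys);

        for (int start = 0; start < allIds.Count; start += batchSize)
        {
            int count = Math.Min(batchSize, allIds.Count - start);
            List<long> batch = allIds.GetRange(start, count);
            mSemaphore.WaitOne();   // wait for a free slot
            ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateBatch), batch);
        }

        // Re-acquire every slot so we only return once all workers are done.
        for (int i = 0; i < threads; i++) mSemaphore.WaitOne();
        mSemaphore.Release(threads);
    }

    private void UpdateBatch(object state)
    {
        try
        {
            foreach (long id in (List<long>)state)
            {
                // Each key belongs to exactly one batch, so no other worker
                // touches this customer; the list itself is only read.
                Customer customer = mCustomerList[id];
                customer.Discount *= 1.10;
            }
        }
        catch (Exception ex)
        {
            lock (mLock) { mExceptions.Add(ex); }
        }
        finally
        {
            mSemaphore.Release();
        }
    }
}
```

A caller would build the `SortedList<long, Customer>`, call `new ConcurrentBatchUpdater(customers).UpdateAll(2, 5000)`, and then inspect `Exceptions`. The sketch relies on the shared list not being added to or removed from while the workers are running.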

A Note about Exceptions

An important thing to note about threads is that you have to be careful with exceptions. In a single threaded application, exceptions will be available to all the objects up the execution path, and you can therefore trap and handle them at any point.

In a multithreaded application, however, exceptions will only go up as high as the first method to execute in that thread. If you do not have any try/catch blocks to handle the exceptions before (or on) that point, you'll get an “unhandled exception” error, and the application will stop when the exception is raised.

To get around this, the demo uses a List<Exception> collection to store any and all exceptions raised during the threads’ execution. When we're done looping through the main list of object keys, we can check this collection for content, and if there are any exceptions they can be sent back to the form, logged, etc. Additionally, we could alter the main dispatching while loop (described in step 3) to stop dispatching new batches of work if the exception collection’s count is greater than zero.
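
A minimal sketch of that last idea follows: a dispatcher that stops handing out batches once any worker has recorded an exception. `GuardedDispatcher` and its `Run` method are hypothetical names, and the per-batch work is passed in as a delegate purely to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class GuardedDispatcher
{
    // Hypothetical dispatcher: runs processBatch for each batch on the thread
    // pool, stops dispatching after the first recorded failure, and returns
    // every exception the workers caught.
    public static List<Exception> Run(List<List<long>> batches,
        Action<List<long>> processBatch, int maxThreads)
    {
        List<Exception> exceptions = new List<Exception>();
        object sync = new object();

        using (Semaphore semaphore = new Semaphore(maxThreads, maxThreads))
        {
            foreach (List<long> batch in batches)
            {
                semaphore.WaitOne();

                bool failed;
                lock (sync) { failed = exceptions.Count > 0; }
                if (failed)
                {
                    semaphore.Release();   // give back the slot we just took
                    break;
                }

                List<long> work = batch;   // per-iteration copy for the closure
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    try { processBatch(work); }
                    catch (Exception ex) { lock (sync) { exceptions.Add(ex); } }
                    finally { semaphore.Release(); }
                });
            }

            // Wait for in-flight workers before reading the results.
            for (int i = 0; i < maxThreads; i++) semaphore.WaitOne();
        }
        return exceptions;
    }
}
```

Batches that were already running when the failure occurred still finish, so the returned list can hold more than one exception.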

A Note about Thread Safety and Locking

It’s also important to note that since multiple threads will be accessing and modifying the same objects (e.g. the shared object list and the exception collection), we need to protect against corruption and ensure that only one thread at a time has access to the shared resources. To do this, we use the lock statement, which blocks any other thread that tries to enter a block guarded by the same lock object until the current thread leaves it. This ensures that no two threads will execute the protected code at the same time.
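
As a small illustration, the sketch below has several threads appending to one shared `List<int>`, which is not safe for concurrent writes, with every `Add` wrapped in a lock on the same object. `LockDemo` and `FillSharedList` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading;

public static class LockDemo
{
    // Starts threadCount threads that each add itemsPerThread entries to one
    // shared list, and returns the final number of entries in that list.
    public static int FillSharedList(int threadCount, int itemsPerThread)
    {
        List<int> shared = new List<int>();
        object sync = new object();
        List<Thread> threads = new List<Thread>();

        for (int t = 0; t < threadCount; t++)
        {
            Thread worker = new Thread(delegate()
            {
                for (int i = 0; i < itemsPerThread; i++)
                {
                    // Only one thread at a time can be inside this block.
                    lock (sync) { shared.Add(i); }
                }
            });
            threads.Add(worker);
            worker.Start();
        }

        foreach (Thread worker in threads) worker.Join();
        return shared.Count;
    }
}
```

With the lock in place, `LockDemo.FillSharedList(4, 10000)` returns 40000. Without it, the list can lose entries or throw, because concurrent `Add` calls corrupt its internal state.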

In Conclusion

Splitting up massive amounts of repetitive work into smaller batches and processing them in parallel is simply a must-have when it comes to handling large amounts of data. The benchmark in the introduction shows how you can save 50% of your execution time, although the savings will largely be determined by the exact type of transformations you need to make to the data, and how the code is implemented. Also, keep in mind that the savings should grow if you run your application on machines with a higher number of CPUs, as long as the work itself is CPU-bound.

This is a time saver that cannot be ignored, and I hope it helps someone out there. If it helps you, then vote for it.

History

  • December 13, 2007 - Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)