An exception safe OO thread-pool framework

16 May 2013
Provides a plug-in multithreaded environment built on an exception-safe thread-pool and functors.

Introduction

I'd looked at a number of articles on the internet about multi-threading and thread-pool designs, but could not readily adapt any of them to my requirements. Most of the thread pools I'd seen had a lot of the thread management logic intertwined with the actual function that the thread was executing. I wanted a different conceptual view of it. To do this, I moved away from deriving from threads and assigning pointers to functions to execute, and latched onto the Command pattern from the inherently useful Design Patterns: Elements of Reusable Object-Oriented Software (Gamma, Helm et al.). Requests are submitted as functors, allowing each functor to maintain its own environment.

Fundamentally, you submit a request in the form of a functor to a queue, and then let the thread-pool do the rest. I didn't want to have to "join" threads back to the main thread, or "wait" for completion. A thread is merely an execution process. It doesn't care what it's executing; it just keeps doing the next thing in its path.

Features

  • Exception safe
  • Configurable number of threads
  • Configurable queue length

Usage

First of all, you'll need to include the header:

#include "ThreadPool.h"

Then, create the ThreadPool object. The simplest way is to declare it directly:

ThreadPool myPool;

Once this is done, you can call myPool.accept() to prepare the thread-pool to accept requests.

A second way provides more control. We can derive from the ThreadPool class and override the onThreadStart and onThreadFinish methods to perform thread-specific setup and cleanup.

class COMPool : public ThreadPool
{
public:
    void onThreadStart(int threadId)
    {
        ::OleInitialize(NULL);
    }
    void onThreadFinish(int threadId)
    {
        ::OleUninitialize();
    }
};

We also need to create one or more functors which we can request the thread pool to handle. These are created by deriving from ThreadRequest. If you wish to pass in parameters, or retrieve information from the functor, you can provide these in a constructor. Also, any necessary cleanup can be done in the destructor.

class SpecialRequest : public ThreadRequest
{
public:
    SpecialRequest(int param1, int param2, int& retStat) :
        myLocal1(param1), myLocal2(param2), retStatus(retStat)
    {
        // Any constructor setup stuff, like transaction begin or
        // file opens or whatever else the functor
        // may need to operate with.
    }

    void operator()(int)
    {
        // Do whatever you want in here -
        // but DON'T let exceptions propagate.
        retStatus = myLocal1 + myLocal2;
        // OK - so it just adds 2 integers....   but it's multithreaded!!!
    }

private:
    int myLocal1;
    int myLocal2;
    int& retStatus;   // refers to the caller's variable, which must outlive the request
};

To submit the request, we use the previously defined ThreadPool object and submit the functor into the queue. In this case, we create the functor at the same time:

myPool.submitRequest(new SpecialRequest(1, 2, returnVal));

Note that the SpecialRequest has the two parameters being passed in, 1 and 2, and the reference to returnVal for the functor to populate when it completes. The functor must be created on the heap using new, because the thread pool will delete it.

Once we are finished with the thread-pool, we can shut it down using:

myPool.shutdown();

So, our main function looks like this:

#include <iostream>

int main(int, char**)
{
    int result = 0;
    COMPool myCOMThreadPool;
    // We tell it to accept requests now
    myCOMThreadPool.accept();
    // Add 1 and 2, and store in result
    myCOMThreadPool.submitRequest(new SpecialRequest(1, 2, result));
    myCOMThreadPool.shutdown();
    // And output the result.
    std::cout << result;
    return 0;
}

The demonstration project contains a more explicit example using multiple threads, loops and thread statistics.

Note: It's important that you handle any exceptions that your code may throw in the overridden operator()(int). The acceptHandler will not allow any exceptions to propagate out of its loop in order to maintain integrity. This is not actually a bad thing, because your functor should be able to handle its own exceptions anyway.
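
As a rough illustration of a functor that handles its own exceptions, the sketch below catches everything inside operator() and reports the failure through state supplied at construction. ParseRequest and parseRequestExample are hypothetical names invented for this example; to keep it self-contained the class does not derive from ThreadRequest, and it uses C++11, which the original code predates.

```cpp
#include <cassert>
#include <exception>
#include <string>

// Hypothetical request (not part of the article's download): converts text
// to an integer and never lets an exception leave operator().
class ParseRequest {
public:
    ParseRequest(const std::string& text, int& value, std::string& error)
        : text_(text), value_(value), error_(error) {}

    void operator()(int /*threadId*/) {
        try {
            value_ = std::stoi(text_);  // throws on malformed or out-of-range input
            error_.clear();
        } catch (const std::exception& e) {
            error_ = e.what();          // report the failure through state instead
        } catch (...) {
            error_ = "unknown error";
        }
    }

private:
    std::string text_;
    int& value_;
    std::string& error_;
};

// Usage: the functor is called directly here; a pool worker would make the same call.
inline void parseRequestExample() {
    int value = 0;
    std::string error;

    ParseRequest good("42", value, error);
    good(0);
    assert(value == 42 && error.empty());

    ParseRequest bad("not a number", value, error);
    bad(0);                   // nothing escapes, even though std::stoi threw
    assert(!error.empty());
    assert(value == 42);      // the earlier result is left untouched
}
```

The caller learns about the failure by inspecting the error string, so nothing depends on how the pool treats a stray exception.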

The gory details

For the individuals who REALLY want to know how it works, the entire cpp and h files are commented using doxygen, and the tricky clever bits are commented also. What I'll try to explain here is why I've done certain things.

The guiding principle that I tried to follow was that a thread is a completely separate entity from the thing that it is executing. The idea is to disconnect the management of the threads from the execution completely. This enables the thread-pool to be used easily for database calls (effectively simulating an asynchronous call), HTTP responses via FastCGI, and calculations of pi to (insert arbitrarily large number here) decimal places, whilst simultaneously making me a coffee. I didn't want the thread to be aware of the execution status of the function/functor, and I didn't want the functor to be aware that it was being executed in a thread.
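
To make that separation concrete, here is a minimal sketch of a pool whose workers know nothing beyond an abstract request interface. It is not the article's ThreadPool: SketchRequest, SketchPool and AddRequest are hypothetical names, the queue is unbounded, and the threading uses the C++11 standard library rather than whatever the original code uses.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the article's ThreadRequest base class.
struct SketchRequest {
    virtual ~SketchRequest() {}
    virtual void operator()(int threadId) = 0;
};

// Hypothetical pool: thread management lives here, the work lives in the request.
class SketchPool {
public:
    explicit SketchPool(int threadCount) {
        for (int id = 0; id < threadCount; ++id)
            workers_.emplace_back([this, id] { run(id); });
    }
    ~SketchPool() { shutdown(); }

    // Takes ownership of a heap-allocated request.
    void submit(SketchRequest* request) {
        assert(request != nullptr);
        std::unique_ptr<SketchRequest> owned(request);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push(std::move(owned));
        }
        wake_.notify_one();
    }

    // Lets the workers drain the queue, then joins them.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (auto& worker : workers_)
            if (worker.joinable()) worker.join();
    }

private:
    void run(int threadId) {
        for (;;) {
            std::unique_ptr<SketchRequest> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !pending_.empty(); });
                if (pending_.empty()) return;  // stopping, and nothing left to do
                job = std::move(pending_.front());
                pending_.pop();
            }
            (*job)(threadId);  // the thread neither knows nor cares what this does
        }                      // job is deleted at the end of each iteration
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::unique_ptr<SketchRequest>> pending_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};

// Example request: adds two integers and writes the sum through a reference.
struct AddRequest : SketchRequest {
    AddRequest(int a, int b, int& out) : a_(a), b_(b), out_(out) {}
    void operator()(int) override { out_ = a_ + b_; }
    int a_, b_;
    int& out_;
};
```

The worker loop only ever sees SketchRequest, so swapping AddRequest for a database call or an HTTP response needs no change to the pool.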

This provides a slightly different programming model from what would otherwise be expected. In many of the implementations I've seen, the worker thread is responsible for signalling that it is finished, and the "main" thread of execution is responsible for waiting until the worker thread has signalled this. Then, the main thread needs to clean up the worker, or query the worker to get the worker's results. Some of the implementations passed an arbitrary number of pointers around to carry parameters and return values.

In this implementation, the worker thread doesn't wait at all. It executes the job and then cleans up after itself. If you need to get results, you can provide a pointer or reference to a structure in the constructor, and use the functor to populate that structure as part of its implementation. For example, you can have the functor write the result of a database commit into a variable passed to the constructor, and check this at any time to see whether it has committed yet. Or, you can pass an entire business object, such as a Purchase Order, into the constructor, and make the functor responsible for populating the values. The thread doesn't care. All it does is execute the functor and clean up after itself. If your application needs to wait for something to happen, you can include a signal (event) in the functor, and put the main thread to sleep until the signal is set. Again, you are waiting for the functor to complete, not the thread. A conceptual difference, but I think a more accurate representation.
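
One way to wait on the functor rather than the thread is sketched below, using a small one-shot signal built on a C++11 condition variable. CompletionSignal, SumRequest and sumOnWorker are hypothetical names, and a plain std::thread stands in for a pool worker so that the example stays self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot signal shared between the caller and a functor.
class CompletionSignal {
public:
    void set() {
        std::lock_guard<std::mutex> lock(mutex_);
        done_ = true;
        ready_.notify_all();
    }
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return done_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    bool done_ = false;
};

// Hypothetical functor: fills in the caller's result, then sets the signal.
class SumRequest {
public:
    SumRequest(int a, int b, int& result, CompletionSignal& finished)
        : a_(a), b_(b), result_(result), finished_(finished) {}

    void operator()(int /*threadId*/) {
        result_ = a_ + b_;
        finished_.set();  // last step: tell whoever is waiting
    }

private:
    int a_;
    int b_;
    int& result_;
    CompletionSignal& finished_;
};

// Usage: the caller sleeps on the functor's signal, not on the thread.
inline int sumOnWorker(int a, int b) {
    int result = 0;
    CompletionSignal finished;
    SumRequest request(a, b, result, finished);
    std::thread worker([&request] { request(0); });  // stand-in for a pool thread
    finished.wait();           // returns once the functor has signalled
    assert(result == a + b);   // the functor has finished, so the result is ready
    worker.join();             // needed only because this sketch owns the std::thread
    return result;
}
```

With a real pool the join disappears from the caller entirely; the caller must then keep the result and the signal alive until wait() has returned.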

The second thing that I had to add was an onThreadStart and onThreadFinish call. In the simple case, these are no-ops; however, by deriving from ThreadPool you can make them do whatever you like. I had to add these because COM needed to be initialized per thread when I was using this for my OLEDB calls.
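
The sketch below shows one plausible way for a pool to call such hooks from each worker thread. It is an assumption about the mechanism, not the article's code: HookedWorkers and CountingWorkers are hypothetical names, and the threads are started from a separate start() call so that the derived-class overrides exist before any hook runs.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical base class: each worker calls the hooks on its own thread.
class HookedWorkers {
public:
    virtual ~HookedWorkers() { join(); }

    // Threads start here rather than in the constructor, so that derived
    // overrides are in place before any hook is called.
    void start(int threadCount) {
        assert(threadCount > 0);
        for (int id = 0; id < threadCount; ++id)
            threads_.emplace_back([this, id] {
                onThreadStart(id);
                // ... a real pool would service its request queue here ...
                onThreadFinish(id);
            });
    }

    void join() {
        for (auto& thread : threads_)
            if (thread.joinable()) thread.join();
    }

protected:
    virtual void onThreadStart(int /*threadId*/) {}   // no-op by default
    virtual void onThreadFinish(int /*threadId*/) {}  // no-op by default

private:
    std::vector<std::thread> threads_;
};

// Hypothetical derived class that just counts hook calls.
class CountingWorkers : public HookedWorkers {
public:
    ~CountingWorkers() { join(); }  // join while the overrides still exist

    std::atomic<int> started{0};
    std::atomic<int> finished{0};

protected:
    void onThreadStart(int) override { ++started; }
    void onThreadFinish(int) override { ++finished; }
};
```

Because each hook runs on the worker thread itself, per-thread initialization such as the COM setup in COMPool lands on the right thread.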

When the queue reaches its maximum length, submitting a request blocks, which in the example provided also blocks the main thread. This has the effect of preventing additional requests, allowing the pool time to catch up.
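
A bounded queue with this blocking behaviour can be sketched with a mutex and two condition variables. BoundedQueue is a hypothetical name, and the code illustrates the general technique in C++11 rather than the queue shipped with the article.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded queue: push blocks when full, pop blocks when empty.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks the caller while the queue is at capacity.
    void push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(item));
        notEmpty_.notify_one();
    }

    // Blocks the caller while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        notFull_.notify_one();  // a blocked producer may now continue
        return item;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::queue<T> items_;
};
```

A producer that calls push on a full queue simply sleeps until a worker pops an item, which is the back-pressure described above.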

See my todo list at the bottom for improvement ideas I have for this.

I also used functors because for the OLEDB calls, I really needed transaction integrity. To provide this, the constructor of the functor could perform the beginTransaction() and the destructor would call commit() or rollback() depending on the private transactionSuccess flag. This ensured that either commit or rollback was called, regardless of the outcome, and improved exception safety incredibly. After implementing it this way, I realized just how effective functors were, so I ended up using them for the generic solution.
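
The shape of such a transactional functor might look like the sketch below. FakeConnection is a hypothetical stand-in that only records the calls made on it (the real code talked to OLEDB), and TransactionRequest and runTransaction are likewise invented for illustration.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a database connection; it only records calls.
struct FakeConnection {
    std::vector<std::string> log;
    void beginTransaction() { log.push_back("begin"); }
    void commit()           { log.push_back("commit"); }
    void rollback()         { log.push_back("rollback"); }
};

// Hypothetical functor: begins in the constructor, settles in the destructor.
class TransactionRequest {
public:
    TransactionRequest(FakeConnection& connection, bool failDuringWork)
        : connection_(connection),
          failDuringWork_(failDuringWork),
          transactionSuccess_(false) {
        connection_.beginTransaction();
    }

    TransactionRequest(const TransactionRequest&) = delete;  // one owner per transaction

    ~TransactionRequest() {
        // Exactly one of these runs, whatever happened in operator().
        if (transactionSuccess_) connection_.commit();
        else                     connection_.rollback();
    }

    void operator()(int /*threadId*/) {
        try {
            if (failDuringWork_) throw std::runtime_error("simulated failure");
            transactionSuccess_ = true;   // reached only when the work succeeded
        } catch (const std::exception&) {
            transactionSuccess_ = false;  // the destructor will roll back
        }
    }

private:
    FakeConnection& connection_;
    bool failDuringWork_;
    bool transactionSuccess_;
};

// Usage: returns the calls the connection saw for one request.
inline std::vector<std::string> runTransaction(bool failDuringWork) {
    FakeConnection connection;
    {
        TransactionRequest request(connection, failDuringWork);
        request(0);
    }  // destructor runs here: commit or rollback
    assert(connection.log.front() == "begin");
    return connection.log;
}
```

A request that is destroyed without ever being executed also rolls back, since the success flag starts out false.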

The beauty of this is that the functor contains everything that it needs to execute. Because it can maintain state, you can actually pass parameters in the constructor, and use these parameters to get information about the current functor object, including its state of execution, and the data returned. The best part though, is that by using a functor, you can test the functor itself outside of the multithreaded environment quite easily - allowing easy integration for unit tests.

Another benefit of functors is that they can maintain their own environment. In the case of the FastCGI application, I needed to pass the address of the output structure to return the output back to the webserver. I did this by passing the environment, including the CGI parameters, the error stream and the output stream, into the functor during construction. This meant that the same functor class was perfectly thread safe because each instance was a completely individual object, and yet it had access to the environment as it was when the functor was created. On execution, it would always write to the correct output stream. This provided thread safety without the need for mutexes, critical sections, etc.
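
A simplified version of that idea is sketched below. RequestEnvironment, GreetingRequest and renderGreeting are hypothetical names, and standard C++ streams stand in for the FastCGI structures, whose real types are not shown in the article.

```cpp
#include <cassert>
#include <map>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical per-request environment, loosely modelled on the FastCGI case.
struct RequestEnvironment {
    RequestEnvironment(std::ostream& output, std::ostream& errors)
        : out(output), err(errors) {}

    std::map<std::string, std::string> parameters;
    std::ostream& out;
    std::ostream& err;
};

// Hypothetical functor: everything it reads or writes arrives via the constructor.
class GreetingRequest {
public:
    explicit GreetingRequest(RequestEnvironment& env) : env_(env) {}

    void operator()(int /*threadId*/) {
        auto name = env_.parameters.find("name");
        if (name == env_.parameters.end()) {
            env_.err << "missing parameter: name\n";
            return;
        }
        env_.out << "Hello, " << name->second << "\n";
    }

private:
    RequestEnvironment& env_;
};

// Usage: each call builds its own environment, so concurrent calls share nothing.
inline std::string renderGreeting(const std::string& name) {
    std::ostringstream out;
    std::ostringstream err;
    RequestEnvironment env(out, err);
    env.parameters["name"] = name;
    GreetingRequest request(env);
    request(0);
    assert(err.str().empty());  // nothing was reported on the error stream
    return out.str();
}
```

Two requests built from two environments never touch the same stream, so no locking is needed as long as each environment is used by only one functor.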

The acceptHandler has been declared as throwing no exceptions. This is necessary, because exceptions would potentially disrupt the safety of other threads. If a functor throws an exception and it's not handled in the functor (this is bad!), the handler will simply swallow the exception, and you will not see it. You should not bank on this functionality though, because it may change. (Not sure how, or why, but if someone comes up with a better method, I'll change it in a flash.)
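
The swallowing behaviour can be pictured with the sketch below, which wraps each job in a catch-all inside a noexcept function. runGuarded, runAll and guardedLoopExample are hypothetical names, std::function stands in for the functor type, and the real acceptHandler may well be structured differently.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical guard: runs one job and reports whether it completed normally.
inline bool runGuarded(const std::function<void(int)>& job, int threadId) noexcept {
    try {
        job(threadId);
        return true;
    } catch (...) {
        return false;  // swallowed: the exception never reaches the calling loop
    }
}

// Hypothetical loop body: every job runs, however the previous one ended.
inline int runAll(const std::vector<std::function<void(int)>>& jobs,
                  int threadId) noexcept {
    int swallowed = 0;
    for (const auto& job : jobs)
        if (!runGuarded(job, threadId)) ++swallowed;
    return swallowed;
}

// Usage: the middle job throws, yet the loop carries on with the next one.
inline void guardedLoopExample() {
    int ran = 0;
    std::vector<std::function<void(int)>> jobs;
    jobs.push_back([&ran](int) { ++ran; });
    jobs.push_back([](int) { throw std::runtime_error("boom"); });
    jobs.push_back([&ran](int) { ++ran; });

    int swallowed = runAll(jobs, 0);
    assert(swallowed == 1);
    assert(ran == 2);
}
```

The price of the catch-all is that the failure is invisible to the submitter, which is why the functor should record its own errors.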

Todo

All these are tentative ideas, and not things that I'll implement until I actually need them.

  • More debug information - e.g. the Thread Information Block
  • Thread-pool monitoring
    • average queue length
    • average response time
  • A QueueFullException to be optionally added instead of just blocking. May do this as a templated policy class (See Modern C++ Design, Andrei Alexandrescu)
  • Dynamic addition and deletion of threads.
  • Forced shutdown
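
As a speculative sketch of the QueueFullException idea, the queue below takes its overflow behaviour from a policy class supplied as a template parameter. Nothing here exists in the current code: PolicyQueue, BlockWhenFull, ThrowWhenFull and the exception type itself are hypothetical, and only a non-blocking tryPop is provided to keep the example short.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>

// Hypothetical exception for the "queue is full" case.
struct QueueFullException : std::runtime_error {
    QueueFullException() : std::runtime_error("request queue is full") {}
};

// Policy matching the current behaviour: wait until there is room.
struct BlockWhenFull {
    template <typename HasSpace>
    static void onPush(std::unique_lock<std::mutex>& lock,
                       std::condition_variable& notFull, HasSpace hasSpace) {
        notFull.wait(lock, hasSpace);
    }
};

// Alternative policy: refuse immediately by throwing.
struct ThrowWhenFull {
    template <typename HasSpace>
    static void onPush(std::unique_lock<std::mutex>&,
                       std::condition_variable&, HasSpace hasSpace) {
        if (!hasSpace()) throw QueueFullException();
    }
};

template <typename T, typename FullPolicy = BlockWhenFull>
class PolicyQueue {
public:
    explicit PolicyQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // The policy decides what happens when the queue is already full.
    void push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        FullPolicy::onPush(lock, notFull_,
                           [this] { return items_.size() < capacity_; });
        items_.push(std::move(item));
    }

    // Non-blocking pop; returns false when the queue is empty.
    bool tryPop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop();
        notFull_.notify_one();  // wakes a producer blocked by BlockWhenFull
        return true;
    }

private:
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::queue<T> items_;
};
```

Choosing the policy at compile time keeps the blocking default unchanged for existing callers, while PolicyQueue<T, ThrowWhenFull> lets a caller opt in to the exception.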

Acknowledgement

Special thanks to Taka for reviewing the code for this article.

Changes and edits

  • 18/01/2002: Fixed the apostrophe on it because it was bugging me.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
