Introduction
I'd looked at a number of articles on the internet about multi-threading and thread-pool designs, but could not really adapt any of them to my requirements. Most of the thread pools I'd seen had a lot of the thread management logic intertwined with the actual function that the thread was executing. I wanted a different conceptual view of it. To do this, I moved away from deriving from threads and assigning pointers to functions to execute, and latched onto the Command pattern from the inherently useful Design Patterns: Elements of Reusable Object-Oriented Software (Gamma, Helm, et al.). Requests are submitted as functors, allowing each functor to maintain its own environment.
Fundamentally, you submit a request in the form of a functor to a queue, and then let the thread-pool do the rest. I didn't want to have to "join" threads back to the main thread, or "wait" for completion. A thread is merely an execution process. It doesn't care what it's executing; it just keeps doing the next thing in its path.
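To make the idea concrete, here is a minimal sketch of the submit-to-a-queue model written with the standard C++ threading library. It is not the code that ships with this article: Request, TinyPool, AddRequest and addOnPool are hypothetical names invented for the illustration, and it has no exception handling or queue limit.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical command interface (the article's own base class is ThreadRequest).
struct Request {
    virtual ~Request() = default;
    virtual void operator()(int threadId) = 0;
};

// Hypothetical minimal pool: workers pop requests from a shared queue and run them.
class TinyPool {
public:
    explicit TinyPool(int threads) {
        for (int id = 0; id < threads; ++id)
            workers_.emplace_back([this, id] { run(id); });
    }

    // Lets the workers drain the queue, then joins them.
    ~TinyPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (auto& worker : workers_) worker.join();
    }

    void submit(std::unique_ptr<Request> request) {
        assert(request != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(request));
        }
        wake_.notify_one();
    }

private:
    void run(int id) {
        for (;;) {
            std::unique_ptr<Request> request;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stopping and nothing left to do
                request = std::move(queue_.front());
                queue_.pop();
            }
            (*request)(id);  // the worker neither knows nor cares what this does
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::unique_ptr<Request>> queue_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical request: its parameters and result slot travel with it.
struct AddRequest : Request {
    AddRequest(int a, int b, int& out) : a_(a), b_(b), out_(out) {}
    void operator()(int) override { out_ = a_ + b_; }
    int a_;
    int b_;
    int& out_;
};

int addOnPool(int a, int b) {
    int result = 0;
    {
        TinyPool pool(2);
        pool.submit(std::make_unique<AddRequest>(a, b, result));
    }  // the pool's destructor runs the queued request and joins the workers
    return result;
}
```

The caller never touches a thread handle: it hands over a request object and the workers take it from there.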
Features
- Exception safe
- Configurable number of threads
- Configurable queue length
Usage
First of all, you'll need to:
#include "ThreadPool.h"
Then create the ThreadPool object. The simplest way is to declare it directly:
ThreadPool myPool;
Once this is done, you can call myPool.accept() to prepare the thread-pool to accept requests.
A second way provides more control. We can derive from the ThreadPool class and override the onThreadStart and onThreadFinish methods to perform thread-specific setup and cleanup.
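class COMPool : public ThreadPool
{
public:
    // Called on each worker thread as it starts: initialize COM for that thread.
    void onThreadStart(int threadId)
    {
        ::OleInitialize(NULL);
    }
    // Called on each worker thread as it finishes: release COM for that thread.
    void onThreadFinish(int threadId)
    {
        ::OleUninitialize();
    }
};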
We also need to create one or more functors which we can request the thread pool to handle. These are created by deriving from ThreadRequest. If you wish to pass in parameters, or retrieve information from the functor, you can provide these in a constructor. Also, any necessary cleanup can be done in the destructor.
class SpecialRequest : public ThreadRequest
{
public:
    SpecialRequest(int param1, int param2, int& retStat) :
        myLocal1(param1), myLocal2(param2), retStatus(retStat)
    {
    }
    // Runs on a pool thread; the result is written through the reference.
    void operator()(int)
    {
        retStatus = myLocal1 + myLocal2;
    }
private:
    int myLocal1;
    int myLocal2;
    int& retStatus;
};
To submit the request, we use the previously defined ThreadPool object and submit the functor into the queue. In this case, we create the functor at the same time:
myPool.submitRequest(new SpecialRequest(1, 2, returnVal));
Note that the SpecialRequest has the two parameters being passed in, 1 and 2, and the reference to returnVal for the functor to populate when it is completed. Note also that the functor must be created on the heap using new, because the thread pool will delete it.
Once we are finished with the thread-pool, we can shut it down using
myPool.shutdown();
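So, our main function looks like this:
#include <iostream>
#include "ThreadPool.h"

int main(int, char**)
{
    int result = 0;
    COMPool myCOMThreadPool;
    myCOMThreadPool.accept();
    // The pool takes ownership of the request and deletes it.
    myCOMThreadPool.submitRequest(new SpecialRequest(1, 2, result));
    myCOMThreadPool.shutdown();
    std::cout << result;
    return 0;
}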
The demonstration project contains a more explicit example using multiple threads, loops and thread statistics.
Note: It's important that you handle any exceptions that your code may throw in the overridden operator()(int). The acceptHandler will not allow any exceptions to propagate out of its loop in order to maintain integrity. This is not actually a bad thing, because your functor should be able to handle its own exceptions anyway.
The gory details
For the individuals who REALLY want to know how it works, the entire cpp and h files are commented using doxygen, and the tricky clever bits are commented also. What I'll try to explain here is why I've done certain things.
The guiding principle that I tried to follow was that a thread is a completely separate entity from the thing that it is executing. The idea was to disconnect the management of the threads from the execution completely. This would enable the thread-pool to be used easily for database calls (effectively simulating an asynchronous call), HTTP responses via FastCGI, and calculations of pi to (insert arbitrarily large number here) decimal places, whilst simultaneously making me a coffee. I didn't want the thread to be aware of the execution status of the function/functor, and I didn't want the functor to be aware that it was being executed in a thread.
This provides a slightly different programming model from what would otherwise be expected. In many of the implementations I've seen, the worker thread is responsible for signalling that it is finished, and the "main" thread of execution is responsible for waiting until the worker thread has signalled this. Then, the main thread needs to clean up the worker, or query the worker to get the worker's results. Some of the implementations passed an arbitrary number of pointers around the place to indicate parameters and return values.
In this implementation, the worker thread doesn't wait at all. It executes the job and then cleans up after itself. If you need results, you can pass a pointer or reference to a structure into the constructor, and have the functor populate that structure as part of its work. For example, you can have the functor write the result of a database commit into a status variable passed to the constructor, and check it at any time to see whether the commit has happened yet. Or you can pass an entire business object, such as a Purchase Order, into the constructor, and make the functor responsible for populating its values. The thread doesn't care. All it does is execute the functor and clean up after it. If your application needs to wait for something to happen, you can include a signal (event) in the functor, and put the main thread to sleep until the signal is set. Again, you are waiting for the functor to complete, not the thread. A conceptual difference, but I think a more accurate representation.
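As an illustration of waiting on the functor rather than on the thread, the sketch below uses std::promise and std::future as the "signal". This is an assumption made for the example, not how the article's code does it; CommitRequest and commitAndWait are hypothetical names, and a plain std::thread stands in for a pool worker.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical functor: it owns a promise and sets it once its work is done.
class CommitRequest {
public:
    explicit CommitRequest(std::promise<bool> done) : done_(std::move(done)) {}

    void operator()(int /*threadId*/) {
        bool committed = true;       // stand-in for the real database commit
        done_.set_value(committed);  // wake up whoever is waiting on the functor
    }

private:
    std::promise<bool> done_;
};

bool commitAndWait() {
    std::promise<bool> done;
    std::future<bool> signal = done.get_future();
    assert(signal.valid());

    CommitRequest request(std::move(done));
    std::thread worker([&request] { request(0); });  // stands in for a pool thread

    bool committed = signal.get();  // sleeps until the functor signals completion
    worker.join();                  // only needed because this sketch has no pool
    return committed;
}
```

The caller blocks on the future that belongs to the request and never asks the thread anything.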
The second thing that I had to add was a pair of calls, onThreadStart and onThreadFinish. In the simple case these are no-ops; however, by deriving from ThreadPool you can make them do whatever you like. I had to add these because COM needed to be initialized per thread when I was using this for my OLEDB calls.
When the queue reaches the maximum queue length it will actually block, which in the case of the example provided will also block the main thread. This has the effect of preventing additional requests, allowing the pool time to catch up.
See my todo list at the bottom for improvement ideas I have for this.
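The blocking behaviour can be sketched with a bounded queue built on two condition variables. This is a minimal illustration under my own assumptions, not the queue used in ThreadPool; BoundedQueue and sumThroughQueue are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical bounded queue: push() blocks while the queue is full,
// pop() blocks while it is empty.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t maxLength) : maxLength_(maxLength) {
        assert(maxLength > 0);
    }

    void push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return items_.size() < maxLength_; });
        items_.push(std::move(item));
        notEmpty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        notFull_.notify_one();
        return item;
    }

private:
    std::size_t maxLength_;
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::queue<T> items_;
};

int sumThroughQueue() {
    BoundedQueue<int> queue(2);
    int sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < 100; ++i) sum += queue.pop();
    });
    // The submitting thread stalls here whenever two items are already waiting.
    for (int i = 1; i <= 100; ++i) queue.push(i);
    consumer.join();
    return sum;
}
```

The producer is throttled to the consumer's pace without either side knowing about the other.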
I also used functors because for the OLEDB calls, I really needed transaction integrity. To provide this, the constructor of the functor could perform the beginTransaction() and the destructor would call commit() or rollback() depending on the private transactionSuccess flag. This ensured that either commit or rollback was called, regardless of the outcome, and improved exception safety incredibly. After implementing it this way, I realized just how effective functors were, so I ended up using them for the generic solution.
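A rough sketch of that begin-in-constructor, commit-or-rollback-in-destructor shape follows. FakeSession, TransactionRequest and runOnce are hypothetical names; the session only records which calls were made and stands in for the real OLEDB code, which is not shown in this article.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a database session: it just logs the calls.
struct FakeSession {
    std::vector<std::string> log;
    void beginTransaction() { log.push_back("begin"); }
    void commit() { log.push_back("commit"); }
    void rollback() { log.push_back("rollback"); }
};

class TransactionRequest {
public:
    TransactionRequest(FakeSession& session, bool failWork)
        : session_(session), failWork_(failWork), transactionSuccess_(false) {
        session_.beginTransaction();
    }

    // Exactly one of commit/rollback runs, however operator() ended.
    ~TransactionRequest() {
        if (transactionSuccess_) session_.commit();
        else session_.rollback();
    }

    TransactionRequest(const TransactionRequest&) = delete;
    TransactionRequest& operator=(const TransactionRequest&) = delete;

    void operator()(int /*threadId*/) {
        if (failWork_) throw std::runtime_error("insert failed");  // simulated failure
        transactionSuccess_ = true;  // reached only if the work succeeded
    }

private:
    FakeSession& session_;
    bool failWork_;
    bool transactionSuccess_;
};

std::vector<std::string> runOnce(bool failWork) {
    FakeSession session;
    try {
        TransactionRequest request(session, failWork);
        request(0);
    } catch (const std::exception&) {
        // By the time we get here the destructor has already rolled back.
    }
    assert(session.log.front() == "begin");
    return session.log;
}
```

Because the destructor makes the decision, an early return or an exception in operator() cannot leave the transaction open.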
The beauty of this is that the functor contains everything that it needs to execute. Because it can maintain state, you can actually pass parameters in the constructor, and use these parameters to get information about the current functor object, including its state of execution, and the data returned. The best part though, is that by using a functor, you can test the functor itself outside of the multithreaded environment quite easily - allowing easy integration for unit tests.
Another benefit of functors is that they can maintain their own environment. In the case of the FastCGI application, I needed to pass the address of the output structure to return the output back to the webserver. I did this by passing the environment, including the CGI parameters, the error stream and the output stream, into the functor during construction. This meant that the functor was perfectly thread safe, because each request was a completely individual object, and yet it had access to the environment as it existed at the time it was created. On execution, it would always write to the correct output stream. This provided thread safety without the need for mutexes, critical sections and so on.
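The sketch below shows the same idea in miniature: each request captures its own parameters and its own output stream at construction, so two requests can run on different threads without sharing anything. PageRequest and renderTwo are hypothetical names, and std::ostringstream stands in for the FastCGI output stream.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

// Hypothetical request: everything it needs is captured at construction.
class PageRequest {
public:
    PageRequest(std::string user, std::ostream& out)
        : user_(std::move(user)), out_(out) {
        assert(!user_.empty());
    }

    void operator()(int /*threadId*/) { out_ << "Hello, " << user_; }

private:
    std::string user_;   // private copy of the request parameters
    std::ostream& out_;  // the stream that belongs to this request only
};

std::string renderTwo() {
    std::ostringstream outA;
    std::ostringstream outB;
    PageRequest a("alice", outA);
    PageRequest b("bob", outB);

    // Two plain threads stand in for two pool workers.
    std::thread t1([&a] { a(0); });
    std::thread t2([&b] { b(0); });
    t1.join();
    t2.join();

    return outA.str() + "|" + outB.str();
}
```

No lock is needed because no object is touched by more than one thread while the requests run.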
The acceptHandler has been declared as throwing no exceptions. This is necessary, because exceptions would potentially disrupt the safety of other threads. If a functor throws an exception and it's not handled in the functor (this is bad!), the handler will simply swallow the exception, and you will not see it. You should not bank on this functionality though, because it may change. (Not sure how, or why, but if someone comes up with a better method, I'll change it in a flash.)
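The swallowing behaviour described above can be sketched as a catch-all around each request. This is an illustration only: runAll and countCompleted are hypothetical names, std::function stands in for ThreadRequest, and the real acceptHandler is not reproduced here.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the handler loop: nothing escapes it.
int runAll(const std::vector<std::function<void(int)>>& requests) noexcept {
    int completed = 0;
    for (const auto& request : requests) {
        try {
            request(0);
            ++completed;
        } catch (...) {
            // Swallowed: one failing request must not take the worker down.
        }
    }
    return completed;
}

int countCompleted() {
    std::vector<std::function<void(int)>> requests;
    requests.push_back([](int) {});
    requests.push_back([](int) { throw std::runtime_error("unhandled"); });
    requests.push_back([](int) {});
    assert(requests.size() == 3);
    return runAll(requests);
}
```

Note that the failing request leaves no trace at all, which is exactly why a functor should catch and report its own exceptions.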
Todo
All these are tentative ideas, and not things that I'll implement until I actually need them.
- More debug information - i.e. Thread Information Block etc.
- Thread-pool monitoring
  - average queue length
  - average response time
- A QueueFullException to be optionally added instead of just blocking. May do this as a templated policy class (see Modern C++ Design, Andrei Alexandrescu).
- Dynamic addition and deletion of threads.
- Forced shutdown
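For the QueueFullException idea in the list above, a policy-class version might look something like the following. None of this exists in the current code: ThrowWhenFull, DropWhenFull and PolicyQueue are hypothetical names, and the locking is left out to keep the sketch short.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <stdexcept>

struct QueueFullException : std::runtime_error {
    QueueFullException() : std::runtime_error("queue full") {}
};

// Policy 1: refuse the request by throwing.
struct ThrowWhenFull {
    static bool onFull() { throw QueueFullException(); }
};

// Policy 2: refuse the request quietly and report it through the return value.
struct DropWhenFull {
    static bool onFull() { return false; }
};

// Hypothetical queue whose full-queue behaviour is chosen at compile time.
template <typename T, typename FullPolicy>
class PolicyQueue {
public:
    explicit PolicyQueue(std::size_t maxLength) : maxLength_(maxLength) {
        assert(maxLength > 0);
    }

    // Returns true if the item was queued.
    bool push(const T& item) {
        if (items_.size() >= maxLength_) return FullPolicy::onFull();
        items_.push(item);
        return true;
    }

private:
    std::size_t maxLength_;
    std::queue<T> items_;
};

bool overflowThrows() {
    PolicyQueue<int, ThrowWhenFull> queue(1);
    queue.push(1);
    try {
        queue.push(2);
    } catch (const QueueFullException&) {
        return true;
    }
    return false;
}

bool overflowDrops() {
    PolicyQueue<int, DropWhenFull> queue(1);
    return queue.push(1) && !queue.push(2);
}
```

A blocking policy could be added in the same way, so the current behaviour would become just one of the options.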
Acknowledgement
Special thanks to Taka for reviewing the code for this article.
Changes and edits
- 18/01/2002: Fixed the apostrophe on it because it was bugging me.