
A C++ Plug-in ThreadPool Design

22 May 2008, GPLv3, 5 min read
The Command Pattern and Chain of Responsibility for implementing a plug-in Thread Pool library.

The Command Pattern and Chain of Responsibility for implementing ThreadPool

The Command pattern and the Chain of Responsibility pattern lend themselves elegantly to the implementation of a Thread Pool. As the name implies, a Thread Pool is a pool of threads that are available to do any work. Its main advantage is that it avoids the overhead of repeatedly creating and destroying threads, which matters in applications that create and delete threads frequently.

Our goal is to provide a Thread Pool class design that can be integrated into existing code without changing the structure of the existing classes. That is, following the Open/Closed principle, we provide a plug-in style library that lets legacy classes make use of the Thread Pool concept. To elaborate further, if there is a class 'X' that has a thread-safe method 'methodX', then the user should be able to run methodX in a thread by using a method exposed by the Thread Pool.

Given below is a code snippet where class X = CTask and methodX = DoSomeTask. Create a Thread Pool with, say, 5 initial threads and a maximum of 10 threads.

C++
ThreadPool<CTask> t1(5,10); // Initial thread size 5, max thread size 10

Then, create a Command object and specify the method that you want to be called from a thread in the Thread Pool, in this case - CTask::DoSomeTask.

C++
Command<CTask>* cmd = 0;
cmd = new Command<CTask>(
    task1,                // pointer to the CTask object whose method is called
    &CTask::DoSomeTask,   // method that will be executed in a thread of the thread pool
    task1->GetTimeOut(),  // maximum time for the task to finish before it is flagged as hung
    "ne2_11",             // key used as a logical bucket for tasks (can also be unique)
    PRIO_NORMAL);         // priority of the task

After this, push the Command object to the queue with t1.QueueRequest(cmd); and that's it. The Thread Pool will take care of the rest.

Okay, before we go further, we have to be familiar with both the Command pattern as well as the Chain of Responsibility pattern.

Intent of the Command pattern

Encapsulate a request as an object, thereby letting you parameterize clients with different requests, queue or log requests, and support undoable operations. [GoF, p233]

Basically, you can use it to encapsulate an object together with one of its methods. The class design below should make this clearer. Note that this is just one way to design the class; the other is to declare a virtual method called execute that derived classes override and implement. The following is what we will use.

C++
class Command
{
private:
    CTest* m_objPtr;            // the object pointer (not owned)
    void (CTest::*m_method)();  // a pointer to the method

public:
    explicit Command(CTest* ptr, void (CTest::*pmethod)()) // constructor
    {
        m_objPtr = ptr;
        m_method = pmethod;
    }

    // The execute method that is invoked by the client
    void execute()
    {
        (m_objPtr->*m_method)(); // calls the method on the object
    }

    // Note: do not call 'delete m_objPtr' here, as we do not own the object
    ~Command() {}
};
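For comparison, the virtual execute alternative mentioned above could be sketched roughly as follows. This is only an illustration: Counter, ICommand, IncrementCommand and runAll are hypothetical names that do not appear in the attached source.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical receiver used only for this illustration.
class Counter {
public:
    void increment() { ++m_value; }
    int value() const { return m_value; }
private:
    int m_value = 0;
};

// Abstract command: clients only ever see execute().
class ICommand {
public:
    virtual ~ICommand() = default;
    virtual void execute() = 0;
};

// Concrete command that forwards to Counter::increment.
class IncrementCommand : public ICommand {
public:
    explicit IncrementCommand(Counter* target) : m_target(target) {}
    void execute() override { m_target->increment(); }
private:
    Counter* m_target;  // not owned
};

// Runs every queued command in order and returns how many were run.
inline std::size_t runAll(const std::vector<std::unique_ptr<ICommand>>& queue) {
    for (const auto& cmd : queue) {
        cmd->execute();
    }
    return queue.size();
}
```

The trade-off is that every new kind of request needs its own derived class, whereas the member-function-pointer design lets an existing class be used as-is.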

Well, this is our building block, and the next step is to make this class generic. Also, note that our definition of the method pointer restricts it to methods with the signature void method(void);. This can be relaxed, as will be shown later.

Okay, so here goes the generic version of the class:

C++
template <typename T>
class Command
{
    T* m_objptr;
    void (T::*method)();
public:
    //Takes the pointer to an object and a pointer to the method
    explicit Command(T* pObj,void (T::*p_method)())
    {
        :

The rest is the same as the previous class structure. Now, if you need to use a different signature for your method, you have to either use std::bind1st or std::bind2nd, or write a similar class and helper method.
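Filled in, the generic class might look like the minimal sketch below. It keeps only the two-argument constructor; the timeout, key and priority parameters shown earlier are left out, and SimpleTask is a hypothetical stand-in for CTask.

```cpp
#include <cassert>

// Generic command: binds an object of type T to one of its void() methods.
template <typename T>
class Command {
public:
    typedef void (T::*Method)();

    Command(T* pObj, Method pMethod) : m_objptr(pObj), m_method(pMethod) {}

    // Invokes the stored method on the stored object.
    void execute() { (m_objptr->*m_method)(); }

private:
    T* m_objptr;      // not owned, so never deleted here
    Method m_method;
};

// Hypothetical task class, standing in for the article's CTask.
class SimpleTask {
public:
    SimpleTask() : m_runs(0) {}
    void DoSomeTask() { ++m_runs; }
    int runs() const { return m_runs; }
private:
    int m_runs;
};
```

With this in place, `Command<SimpleTask> cmd(&task, &SimpleTask::DoSomeTask);` followed by `cmd.execute();` calls `task.DoSomeTask()`.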

Suppose your method has the signature:

C++
int CTask::ThreeParameterTask(int par1, int par2, int par3)

We will now see how to fit it into the Command pattern. First, you have to write a member function adapter so that the method can be called as a function object.

Note - this is ugly, and maybe you can use the Boost bind helpers etc., but if you can't or don't want to, this is one way.

C++
// a template class for converting a member function of the type int function(int,int,int)
//to be called as a function object
template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
class mem_fun3_t
{
public:
    explicit mem_fun3_t(_Ret (_Class::*_Pm)(_arg1,_arg2,_arg3))
        :m_Ptr(_Pm) //okay here we store the member function pointer for later use
    {}

    //this operator call comes from the bind method
    _Ret operator()(_Class *_P, _arg1 arg1, _arg2 arg2, _arg3 arg3) const
    {
        return ((_P->*m_Ptr)(arg1,arg2,arg3));
    }
private:
    _Ret (_Class::*m_Ptr)(_arg1,_arg2,_arg3);// method pointer signature
};

Also, we need a helper method mem_fun3 for the above class to aid in calling.

C++
template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3> mem_fun3 ( _Ret (_Class::*_Pm)(_arg1,_arg2,_arg3) )
{
    return (mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3>(_Pm));
}

Now, in order to bind the parameters, we have to write a binder function. So, here it goes:

C++
template<typename _Func,typename _Ptr,typename _arg1,typename _arg2,typename _arg3>
class binder3
{
public:
    //This is the constructor that does the binding part
    binder3(_Func fn,_Ptr ptr,_arg1 i,_arg2 j,_arg3 k)
        :m_ptr(ptr),m_fn(fn),m1(i),m2(j),m3(k){}


    //and this is the function object
    void operator()() const
    {
        m_fn(m_ptr,m1,m2,m3);//that calls the operator
    }
private:
    _Ptr m_ptr;
    _Func m_fn;
    _arg1 m1; _arg2 m2; _arg3 m3;
};

And, a helper function to use the binder3 class - bind3:

C++
//a helper function to call binder3
template <typename _Func, typename _P1,typename _arg1,typename _arg2,typename _arg3>
binder3<_Func, _P1, _arg1, _arg2, _arg3> bind3(_Func func, _P1 p1,_arg1 i,_arg2 j,_arg3 k)
{
    return binder3<_Func, _P1, _arg1, _arg2, _arg3> (func, p1,i,j,k);
}

Now, we have to use this with the Command class; use the following typedef:

C++
typedef binder3<mem_fun3_t<int,T,int,int,int> ,T* ,int,int,int> F3;

//and change the signature of the ctor
//just to illustrate the usage with a method signature taking more than one parameter
explicit Command(T* pObj,F3* p_method,long timeout,const char* key,
    long priority = PRIO_NORMAL ):
m_objptr(pObj),m_timeout(timeout),m_key(key),m_value(priority),method1(0),method0(0),
    method(0)
{
    method3 = p_method;
}

Here is how you call it:

C++
F3 f3 = PluginThreadPool::bind3( PluginThreadPool::mem_fun3( 
          &CTask::ThreeParameterTask), task1,21,22,23 );

Note: f3(); will call the method task1->ThreeParameterTask(21,22,23);.

C++
cmd= new Command<CTask>(task1, &f3 ,task1->GetTimeOut(),
     "ne2_11",PluginThreadPool::PRIO_NORMAL);

That is the end of the ugly stuff.
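As an aside for readers on newer compilers: std::bind1st and std::bind2nd were deprecated in C++11 and removed in C++17. With C++11 or later, the same binding can be written with a lambda or std::bind and stored in a std::function. The sketch below is not part of the attached source; DemoTask, Job, makeJobWithLambda and makeJobWithBind are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the article's CTask.
class DemoTask {
public:
    int ThreeParameterTask(int par1, int par2, int par3) {
        m_last = par1 + par2 + par3;  // record something observable
        return m_last;
    }
    int last() const { return m_last; }
private:
    int m_last = 0;
};

// A type-erased callable taking no arguments; it plays the role of F3.
using Job = std::function<void()>;

// Lambda version: captures the object pointer and the three arguments.
inline Job makeJobWithLambda(DemoTask* task, int a, int b, int c) {
    return [task, a, b, c]() { task->ThreeParameterTask(a, b, c); };
}

// std::bind version: the int return value is discarded by Job.
inline Job makeJobWithBind(DemoTask* task, int a, int b, int c) {
    return std::bind(&DemoTask::ThreeParameterTask, task, a, b, c);
}
```

Calling `makeJobWithLambda(task1, 21, 22, 23)()` has the same effect as `f3()` above. As with the hand-written binder, the object pointer is not owned and must outlive the job.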

Okay, now on to the next pattern: the Chain of Responsibility.

Intent of the Chain of Responsibility pattern

Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects, and pass the request along the chain until an object handles it. [GoF, p223]

Basically, this pattern can be envisioned as a linked list of objects. We will design a class called ThreadChain to model it: the pool holds a linked list of ThreadChain objects that act on a container of Command objects. Each ThreadChain object has a thread associated with it, and a flag called busy that is set to true while that thread is executing a Command.

The chaining of objects is done in the ThreadPool constructor - see the code snippet.

C++
///There is at least one thread in the thread pool
ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
root = prev;

for(int i=0;i<minthreadCount-1;++i)
{
    ThreadChain<T> *temp = new ThreadChain<T>(prev,this);
    prev=temp;
    root = temp;
}

The ThreadChain class has a method called canHandle which basically checks if its thread is free to handle the task. If not, it then propagates to the next object.

C++
bool canHandle()
{
    if(!busy)
    {
        SetEvent(g_Event); // signal the waiting thread for handling task
        return true;
    }
    :
    if(!next)
    {
        return false; ///max thread count reached ///If the thread count is already
                                                  ///max nothing to be done
    }
    return next->canHandle(); ///Else see if the next object in the chain is free
                              ///to handle the request
}
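Stripped of threads and events, the propagation logic of canHandle can be illustrated with the small sketch below. Handler and firstFree are hypothetical names; unlike canHandle, which signals an event, the sketch simply returns the id of the first free node so that the behaviour is easy to check.

```cpp
#include <cassert>

// Hypothetical, thread-free stand-in for ThreadChain.
class Handler {
public:
    Handler(int id, Handler* next) : m_id(id), m_next(next), m_busy(false) {}

    void setBusy(bool busy) { m_busy = busy; }

    // Returns the id of the first free handler starting from this node,
    // or -1 if every handler down the chain is busy.
    int firstFree() const {
        if (!m_busy) {
            return m_id;               // this node can take the request
        }
        if (!m_next) {
            return -1;                 // end of chain, nobody is free
        }
        return m_next->firstFree();    // pass the request along the chain
    }

private:
    int m_id;
    Handler* m_next;  // next node in the chain, or null for the last node
    bool m_busy;
};
```

The caller only ever talks to the root node; which node ends up handling the request is decided inside the chain.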

Note that the ThreadChain class has a method called HandleRequest that runs in the thread associated with the object and waits for this event. Once the event is signaled, it proceeds with processing the request.

C++
void HandleRequest()
{
    while(!deleteMe)
    ///The deleteMe is set in the dtor usually if this threads time
    ///out is over
    {
        WaitForSingleObject(g_Event,INFINITE);
        ///This event is signaled from the
        ///canHandle method
        :
        busy=true;
        ///Now set that this object as busy-it is going to handle a request
        :
        Command<T>* temp = threadpool->GetRequest();
        :
        if(temp!=NULL)
        {
            :
            m_timeout = temp->GetTimeOut()*1000; /// The time out is in seconds
            temp->execute();
            delete temp;

Each ThreadChain object also checks if the thread associated with it is in a hang state; if so, it will set the deleteMe flag and unlink itself from the chain.

C++
///The method to handle the hung threads
void HandleHungThreads(ThreadChain<T>* prev)
{
    bool bIsHung =false;
    if(IsHung() || ReleaseIdleThread())
    {
        bIsHung = true;
        if(this == threadpool->root){ 
            threadpool->root = next_p; ///case if root is hanging
            prev= next; 
        } else {
            // remove this item from the thread chain and link the other two
            prev->next = GetNext();
        }
    }
    :
    if(bIsHung)///if this is a hung thread 
    {
        Release(); ///then release it
        next->HandleHungThreads(prev);// 'this' is out of link
    }
    else
        next->HandleHungThreads(this); ///propagate to the next object
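The unlinking step is ordinary singly linked list surgery. The sketch below shows it in isolation, written iteratively rather than recursively and without any thread handling; Node, unlinkHung and chainLength are hypothetical names used only for illustration.

```cpp
#include <cassert>

// Hypothetical chain node; 'hung' stands in for IsHung() || ReleaseIdleThread().
struct Node {
    int id;
    bool hung;
    Node* next;
};

// Unlinks every hung node and returns the (possibly new) root.
// Unlinked nodes are not deleted here; releasing them is left to the caller.
inline Node* unlinkHung(Node* root) {
    Node* prev = nullptr;
    Node* cur = root;
    while (cur) {
        Node* next = cur->next;
        if (cur->hung) {
            if (prev) {
                prev->next = next;   // bypass the hung node
            } else {
                root = next;         // the root itself was hung
            }
            cur->next = nullptr;
        } else {
            prev = cur;              // only healthy nodes become 'prev'
        }
        cur = next;
    }
    return root;
}

// Counts the nodes reachable from n.
inline int chainLength(const Node* n) {
    int count = 0;
    for (; n; n = n->next) {
        ++count;
    }
    return count;
}
```

Note that prev only advances past healthy nodes, which mirrors how the code above passes prev unchanged when 'this' has been removed from the chain.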

Also, if the thread count rises above the minimum during a burst and the load later drops, the surplus idle threads should be released. The ThreadChain method ReleaseIdleThread takes care of this.

C++
bool ReleaseIdleThread()
{
    WaitForSingleObject(threadChkMutex,INFINITE);
    if(threadpool->m_threadCount <= threadpool->minthreadCount)
    //thread count is equal to the minimum thread count
    {
        ReleaseMutex(threadChkMutex);
        return false;
    }
    :
    }else if(GetTickCount()-lastactivetime > MAX_IDLETIME)
    {
        printf("Max idle time exceeded for this thread\n");
        bReleasethis=true;
    }
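The decision itself boils down to two checks, which can be sketched as a pure function. The names shouldReleaseIdle and kMaxIdleMs are hypothetical, and the 30 second limit is an assumption, since the value of MAX_IDLETIME is not shown here. Tick counts are passed in as parameters so the sketch does not depend on GetTickCount.

```cpp
#include <cassert>
#include <cstdint>

// Assumed idle limit in milliseconds; the real MAX_IDLETIME value is not shown.
const std::uint32_t kMaxIdleMs = 30000;

// Returns true if a thread should be released: the pool is above its minimum
// size and this thread has been idle for longer than maxIdleMs.
inline bool shouldReleaseIdle(int threadCount, int minThreadCount,
                              std::uint32_t nowTicks,
                              std::uint32_t lastActiveTicks,
                              std::uint32_t maxIdleMs = kMaxIdleMs) {
    if (threadCount <= minThreadCount) {
        return false;  // never shrink below the minimum
    }
    // Unsigned subtraction stays correct when the 32-bit tick counter wraps.
    const std::uint32_t idle =
        static_cast<std::uint32_t>(nowTicks - lastActiveTicks);
    return idle > maxIdleMs;
}
```

GetTickCount returns a 32-bit millisecond counter that wraps after roughly 49.7 days, which is why the elapsed time is computed with unsigned arithmetic.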

ThreadPool is a new class that we have designed that acts as the controller. It will take care of scaling up or down threads in the Thread Pool as well as initiating the checks for hanging threads etc. It holds a container containing the Command objects. The container class is called CRequestQueue.

Tasks can be executed on the basis of priority: the highest priority task is popped from the queue first. Also, each queue is associated with a key. This helps if you have multiple engines pumping in tasks. The pool takes the first task of each key in turn, in a cyclic manner, which prevents any one overloaded engine from hogging the resource.

The ThreadPool class also provides an interface to the client called QueueRequest, which the client uses to push in the prepared Command object.

C++
class CRequestQueue
{
private:
    typedef priority_queue<Command<T>*, // value type
        deque<Command<T>*>,             // container type
        typename Command<T>::Compare > REQUESTQUEUE; // comparator

    // Map task key to the priority queue
    std::map<long,REQUESTQUEUE > RequestQueue;
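To make the "priority within a key, round robin across keys" idea concrete, here is a minimal single-threaded sketch. It is not the attached CRequestQueue: Request, ByPriority and KeyedQueue are hypothetical names, the key is a std::string rather than a long, a larger priority value is assumed to mean more urgent, and no locking is shown.

```cpp
#include <cassert>
#include <map>
#include <queue>
#include <string>
#include <vector>

// Hypothetical request record, standing in for Command<T>*.
struct Request {
    std::string key;  // logical bucket, e.g. one per engine
    int priority;     // assumption: larger value means more urgent
    int id;
};

// Orders the priority queue so the largest priority is on top.
struct ByPriority {
    bool operator()(const Request& a, const Request& b) const {
        return a.priority < b.priority;
    }
};

class KeyedQueue {
public:
    void push(const Request& r) { m_queues[r.key].push(r); }

    bool empty() const { return m_queues.empty(); }

    // Pops the highest priority request of the next key in cyclic order.
    // Precondition: !empty().
    Request pop() {
        // Move to the first key after the one served last, wrapping around.
        auto it = m_queues.upper_bound(m_lastKey);
        if (it == m_queues.end()) {
            it = m_queues.begin();
        }
        Request r = it->second.top();
        it->second.pop();
        m_lastKey = it->first;
        if (it->second.empty()) {
            m_queues.erase(it);  // drop keys that have no pending work
        }
        return r;
    }

private:
    typedef std::priority_queue<Request, std::vector<Request>, ByPriority> Queue;
    std::map<std::string, Queue> m_queues;
    std::string m_lastKey;  // key served most recently
};
```

With three requests queued under key "A" and one under key "B", the pops alternate A, B, A, A, so a busy key cannot starve the others.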

Well, I guess I have given you the idea of the ThreadPool pattern. For the finer implementation details, you can check the source code attached.

Note - For VC compilers later than VC6, I have added a minor source update - threapoolvc7.zip.

References and Links

  1. GoF - Design Patterns
  2. Converting C++ Member Functions into Function Objects

License

This article, along with any associated source code and files, is licensed under The GNU General Public License (GPLv3)