The Command Pattern and Chain of Responsibility for implementing a Thread Pool
The Command pattern and the Chain of Responsibility pattern lend themselves elegantly to the implementation of a Thread Pool design. As the name implies, a Thread Pool is a pool of threads that are available to do any work. The main advantage of having a thread pool is that it avoids the overhead of repeatedly creating and destroying threads, which matters for applications that create and delete threads quite frequently.
Our goal is to provide a Thread Pool class design that can be integrated into existing code without changing the class structure of the existing classes. That is, by embracing the Open-Closed Principle, it provides a plug-in type library that lets legacy classes make use of the Thread Pool concept. To elaborate: if there is a class X that has a thread-safe method methodX, then the user should be able to run methodX on a thread simply by using a method exposed by the Thread Pool.
Given below is a code snippet where class X = CTask and methodX = DoSomeTask. First, create a Thread Pool with, say, 5 starting threads and a maximum of 10 threads:
ThreadPool<CTask> t1(5,10);
Then, create a Command object and specify the method that you want to be called from a thread in the Thread Pool, in this case CTask::DoSomeTask:
Command<CTask>* cmd =0;
cmd= new Command<CTask>(task1, &CTask::DoSomeTask, task1->GetTimeOut(), "ne2_11", PRIO_NORMAL);
After this, push the Command object to the queue with t1.QueueRequest(cmd); and that's it; the Thread Pool will take care of the rest.
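To see the whole client-side picture in one place, here is a minimal sketch that puts the snippets above together. The includes, the CTask body, the timeout value, and the header name are assumptions made purely for illustration; in the attached source the pool types live in the PluginThreadPool namespace.

#include "ThreadPool.h"   // assumed header name; use whatever the attached source provides

// The legacy class stays untouched; it only needs a thread-safe method to run.
class CTask
{
public:
    void DoSomeTask() { /* thread-safe work goes here */ }
    long GetTimeOut() const { return 30; }   // timeout in seconds, made up for this sketch
};

int main()
{
    ThreadPool<CTask> t1(5, 10);             // 5 starting threads, 10 maximum

    CTask* task1 = new CTask;
    Command<CTask>* cmd = new Command<CTask>(task1, &CTask::DoSomeTask,
                                             task1->GetTimeOut(), "ne2_11",
                                             PRIO_NORMAL);
    t1.QueueRequest(cmd);                    // the pool runs task1->DoSomeTask() and deletes cmd

    // In real code, keep the pool (and task1) alive until the work completes;
    // lifetime management is omitted here for brevity.
    return 0;
}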
Okay, before we go further, we have to be familiar with both the Command pattern and the Chain of Responsibility pattern.
Intent of the Command pattern
Encapsulate a request as an object, thereby letting you parameterize clients with different requests, queue or log requests, and support undoable operations. [GoF, p233]
Basically, you can use it to encapsulate an object together with the method to be invoked on it; the code snippet below shows one class design that makes this concrete. Note that this is just one way of designing the class; another is to declare a virtual execute method that derived classes override and implement. The following is what we will do here.
class Command
{
private:
    CTest* m_objPtr;
    void (CTest::*m_method)();

public:
    explicit Command(CTest* ptr, void (CTest::*pmethod)())
    {
        m_objPtr = ptr;
        m_method = pmethod;
    }

    void execute()
    {
        (m_objPtr->*m_method)();
    }

    ~Command() {}
};
Well, this is our building block, and the next step is to make this class generic. Also, note that our definition of the method pointer restricts it to methods with the signature void method(void); this can be modified, as will be shown later.
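For comparison, the alternative design mentioned above, a base class with a virtual execute that derived classes override, might look roughly like the following minimal sketch. ICommand and PrintCommand are made-up names for illustration and are not part of the attached source.

#include <cstdio>

// Polymorphic alternative: the invoker only ever sees ICommand*.
class ICommand
{
public:
    virtual void execute() = 0;
    virtual ~ICommand() {}
};

class PrintCommand : public ICommand
{
public:
    explicit PrintCommand(const char* msg) : m_msg(msg) {}
    virtual void execute() { std::printf("%s\n", m_msg); }

private:
    const char* m_msg;
};

// Usage:
//   ICommand* cmd = new PrintCommand("hello");
//   cmd->execute();
//   delete cmd;

The rest of this article sticks to the member-function-pointer approach, since it lets existing classes like CTask participate without having to derive from a command interface.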
Okay, so here goes the generic version of the class:
template <typename T>
class Command
{
    T* m_objptr;
    void (T::*method)();

public:
    explicit Command(T* pObj, void (T::*p_method)())
    {
        m_objptr = pObj;
        method = p_method;
    }
    // ... execute() and the destructor are the same as in the non-template version
};
The rest is the same as the previous class structure. Now, if you need to use a different signature for your method, you either have to use std::bind1st or std::bind2nd, or write a similar adapter class and helper function yourself.
Suppose your method has the signature:
int CTask::ThreeParameterTask(int par1, int par2, int par3)
We will see how we can fit it into the Command pattern. For this, you first have to write a member function adapter so that the method can be called as a function object.
Note: this is ugly, and maybe you can use the Boost bind helpers, etc., but if you can't or don't want to, this is one way.
template<typename _Ret, typename _Class, typename _arg1, typename _arg2, typename _arg3>
class mem_fun3_t
{
public:
    explicit mem_fun3_t(_Ret (_Class::*_Pm)(_arg1, _arg2, _arg3))
        : m_Ptr(_Pm) {}

    _Ret operator()(_Class* _P, _arg1 arg1, _arg2 arg2, _arg3 arg3) const
    {
        return ((_P->*m_Ptr)(arg1, arg2, arg3));
    }

private:
    _Ret (_Class::*m_Ptr)(_arg1, _arg2, _arg3);
};
Also, we need a helper function mem_fun3 for the above class to aid in calling it:
template<typename _Ret, typename _Class, typename _arg1, typename _arg2, typename _arg3>
mem_fun3_t<_Ret, _Class, _arg1, _arg2, _arg3> mem_fun3(_Ret (_Class::*_Pm)(_arg1, _arg2, _arg3))
{
    return (mem_fun3_t<_Ret, _Class, _arg1, _arg2, _arg3>(_Pm));
}
Now, in order to bind the parameters, we have to write a binder class. So, here it goes:
template<typename _Func, typename _Ptr, typename _arg1, typename _arg2, typename _arg3>
class binder3
{
public:
    binder3(_Func fn, _Ptr ptr, _arg1 i, _arg2 j, _arg3 k)
        : m_ptr(ptr), m_fn(fn), m1(i), m2(j), m3(k) {}

    void operator()() const
    {
        m_fn(m_ptr, m1, m2, m3);
    }

private:
    _Ptr m_ptr;
    _Func m_fn;
    _arg1 m1;
    _arg2 m2;
    _arg3 m3;
};
And, a helper function to use the binder3 class, bind3:
template <typename _Func, typename _P1, typename _arg1, typename _arg2, typename _arg3>
binder3<_Func, _P1, _arg1, _arg2, _arg3> bind3(_Func func, _P1 p1, _arg1 i, _arg2 j, _arg3 k)
{
    return binder3<_Func, _P1, _arg1, _arg2, _arg3>(func, p1, i, j, k);
}
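To see the adapters in isolation before they are wired into Command, here is a small self-contained usage sketch. It assumes the mem_fun3_t, mem_fun3, binder3, and bind3 templates above are visible in the current scope (in the attached source they live in the PluginThreadPool namespace); the CTask class here is just a stand-in with the three-parameter method from the example above.

#include <cstdio>

// Stand-in class with the three-parameter method used later in the article.
class CTask
{
public:
    int ThreeParameterTask(int par1, int par2, int par3)
    {
        std::printf("%d %d %d\n", par1, par2, par3);
        return par1 + par2 + par3;
    }
};

int main()
{
    CTask task;

    // Adapt the member function, then bind the object and all three arguments.
    binder3<mem_fun3_t<int, CTask, int, int, int>, CTask*, int, int, int> f3 =
        bind3(mem_fun3(&CTask::ThreeParameterTask), &task, 21, 22, 23);

    f3();   // equivalent to task.ThreeParameterTask(21, 22, 23)
    return 0;
}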
Now, we have to use this with the Command class; use the following typedef:
typedef binder3<mem_fun3_t<int, T, int, int, int>, T*, int, int, int> F3;

explicit Command(T* pObj, F3* p_method, long timeout, const char* key,
                 long priority = PRIO_NORMAL)
    : m_objptr(pObj), m_timeout(timeout), m_key(key), m_value(priority),
      method1(0), method0(0), method(0)
{
    method3 = p_method;
}
Here is how you call it:
F3 f3 = PluginThreadPool::bind3(PluginThreadPool::mem_fun3(
            &CTask::ThreeParameterTask), task1, 21, 22, 23);
Note: f3(); will call task1->ThreeParameterTask(21,22,23);.
cmd = new Command<CTask>(task1, &f3, task1->GetTimeOut(),
                         "ne2_11", PluginThreadPool::PRIO_NORMAL);
We are done with the ugly stuff.
Okay, now on to the next pattern: the Chain of Responsibility.
Intent of the Chain of Responsibility pattern
Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects, and pass the request along the chain until an object handles it. [GoF, p223]
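Before looking at how the Thread Pool uses it, here is a minimal, thread-free sketch of the pattern itself. The Handler class and its capacity rule are invented purely for illustration; they are not part of the attached source.

#include <cstdio>

// Each handler either serves the request or passes it to the next object in the chain.
class Handler
{
public:
    Handler(int capacity, Handler* next) : m_capacity(capacity), m_next(next) {}

    bool Handle(int request)
    {
        if(request <= m_capacity)
        {
            std::printf("handled %d (capacity %d)\n", request, m_capacity);
            return true;
        }
        if(m_next != 0)
            return m_next->Handle(request);   // propagate along the chain
        return false;                         // end of chain: nobody could handle it
    }

private:
    int m_capacity;
    Handler* m_next;
};

int main()
{
    Handler last(100, 0);
    Handler middle(50, &last);
    Handler first(10, &middle);   // first -> middle -> last, like the ThreadChain list below

    first.Handle(5);    // served by the first handler
    first.Handle(75);   // propagates down the chain to 'last'
    return 0;
}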
Basically, this pattern can be envisioned as a linked list of objects. We will design a class called ThreadChain to model this pattern; in our case, a linked list of ThreadChain objects acts on a container of Command objects. Each ThreadChain object has a thread associated with it, and also a flag called busy that is set to true while the associated thread is executing a Command.
The chaining of objects is done in the ThreadPool constructor - see the code snippet:
ThreadChain<T>* prev = new ThreadChain<T>(0, this);
root = prev;
for(int i = 0; i < minthreadCount - 1; ++i)
{
    ThreadChain<T>* temp = new ThreadChain<T>(prev, this);
    prev = temp;
    root = temp;
}
The ThreadChain class has a method called canHandle which basically checks whether its thread is free to handle the task; if not, it propagates the request to the next object.
bool canHandle()
{
    if(!busy)
    {
        SetEvent(g_Event);
        return true;
    }
    :
    if(!next)
    {
        return false;
    }
    return next->canHandle();
}
Note that the ThreadChain class has a method called HandleRequest that waits, in the thread associated with the object, for this event. Once it gets the event, it proceeds with processing the request.
void HandleRequest()
{
    while(!deleteMe)
    {
        WaitForSingleObject(g_Event, INFINITE);
        :
        busy = true;
        :
        Command<T>* temp = threadpool->GetRequest();
        :
        if(temp != NULL)
        {
            :
            m_timeout = temp->GetTimeOut() * 1000;
            temp->execute();
            delete temp;
            :
Each ThreadChain object also checks whether the thread associated with it has hung; if so, it sets the deleteMe flag and unlinks itself from the chain.
void HandleHungThreads(ThreadChain<T>* prev)
{
    bool bIsHung = false;
    if(IsHung() || ReleaseIdleThread())
    {
        bIsHung = true;
        if(this == threadpool->root)
        {
            threadpool->root = next;   // unlink the head of the chain
        }
        else
        {
            prev->next = next;         // unlink this object from the chain
        }
    }
    :
    if(bIsHung)
    {
        Release();
        next->HandleHungThreads(prev);
    }
    else
    {
        next->HandleHungThreads(this);
    }
}
Also, if the thread count has grown above the minimum during a burst and the load then drops, the ThreadChain class has a method called ReleaseIdleThread that releases the surplus idle threads.
bool ReleaseIdleThread()
{
    WaitForSingleObject(threadChkMutex, INFINITE);
    if(threadpool->m_threadCount <= threadpool->minthreadCount)
    {
        ReleaseMutex(threadChkMutex);
        return false;
    }
    :
    } else if(GetTickCount() - lastactivetime > MAX_IDLETIME)
    {
        printf("Max idle time exceeded for this thread\n");
        bReleasethis = true;
    }
    :
ThreadPool is a new class that we have designed to act as the controller. It takes care of scaling the number of threads in the pool up or down, as well as initiating the checks for hung threads, etc. It holds a container of the Command objects; the container class is called CRequestQueue.
Tasks can be executed on the basis of priority: the highest-priority task is popped out of the queue first. Also, each queue is associated with a key. This helps if you have multiple engines pumping in tasks; the pool iterates over the first task of each key in a cyclic manner, thereby preventing any one overloaded engine from hogging the resources.
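The attached source expresses that priority ordering with a comparator named Command<T>::Compare, which appears in the CRequestQueue snippet below. A minimal sketch of such a comparator, assuming a GetPriority() accessor on Command (an assumption, not necessarily the real interface), could look like this:

// Hypothetical sketch: order Command pointers so the highest priority is popped first.
// std::priority_queue pops the "largest" element, so a less-than comparison on the
// priority value keeps the highest-priority Command on top of the queue.
// In the attached source, the comparator is nested inside Command as Command<T>::Compare.
template <typename T>
struct CommandCompare
{
    bool operator()(const Command<T>* lhs, const Command<T>* rhs) const
    {
        return lhs->GetPriority() < rhs->GetPriority();
    }
};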
The ThreadPool class also provides an interface to the client called QueueRequest, which the client uses to push in the framed Command object.
class CRequestQueue
{
private:
    typedef priority_queue<Command<T>*, deque<Command<T>*>, typename Command<T>::Compare> REQUESTQUEUE;
    std::map<long, REQUESTQUEUE> RequestQueue;
    :
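Given that map of per-key priority queues, GetRequest can walk the keys in a round-robin fashion and pop the highest-priority Command from the next non-empty queue. Whether this lives in ThreadPool or is delegated to CRequestQueue is an implementation detail of the attached source; the member m_lastKey and the exact logic below are assumptions for illustration, and locking is omitted.

// Hypothetical sketch of cyclic, per-key selection. RequestQueue is the
// std::map<long, REQUESTQUEUE> shown above; m_lastKey (assumed) remembers which
// key was served last so no single engine can hog the pool.
Command<T>* GetRequest()
{
    typename std::map<long, REQUESTQUEUE>::iterator it =
        RequestQueue.upper_bound(m_lastKey);      // start just after the last key served

    for(std::size_t n = 0; n < RequestQueue.size(); ++n)
    {
        if(it == RequestQueue.end())
            it = RequestQueue.begin();            // wrap around to the first key

        if(!it->second.empty())
        {
            Command<T>* cmd = it->second.top();   // highest priority for this key
            it->second.pop();
            m_lastKey = it->first;
            return cmd;
        }
        ++it;
    }
    return 0;                                     // nothing queued anywhere
}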
Well, I guess I have given you the idea behind the Thread Pool design. For the finer implementation details, you can check the attached source code.
Note: For VC compilers newer than VC6, I have added a minor source update - threapoolvc7.zip.
References and Links
- GoF - Design Patterns: Elements of Reusable Object-Oriented Software
- Converting C++ Member Functions into Function Objects