Introduction
This article presents a simple thread pool. It demonstrates some basic multithreading techniques, so it should be useful both to beginners and to intermediate programmers who already have some multithreading experience. And, of course, the code can simply be used as it is.
The main benefit of this thread pool class is independence. It does not require any libraries or frameworks such as STL, ATL or MFC; it uses only the basic Windows API, so it is easy to integrate into any Windows application. It is also fairly simple, so even if you are not very experienced you should be able to understand what it does and how it works.
Using the Code
To use the thread pool, include the files ThreadPool.h and ThreadPool.cpp in your project and add a member variable of type CThreadPool to the class that manages the pool. In the sample project (WTL), this is a dialog:
#include "ThreadPool.h"
...
class CMainDlg : public CDialogImpl<CMainDlg>
    ,public CUpdateUI<CMainDlg>
    ,public CMessageFilter
    ,public CIdleHandler
{
private:
    CThreadPool m_ThreadPool;
    ...
};
Then initialize the thread pool. In the sample, this is done in OnInitDialog:
HRESULT hr = m_ThreadPool.Init(2, 10);
The Init() method takes up to four parameters:
HRESULT Init(UINT uInitalNumberOfThreads, UINT uMaxNumberOfThreads = 0
,SIZE_T dwStackSize = 0, LPSECURITY_ATTRIBUTES pSecurityAttributes = NULL);
uInitalNumberOfThreads is the number of threads that will be initially created. The number of threads might grow up to uMaxNumberOfThreads. dwStackSize and pSecurityAttributes are the parameters that will be passed to the _beginthreadex / CreateThread function. All parameters except the first one are optional.
Init() returns S_OK if everything went well, E_FAIL if the pool is already initialized, and a standard Windows error code wrapped in an HRESULT if something went wrong. If not all threads could be created (most likely because of memory problems, see below), it returns S_FALSE.
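Because S_FALSE is a success code, a plain FAILED() or SUCCEEDED() check cannot tell a fully created pool from a partially created one. The following is a minimal sketch of how a caller might sort the documented results into cases. The enum and the ClassifyInitResult() helper are hypothetical names and not part of the thread pool; the HRESULT definitions in the #else branch are stand-ins with the standard Windows values, included only so the sketch also builds without <windows.h>.

```cpp
#include <cassert>
#include <cstdint>

#ifdef _WIN32
#include <windows.h>
#else
// Stand-ins (assumption: not building on Windows); values match the Windows SDK.
typedef std::int32_t HRESULT;
#define S_OK    ((HRESULT)0)
#define S_FALSE ((HRESULT)1)
#define E_FAIL  ((HRESULT)0x80004005)
#endif

// Hypothetical classification of the results the article lists for Init().
enum InitOutcome
{
    InitOk,          // S_OK: all requested threads were created
    InitPartial,     // S_FALSE: only some of the threads were created
    InitAlreadyDone, // E_FAIL: the pool was already initialized
    InitError        // any other value: a wrapped Windows error code
};

inline InitOutcome ClassifyInitResult(HRESULT hr)
{
    if (hr == S_OK)
        return InitOk;
    if (hr == S_FALSE)
        return InitPartial;      // still a success code, so test it explicitly
    if (hr == E_FAIL)
        return InitAlreadyDone;
    return InitError;
}
```

In a dialog, the value returned by m_ThreadPool.Init(2, 10) could be passed to such a helper to decide whether to continue, warn about a smaller pool, or abort.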
Next, give your thread pool some work. This is done via worker objects of a class derived from CThreadObjectBase (in the sample: CThreadObject). These classes must implement the two pure virtual methods of CThreadObjectBase:
class CThreadObjectBase
{
public:
    virtual void Run(CThreadPoolThreadCallback &pool) = 0;
    virtual void Done() = 0;
};
The first method takes one parameter, a reference to a CThreadPoolThreadCallback object. This is in fact the thread pool itself, but the type offers only one method: CanContinue(), which returns a BOOL and lets the worker objects ask the pool whether they should terminate execution.
The Run() method is called from the executing thread and should do the actual work. It should also check from time to time whether the pool wants to shut down and, if so, terminate properly:
void CThreadObject::Run(CThreadPoolThreadCallback &pool)
{
    for (int n = 0; n < m_nRuns; n++)
    {
        // Stop early if the pool is shutting down
        if (!pool.CanContinue())
            return;
        // ... one step of the actual work goes here ...
    }
}
The second method, Done(), is called after the Run() method has returned, or when the worker object is removed from the list without being run (the pool might shut down before all waiting objects are served). It should be used for cleaning up. In the sample project, the worker object deletes itself after notifying the main window that it has finished:
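void CThreadObject::Done()
{
    InterlockedDecrement(&s_lCount);
    // Notify the main window only if it still exists
    if (::IsWindow(m_hWndNotify))
        PostMessage(m_hWndNotify, WMX_OBJECT_REMOVED, 0, (LPARAM)m_n);
    // The object owns itself; nothing may touch it after this line
    delete this;
}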
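The contract between Run(), Done() and CanContinue() can be tried out without the pool. The sketch below is self-contained and makes several assumptions: BOOL and the two base classes are simplified stand-ins for the declarations in ThreadPool.h (with virtual destructors added), and CSumWorker and CFakePool are hypothetical names invented for this illustration.

```cpp
#include <cassert>

// Stand-ins so the sketch builds without ThreadPool.h or <windows.h>.
typedef int BOOL;

class CThreadPoolThreadCallback
{
public:
    virtual ~CThreadPoolThreadCallback() {}
    virtual BOOL CanContinue() = 0;
};

class CThreadObjectBase
{
public:
    virtual ~CThreadObjectBase() {}
    virtual void Run(CThreadPoolThreadCallback &pool) = 0;
    virtual void Done() = 0;
};

// Hypothetical worker: adds up 1..m_nLimit, one number per step.
class CSumWorker : public CThreadObjectBase
{
public:
    explicit CSumWorker(int nLimit) : m_nLimit(nLimit), m_nSum(0), m_bDone(false) {}

    virtual void Run(CThreadPoolThreadCallback &pool)
    {
        for (int n = 1; n <= m_nLimit; n++)
        {
            if (!pool.CanContinue())
                return;        // pool is shutting down: leave early
            m_nSum += n;       // one unit of work
        }
    }

    // The sample's worker deletes itself here; this one only sets a flag.
    virtual void Done() { m_bDone = true; }

    int  m_nLimit;
    int  m_nSum;
    bool m_bDone;
};

// Hypothetical fake pool: answers TRUE for the first nAllowed calls only.
class CFakePool : public CThreadPoolThreadCallback
{
public:
    explicit CFakePool(int nAllowed) : m_nAllowed(nAllowed) {}
    virtual BOOL CanContinue() { return (m_nAllowed-- > 0) ? 1 : 0; }
    int m_nAllowed;
};
```

With a fake pool that always allows continuing, CSumWorker(4) ends with a sum of 10; with CFakePool(2) it stops after two steps. The finer the steps between CanContinue() calls, the faster a worker reacts to a shutdown.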
To queue an object to the pool, you call the Add() method of CThreadPool and pass a pointer to the object as parameter:
m_ThreadPool.Add(new CThreadObject());
The object will be put in the waiting queue, and as soon as a thread is available, its Run() method will be executed.
To close the thread pool, use the Close() method. This empties the queue, stops and deletes all threads and cleans up all events used. It is also possible to just empty the queue by calling EmptyQueue(), which removes all waiting worker objects from the queue.
CThreadPool also has some methods for statistics:
- GetActiveThreadCount(): Returns the number of threads currently running an object.
- GetThreadCount(): Returns the number of threads created.
- GetMaxThreadCount(): Returns the maximum number of threads that may be created.
- GetThreadId(UINT n): Returns the identifier of a certain thread. n is the index of the thread.
- GetThreadStatus(UINT n): Returns the status of a thread: working (TRUE) or waiting (FALSE). n is the index of the thread.
How Does It Work?
The Object Queue
First there is the queue that contains the worker objects waiting for execution. To keep the dependencies as low as possible, I decided to avoid any STL or ATL list class. The queue is very simple anyway: it is a basic FIFO implemented as a singly-linked list. It has to provide only a few operations: adding an object to the end of the list, returning the first element of the list and telling whether the list is empty. And, of course, a RemoveAll() method for cleaning up the list.
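A FIFO of this kind needs only a head pointer and a tail pointer. The following sketch shows one possible shape; it is not the article's list class, and CSimpleFifo together with all its member names is hypothetical. Items are stored as plain void* pointers to keep the example independent of the pool's types.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical FIFO built on a singly-linked list, without any STL container.
class CSimpleFifo
{
    struct Node
    {
        void *pItem;
        Node *pNext;
    };

    Node *m_pHead;   // next element to hand out
    Node *m_pTail;   // last element, where new ones are appended

    // Not copyable: two copies would free the same nodes.
    CSimpleFifo(const CSimpleFifo&);
    CSimpleFifo& operator=(const CSimpleFifo&);

public:
    CSimpleFifo() : m_pHead(NULL), m_pTail(NULL) {}
    ~CSimpleFifo() { RemoveAll(); }

    // Appends an item; returns false if the node cannot be allocated.
    bool Add(void *pItem)
    {
        Node *p = new (std::nothrow) Node;
        if (!p)
            return false;
        p->pItem = pItem;
        p->pNext = NULL;
        if (m_pTail)
            m_pTail->pNext = p;
        else
            m_pHead = p;
        m_pTail = p;
        return true;
    }

    // Removes and returns the oldest item, or NULL if the list is empty.
    void *RemoveFirst()
    {
        if (!m_pHead)
            return NULL;
        Node *p = m_pHead;
        void *pItem = p->pItem;
        m_pHead = p->pNext;
        if (!m_pHead)
            m_pTail = NULL;
        delete p;
        return pItem;
    }

    bool IsEmpty() const { return m_pHead == NULL; }

    // Frees all nodes; the items themselves are not touched.
    void RemoveAll()
    {
        while (m_pHead)
            RemoveFirst();
    }
};
```

A failed node allocation is reported through the return value of Add(), which is the kind of condition a pool could translate into an out-of-memory result. This sketch is not thread safe on its own.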
But the list has to be thread safe, which is accomplished by using a CRITICAL_SECTION and an accessor class. All access to the list goes through this accessor class, CThreadObjectListAccessor. An instance is created every time the pool needs access to the list. The accessor's constructor locks the list, the destructor unlocks it. All methods for accessing the list are in the accessor class, so everything is done via the accessor. This technique guarantees that the list is always locked before a request and unlocked when the function making the request(s) returns.
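The accessor idea is a scoped lock that also carries the list operations. Here is a minimal sketch of the pattern, with two loudly stated substitutions: std::mutex stands in for the CRITICAL_SECTION so that the example builds on any platform, and the "list" is reduced to a counter. CCountedList and CCountedListAccessor are hypothetical names.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical list stand-in: a counter plus the lock that guards it.
// It has no public operations; all access goes through the accessor.
class CCountedList
{
    friend class CCountedListAccessor;
    std::mutex m_Lock;   // a CRITICAL_SECTION in the article's pool
    int        m_nCount;
public:
    CCountedList() : m_nCount(0) {}
};

// The accessor locks in its constructor and unlocks in its destructor.
class CCountedListAccessor
{
    CCountedList &m_List;

    // Not copyable: a copy would unlock the list a second time.
    CCountedListAccessor(const CCountedListAccessor&);
    CCountedListAccessor& operator=(const CCountedListAccessor&);

public:
    explicit CCountedListAccessor(CCountedList &list) : m_List(list)
    {
        m_List.m_Lock.lock();
    }

    ~CCountedListAccessor()
    {
        m_List.m_Lock.unlock();
    }

    void Add()        { ++m_List.m_nCount; }
    int  Count() const { return m_List.m_nCount; }
};
```

Because the list exposes nothing publicly, code cannot reach the data without first constructing an accessor, and an early return cannot leave the list locked. Creating a second accessor on the same thread while the first is still alive would deadlock with std::mutex; a CRITICAL_SECTION, in contrast, may be entered recursively by its owning thread.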
The Thread proc
This is what the thread proc looks like:
DWORD WINAPI CThreadPool::CThread::ThreadProc(LPVOID p)
{
    LPTHREAD pThis = (LPTHREAD)p;
    CThreadPoolThreadCallback& pool =
        (CThreadPoolThreadCallback&)(pThis->m_Pool);
    // Index 0: shutdown request, index 1: a worker object is waiting
    HANDLE phWaitHandles[] =
    {
        pThis->m_Pool.m_hEventShutdown
        ,pThis->m_Pool.m_hEventObjectAvailable
    };
    BOOL bRun = TRUE;
    DWORD dw = 0;
    LPTHREADOBJECT pThreadObject = NULL;
    while(bRun)
    {
        dw = WaitForMultipleObjects(2, phWaitHandles, FALSE, INFINITE);
        switch(dw)
        {
        case WAIT_OBJECT_0+1:
            pThis->m_bHasObject = pThis->m_Pool.GetNextObject(pThreadObject);
            if (pThis->m_bHasObject)
            {
                InterlockedIncrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
                pThreadObject->Run(pool);
                pThreadObject->Done();
                pThis->m_bHasObject = FALSE;
                InterlockedDecrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
            }
            // falls through to the break below
        case WAIT_TIMEOUT:
            break;
        case WAIT_OBJECT_0:
        default:
            // Shutdown requested or wait failed: leave the loop
            bRun = FALSE;
            break;
        }
    }
    pThis->m_dwThreadResult = dw;
#ifdef THREADPOOL_USE_CRT
    _endthreadex(dw);
#else
    ExitThread(dw);
#endif
    return dw;
}
The thread runs in a loop and waits for one of the two wait handles to be signaled. m_hEventObjectAvailable gets signaled when a worker object is added to the queue and stays signaled as long as the queue is not empty. m_hEventShutdown gets signaled when the pool wants to shut down. It is checked by the thread as well as by the CanContinue() method of the pool, which tells the running worker object whether it should stop its work.
If m_hEventObjectAvailable is signaled, the thread requests the next waiting object by calling GetNextObject() of the thread pool. If the queue is not empty, this method will pass the object in the pThreadObject parameter and return TRUE. The thread then updates the m_uThreadsActive member and calls the Run() and Done() members of the thread object.
At the end of ThreadProc(), you can see a preprocessor switch, THREADPOOL_USE_CRT, which is also used in CThread::Start:
HRESULT CThreadPool::CThread::Start(SIZE_T dwStackSize
    ,LPSECURITY_ATTRIBUTES pSecurityAttributes)
{
#ifdef THREADPOOL_USE_CRT
    m_hThread = (HANDLE) _beginthreadex(pSecurityAttributes,
        (unsigned int)dwStackSize
        ,(unsigned (__stdcall*)(void*)) CThreadPool::CThread::ThreadProc
        ,this, 0, (unsigned int*)&m_dwThreadID);
#else
    m_hThread = ::CreateThread(pSecurityAttributes, dwStackSize
        ,CThreadPool::CThread::ThreadProc, this, 0, &m_dwThreadID);
#endif
    HRESULT hr = HRESULT_FROM_WIN32(GetLastError());
    return (NULL == m_hThread) ? hr : S_OK;
}
There is some discussion on the internet about _beginthread, _beginthreadex and CreateThread and when to use which. The basic rule is that you should use _beginthread / _beginthreadex if you use the CRT, and CreateThread if you don't.
Adding Worker Objects to the Queue
HRESULT CThreadPool::Add(LPTHREADOBJECT pThreadObject)
{
    THREADASSERT(NULL != pThreadObject);
    CThreadObjectListAccessor acc(m_ThreadObjectList);
    HRESULT hr = acc.Add(pThreadObject);
    if (FAILED(hr))
        return hr;
    if ((m_uThreadsActive == m_uThreadsCreated)
        && (m_uThreadsCreated < m_uThreadCount))
    {
        AddThread();
    }
    SetEvent(m_hEventObjectAvailable);
    return S_OK;
}
Using the accessor, the new object is added to the queue. This might fail when memory is low (a new list node has to be allocated), so Add() might return E_OUTOFMEMORY. After the worker object has been added, the pool checks whether all threads are busy and, if so and the maximum number of threads has not yet been reached, adds a new thread to the pool by calling AddThread(). Afterwards, it sets m_hEventObjectAvailable to signaled, so the next free thread will grab the added object. m_hEventObjectAvailable is reset in GetNextObject() if the queue is empty after the worker object has been fetched.
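The bookkeeping described above (grow only when every thread is busy, keep the event signaled while the queue is non-empty) can be followed in a single-threaded model. The sketch below creates no real threads or events; PoolModel and its members are hypothetical, the bool models m_hEventObjectAvailable under the assumption that it behaves like a manual-reset event, and the active counter is incremented inside GetNextObject() for brevity although the article's thread proc does that itself.

```cpp
#include <cassert>

// Hypothetical single-threaded model of the pool's counters and event state.
struct PoolModel
{
    unsigned uQueued;          // worker objects waiting in the queue
    unsigned uThreadsActive;   // threads currently running an object
    unsigned uThreadsCreated;  // threads that exist
    unsigned uThreadsMax;      // upper limit for uThreadsCreated
    bool     bObjectAvailable; // models m_hEventObjectAvailable

    PoolModel(unsigned uInitial, unsigned uMax)
        : uQueued(0), uThreadsActive(0), uThreadsCreated(uInitial)
        , uThreadsMax(uMax), bObjectAvailable(false) {}

    // Mirrors the decision in Add(): grow only if all threads are busy
    // and the maximum has not been reached, then signal the event.
    void Add()
    {
        ++uQueued;
        if (uThreadsActive == uThreadsCreated && uThreadsCreated < uThreadsMax)
            ++uThreadsCreated;      // stands for AddThread()
        bObjectAvailable = true;    // stands for SetEvent()
    }

    // Mirrors GetNextObject(): hand out one object and reset the event
    // once the queue has become empty.
    bool GetNextObject()
    {
        if (uQueued == 0)
            return false;
        --uQueued;
        if (uQueued == 0)
            bObjectAvailable = false;   // stands for ResetEvent()
        ++uThreadsActive;
        return true;
    }

    // Called when Run() and Done() have returned.
    void FinishObject() { --uThreadsActive; }
};
```

Starting from PoolModel(1, 2), the first Add() does not create a thread because the single thread is idle. Once that thread has taken the object, a second Add() grows the pool to two threads, and a third Add() leaves it at two because the maximum has been reached.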
How Many Threads Can Be Created?
This is a frequent topic in forums related to multithreaded programming. As many people have already discovered (and as Microsoft also says), the maximum number of threads is mainly limited by the address space of a process. On Win32, the maximum is around 2000 threads, since every thread reserves a stack of 1MB by default and the user-mode address space of a process is 2GB (see http://blogs.msdn.com/oldnewthing/archive/2005/07/29/444912.aspx). So if you plan to create more than 2000 threads, you should set the dwStackSize parameter to a value lower than 1MB. But anyway, do you really need 2000+ threads?
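The "around 2000" figure is simply the address space divided by the stack size. A small sketch of that arithmetic follows; MaxThreadsByAddressSpace() is a hypothetical helper, and the result is only an upper bound, because code, heaps and other allocations also consume address space.

```cpp
#include <cassert>

// Upper bound on the thread count if stacks were the only consumer
// of address space. Returns 0 for a stack size of 0 to avoid dividing by zero.
inline unsigned long long MaxThreadsByAddressSpace(
    unsigned long long cbAddressSpace, unsigned long long cbStackSize)
{
    if (cbStackSize == 0)
        return 0;
    return cbAddressSpace / cbStackSize;
}
```

With 2GB of address space and 1MB stacks, the function yields 2048; with 256KB stacks it yields 8192. In practice the reachable number is lower than the computed bound.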
Finally
There is one thing to mention: the pool itself is NOT thread safe. This means you should not call, e.g., Init() from one thread and Close() from another. This is also one of the reasons why the worker objects get access to the pool only via CThreadPoolThreadCallback.
If you plan to pass SECURITY_ATTRIBUTES to the pool, they have to stay valid as long as the pool lives (at least until you call Close()), because the pool copies only the pointer, not the content of the SECURITY_ATTRIBUTES.
History
- 6th May, 2010: Initial post