Another Thread Pool

6 May 2010, CPOL
A simple thread pool with minimal dependencies

Introduction

This article presents a simple thread pool. It demonstrates some basic techniques of multithreaded programming, so it may be of interest to beginners as well as to intermediate programmers who already have some experience with threads. And, of course, the code can simply be used as it is.

The main benefit of this thread pool class is independence. It does not require any libraries or frameworks such as STL, ATL or MFC; it uses only the basic Windows API, so it is easy to integrate into any Windows application. It is also fairly simple, so even if you are not very experienced you should be able to understand what it does and how it works.

Using the Code

To use the thread pool, include the files ThreadPool.h and ThreadPool.cpp in your project and add a member variable of type CThreadPool to the class that manages the pool. In the sample project (WTL), this is a dialog:

C++
#include "ThreadPool.h"
...

class CMainDlg : public CDialogImpl<CMainDlg>
  ,public CUpdateUI<CMainDlg>
  ,public CMessageFilter
  ,public CIdleHandler
{
private:
  CThreadPool    m_ThreadPool;
...
};

Then initialize the thread pool. In the sample, it is done in OnInitDialog:

C++
HRESULT hr = m_ThreadPool.Init(2, 10); 

The Init() method takes up to 4 parameters:

C++
HRESULT Init(UINT uInitalNumberOfThreads, UINT uMaxNumberOfThreads = 0
  ,SIZE_T dwStackSize = 0, LPSECURITY_ATTRIBUTES pSecurityAttributes = NULL);

uInitalNumberOfThreads is the number of threads created initially. The pool may then grow to at most uMaxNumberOfThreads threads. dwStackSize and pSecurityAttributes are passed on to _beginthreadex / CreateThread. All parameters except the first are optional.

Init() returns S_OK if everything went well, E_FAIL if the pool is already initialized, and a standard Windows error code wrapped in an HRESULT if something went wrong. If not all threads could be created (most likely because of memory constraints, see below), it returns S_FALSE.
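
Because S_FALSE is a success code, a plain FAILED() check does not detect a partially created pool. The following is a minimal sketch of how a caller might tell the cases apart. ClassifyInitResult and InitOutcome are hypothetical names that are not part of the pool, and the #else branch only provides stand-in definitions (assumed to mirror the Win32 values) so that the sketch also compiles without windows.h.

```cpp
#include <cstdint>

#ifdef _WIN32
#include <windows.h>
#else
// Stand-in definitions (assumption: same values as in the Windows headers).
typedef std::int32_t HRESULT;
const HRESULT S_OK = 0;
const HRESULT S_FALSE = 1;
const HRESULT E_FAIL = static_cast<HRESULT>(0x80004005u);
inline bool FAILED(HRESULT hr) { return hr < 0; }
#endif

// Hypothetical result categories for the value returned by Init().
enum InitOutcome
{
  InitOk,           // S_OK: all requested threads were created
  InitPartial,      // S_FALSE: pool is usable, but has fewer threads
  InitAlreadyDone,  // E_FAIL: Init() was called on an initialized pool
  InitError         // any other failure code
};

// Hypothetical helper: maps an HRESULT from Init() to one of the categories.
InitOutcome ClassifyInitResult(HRESULT hr)
{
  if (hr == S_OK)    return InitOk;
  if (hr == S_FALSE) return InitPartial;
  if (hr == E_FAIL)  return InitAlreadyDone;
  if (FAILED(hr))    return InitError;
  return InitPartial;  // treat unknown success codes conservatively
}
```

A caller would pass the value returned by m_ThreadPool.Init(2, 10) to the helper and, for example, log a warning on InitPartial instead of aborting.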

You might also want to give your thread pool some work. This is done via worker objects of a class derived from CThreadObjectBase (in the sample: CThreadObject). These classes must implement the two virtual methods of CThreadObjectBase:

C++
class CThreadObjectBase
{
public:
  virtual void Run(CThreadPoolThreadCallback &pool) = 0;
  virtual void Done() = 0;
};

The first method takes one parameter, a reference to a CThreadPoolThreadCallback object. This is in fact the thread pool itself, but seen through an interface that offers only one method: CanContinue(). It returns a BOOL and lets the worker objects ask the pool whether they may continue or should terminate.

The Run() method is called from the executing thread and does the actual work. It should also check from time to time whether the pool wants to shut down and, if so, return promptly:

C++
void CThreadObject::Run(CThreadPoolThreadCallback &pool)
{
  for (int n = 0; n < m_nRuns; n++)
  {
    if (!pool.CanContinue())
      return;
    // here we do the work
  }
}

The second method, Done(), is meant for cleanup. It is called after Run() has returned, or when the worker object is removed from the queue without ever being run (the pool might shut down before all waiting objects are served). In the sample project, the worker object notifies the main window that it has finished and then deletes itself:

C++
void CThreadObject::Done()
{
  InterlockedDecrement(&s_lCount);
  if (::IsWindow(m_hWndNotify))
    PostMessage(m_hWndNotify, WMX_OBJECT_REMOVED, 0, (LPARAM)m_n);
  delete this;
}
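
The Run()/Done() contract can be tried out without the pool. The sketch below uses reduced stand-ins for the two interfaces (bool instead of BOOL, added virtual destructors) plus hypothetical classes CStopAfter and CCountingWorker and a hypothetical function RunOnce. It only illustrates the call order and the early exit, not the pool's real implementation.

```cpp
// Reduced stand-ins for the article's interfaces.
class CThreadPoolThreadCallback
{
public:
  virtual bool CanContinue() = 0;  // the article's version returns BOOL
  virtual ~CThreadPoolThreadCallback() {}
};

class CThreadObjectBase
{
public:
  virtual void Run(CThreadPoolThreadCallback &pool) = 0;
  virtual void Done() = 0;
  virtual ~CThreadObjectBase() {}
};

// Hypothetical callback: allows a fixed number of polls, then requests a stop.
class CStopAfter : public CThreadPoolThreadCallback
{
public:
  explicit CStopAfter(int nAllowed) : m_nAllowed(nAllowed) {}
  virtual bool CanContinue() { return m_nAllowed-- > 0; }
private:
  int m_nAllowed;
};

// Hypothetical worker: counts finished steps and reports them in Done().
class CCountingWorker : public CThreadObjectBase
{
public:
  CCountingWorker(int nRuns, int *pnStepsOut)
    : m_nRuns(nRuns), m_nSteps(0), m_pnStepsOut(pnStepsOut) {}

  virtual void Run(CThreadPoolThreadCallback &pool)
  {
    for (int n = 0; n < m_nRuns; n++)
    {
      if (!pool.CanContinue())
        return;      // the pool is shutting down: stop early
      m_nSteps++;    // one unit of work
    }
  }

  virtual void Done()
  {
    *m_pnStepsOut = m_nSteps;  // publish the result first
    delete this;               // only valid for heap-allocated workers
  }

private:
  int m_nRuns;
  int m_nSteps;
  int *m_pnStepsOut;
};

// What a pool thread does with one object: Run(), then Done().
int RunOnce(int nRuns, int nAllowedPolls)
{
  int nSteps = -1;
  CStopAfter pool(nAllowedPolls);
  CThreadObjectBase *pObj = new CCountingWorker(nRuns, &nSteps);
  pObj->Run(pool);
  pObj->Done();  // pObj must not be used after this call
  return nSteps;
}
```

Since Done() may delete the object, whoever calls it must not touch the pointer afterwards.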

To queue an object to the pool, you call the Add() method of CThreadPool and pass a pointer to the object as parameter:

C++
m_ThreadPool.Add(new CThreadObject());

The object is put into the waiting queue, and as soon as a thread is available, its Run() method is executed.

To close the thread pool, use the Close() method. This empties the queue, stops and deletes all threads and cleans up all events that were used. It is also possible to just empty the queue by calling EmptyQueue(), which removes all waiting worker objects from the queue.

CThreadPool also has some methods for statistics:

  • GetActiveThreadCount()
    Returns the number of threads currently running an object.
  • GetThreadCount()
    Returns the number of threads created.
  • GetMaxThreadCount()
    Returns the maximum number of threads that may be created.
  • GetThreadId(UINT n)
    Returns the identifier of a certain thread. n is the index of the thread.
  • GetThreadStatus(UINT n)
    Returns the status of a thread: Working (TRUE) or waiting (FALSE). n is the index of the thread.

How Does It Work?

The Object Queue

First there is the queue that contains the worker objects waiting for execution. To keep the dependencies as low as possible, I decided not to use any STL or ATL list class. The queue is very simple anyway: a basic FIFO implemented as a singly-linked list. It has to provide only a few operations: adding an object to the end of the list, returning the first element of the list and telling whether the list is empty. And, of course, a RemoveAll() method for cleaning up the list.

But the list has to be thread safe, which is accomplished by using a CRITICAL_SECTION and an accessor class, CThreadObjectListAccessor. An instance is created every time the pool needs access to the list. The accessor's constructor locks the list, its destructor unlocks it. All methods for accessing the list are members of the accessor class, so the list cannot be reached any other way. This technique guarantees that the list is locked before every request and unlocked again when the function making the request(s) returns.
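
As an illustration of how little such a list needs, here is a minimal sketch of a singly-linked FIFO without STL containers. CSimpleFifo and its member names are hypothetical, not the article's actual list class, and the sketch deliberately omits locking.

```cpp
#include <cstddef>  // NULL
#include <new>      // std::nothrow

// Hypothetical minimal FIFO of untyped pointers.
class CSimpleFifo
{
public:
  CSimpleFifo() : m_pHead(NULL), m_pTail(NULL) {}
  ~CSimpleFifo() { RemoveAll(); }

  // Appends an item; returns false if the node could not be allocated.
  bool Add(void *pItem)
  {
    Node *pNode = new (std::nothrow) Node;
    if (NULL == pNode)
      return false;
    pNode->pItem = pItem;
    pNode->pNext = NULL;
    if (m_pTail)
      m_pTail->pNext = pNode;  // append behind the current tail
    else
      m_pHead = pNode;         // first element of an empty list
    m_pTail = pNode;
    return true;
  }

  // Removes the oldest item and returns it, or NULL if the list is empty.
  void *GetFirst()
  {
    if (NULL == m_pHead)
      return NULL;
    Node *pNode = m_pHead;
    void *pItem = pNode->pItem;
    m_pHead = pNode->pNext;
    if (NULL == m_pHead)
      m_pTail = NULL;          // the list became empty
    delete pNode;
    return pItem;
  }

  bool IsEmpty() const { return NULL == m_pHead; }

  // Frees all nodes; the stored items themselves are not touched.
  void RemoveAll()
  {
    while (!IsEmpty())
      GetFirst();
  }

private:
  struct Node
  {
    void *pItem;
    Node *pNext;
  };
  Node *m_pHead;
  Node *m_pTail;

  CSimpleFifo(const CSimpleFifo&);             // not copyable
  CSimpleFifo& operator=(const CSimpleFifo&);
};
```

Keeping a tail pointer makes Add() a constant-time operation, and a failed node allocation is reported to the caller instead of throwing.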
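
The accessor idea can be sketched in a few lines. Note the assumptions: std::mutex stands in for the CRITICAL_SECTION only so that the sketch compiles without windows.h (the article's class uses no standard library), a simple counter stands in for the list contents, and CGuardedCounter, CCounterAccessor and AddTwice are hypothetical names.

```cpp
#include <mutex>

// Hypothetical guarded data; everything is private, so only the
// accessor (a friend) can reach the lock and the contents.
class CGuardedCounter
{
  friend class CCounterAccessor;
public:
  CGuardedCounter() : m_nItems(0) {}
private:
  std::mutex m_Lock;  // stand-in for a CRITICAL_SECTION
  int m_nItems;       // stand-in for the list contents
};

// The accessor locks in its constructor and unlocks in its destructor.
class CCounterAccessor
{
public:
  explicit CCounterAccessor(CGuardedCounter &list) : m_List(list)
  {
    m_List.m_Lock.lock();
  }
  ~CCounterAccessor() { m_List.m_Lock.unlock(); }

  void Add() { m_List.m_nItems++; }
  int Count() const { return m_List.m_nItems; }

private:
  CGuardedCounter &m_List;
  CCounterAccessor(const CCounterAccessor&);             // not copyable
  CCounterAccessor& operator=(const CCounterAccessor&);
};

// Two accesses in a row; the second one would block forever
// if the first accessor had not released the lock.
int AddTwice()
{
  CGuardedCounter list;
  {
    CCounterAccessor acc(list);  // lock taken here
    acc.Add();
  }                              // lock released here
  CCounterAccessor acc(list);
  acc.Add();
  return acc.Count();
}
```

Because the unlock happens in the destructor, it also runs on early returns, which is the main advantage over pairing lock and unlock calls by hand.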

The Thread Proc

This is what the thread proc looks like:

C++
DWORD WINAPI CThreadPool::CThread::ThreadProc(LPVOID p)
{
  LPTHREAD pThis = (LPTHREAD)p;
  CThreadPoolThreadCallback& pool =
    (CThreadPoolThreadCallback&)(pThis->m_Pool);

  HANDLE phWaitHandles[] =
  {
    pThis->m_Pool.m_hEventShutdown
    ,pThis->m_Pool.m_hEventObjectAvailable
  };

  BOOL bRun = TRUE;
  DWORD dw = 0;
  LPTHREADOBJECT pThreadObject = NULL;

  // the main thread loop
  while(bRun)
  {
    dw = WaitForMultipleObjects(2, phWaitHandles, FALSE, INFINITE);
    switch(dw)
    {
      case WAIT_OBJECT_0+1:  // object available
        pThis->m_bHasObject = pThis->m_Pool.GetNextObject(pThreadObject);
        if (pThis->m_bHasObject)
        {
          InterlockedIncrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
          pThreadObject->Run(pool);
          pThreadObject->Done();
          pThis->m_bHasObject = FALSE;
          InterlockedDecrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
        }
        // fall through: go back to waiting
      case WAIT_TIMEOUT:  // timed out (cannot happen with INFINITE)
        break;
      case WAIT_OBJECT_0:  // shutdown
      default:  // some error
        bRun = FALSE;
        break;
    }
  }
  pThis->m_dwThreadResult = dw;
#ifdef THREADPOOL_USE_CRT
  _endthreadex(dw);
#else
  ExitThread(dw);
#endif
  return dw;
}

The thread runs in a loop and waits for one of the two wait handles to be signaled. m_hEventObjectAvailable is signaled when a worker object is added to the queue, and it stays signaled as long as the queue is not empty. m_hEventShutdown is signaled when the pool wants to shut down. It is checked by the thread proc as well as by the CanContinue() method of the pool, which tells a running worker object whether it should stop its work.

If m_hEventObjectAvailable is signaled, the thread requests the next waiting object by calling GetNextObject() of the thread pool. If the queue is not empty, this method passes the object back in the pThreadObject parameter and returns TRUE. The thread then updates the m_uThreadsActive member and calls the Run() and Done() members of the thread object.

At the end of ThreadProc(), you can see a preprocessor switch THREADPOOL_USE_CRT, which is also used in CThread::Start:
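
The order of the handles in phWaitHandles matters. When WaitForMultipleObjects is called with bWaitAll set to FALSE and several handles are signaled at once, it reports the one with the smallest index, so placing the shutdown event first gives it priority over pending work. The following is a small model of that rule using plain flags instead of real event handles. FirstSignaled and the two index constants are hypothetical names used only for this sketch.

```cpp
// Indices as used in the thread proc's handle array.
enum
{
  IDX_SHUTDOWN = 0,
  IDX_OBJECT_AVAILABLE = 1
};

// Model of a wait-any call: returns the index of the first signaled
// flag, or -1 if none is signaled (a real wait would block instead).
int FirstSignaled(const bool *pSignaled, int nCount)
{
  for (int n = 0; n < nCount; n++)
  {
    if (pSignaled[n])
      return n;
  }
  return -1;
}
```

With both flags set, the model returns IDX_SHUTDOWN, which corresponds to the thread leaving its loop even though objects are still queued.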

C++
HRESULT CThreadPool::CThread::Start(SIZE_T dwStackSize
  ,LPSECURITY_ATTRIBUTES pSecurityAttributes)
{
  // start the thread 
#ifdef THREADPOOL_USE_CRT
  m_hThread = (HANDLE) _beginthreadex(pSecurityAttributes
    ,(unsigned int)dwStackSize
    ,(unsigned (__stdcall*)(void*)) CThreadPool::CThread::ThreadProc
    ,this, 0, (unsigned int*)&m_dwThreadID);
#else
  m_hThread = ::CreateThread(pSecurityAttributes, dwStackSize
    ,CThreadPool::CThread::ThreadProc, this, 0, &m_dwThreadID);
#endif
  // query the error code only if the thread could not be created
  if (NULL == m_hThread)
    return HRESULT_FROM_WIN32(GetLastError());
  return S_OK;
}

There is an ongoing discussion on the internet about _beginthread, _beginthreadex and CreateThread and when to use which. The basic rule is: use _beginthread / _beginthreadex if your threads use the CRT, and CreateThread if they do not.

Adding Worker Objects to the Queue

C++
HRESULT CThreadPool::Add(LPTHREADOBJECT pThreadObject)
{
  THREADASSERT(NULL != pThreadObject);
  CThreadObjectListAccessor acc(m_ThreadObjectList);
  HRESULT hr = acc.Add(pThreadObject);
  if (FAILED(hr))
    return hr;
  if ((m_uThreadsActive == m_uThreadsCreated)
    && (m_uThreadsCreated < m_uThreadCount))
  {
    AddThread();
  }
  SetEvent(m_hEventObjectAvailable);
  return S_OK;
}

Using the accessor, the new object is added to the queue. This might fail when memory is low (a new list node has to be allocated), so Add() can return E_OUTOFMEMORY. After the worker object has been added, the pool checks whether all threads are busy. If so, and if the maximum number of threads has not been reached yet, it adds a new thread to the pool by calling AddThread(). Afterwards it sets m_hEventObjectAvailable to signaled, so the next free thread will grab the added object. m_hEventObjectAvailable is reset in GetNextObject() if the queue is empty after the worker object has been fetched.
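
The bookkeeping described above can be followed step by step with a small model. This is only a sketch under stated assumptions: plain members replace the queue, the threads and the event, and PoolModel with its members is a hypothetical construct, not code from the pool.

```cpp
// Hypothetical model of the pool's counters and its event state.
struct PoolModel
{
  unsigned uQueued;          // objects waiting in the queue
  unsigned uThreadsActive;   // threads currently running an object
  unsigned uThreadsCreated;  // threads that exist
  unsigned uThreadsMax;      // upper limit for uThreadsCreated
  bool bObjectAvailable;     // state of the "object available" event

  PoolModel(unsigned uInitial, unsigned uMax)
    : uQueued(0), uThreadsActive(0), uThreadsCreated(uInitial)
    , uThreadsMax(uMax), bObjectAvailable(false) {}

  // Mirrors the decision in Add(): grow only if every thread is busy
  // and the limit has not been reached, then signal the event.
  void Add()
  {
    uQueued++;
    if (uThreadsActive == uThreadsCreated && uThreadsCreated < uThreadsMax)
      uThreadsCreated++;
    bObjectAvailable = true;
  }

  // Mirrors the description of GetNextObject(): take one object and
  // reset the event once the queue is empty.
  bool GetNext()
  {
    if (0 == uQueued)
      return false;
    uQueued--;
    uThreadsActive++;        // in the pool, the thread proc does this
    if (0 == uQueued)
      bObjectAvailable = false;
    return true;
  }

  // A thread has finished Run() and Done() for its object.
  void Finish() { uThreadsActive--; }
};
```

Starting with one thread and a maximum of two, the first Add() does not grow the pool because the only thread is idle. A second Add() while that thread is busy creates the second thread.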

How Many Threads Can Be Created?

This is a frequent topic in forums about multithreaded programming. As many people have already discovered (and as Microsoft also says), the maximum number of threads is mainly limited by the address space of a process. On 32-bit Windows, the maximum is around 2000 threads, because every thread reserves 1 MB of stack by default and the user-mode address space of a process is 2 GB (see http://blogs.msdn.com/oldnewthing/archive/2005/07/29/444912.aspx). So if you plan to create more than 2000 threads, you should set the dwStackSize parameter to a value lower than 1 MB. But anyway, do you really need 2000+ threads?
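
The estimate is a simple division, sketched below. MaxThreadsByStack and the two constants are hypothetical names. The result is only an upper bound, because code, heap and loaded DLLs also consume address space.

```cpp
// Rough upper bound: how many thread stacks fit into the address space.
inline unsigned long long MaxThreadsByStack(unsigned long long cbAddressSpace,
                                            unsigned long long cbStack)
{
  return cbStack ? cbAddressSpace / cbStack : 0;
}

const unsigned long long kUserAddressSpace = 2ULL * 1024 * 1024 * 1024;  // 2 GB
const unsigned long long kDefaultStackSize = 1024ULL * 1024;             // 1 MB
```

With the default stack size the bound is 2048 threads, which matches the "around 2000" observed in practice. Passing 256 KB as dwStackSize would raise the bound to 8192.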

Finally

There is one thing to mention: the pool itself is NOT thread safe. This means that you should not, for example, call Init() from one thread and Close() from another. This is also one of the reasons why the worker objects get access to the pool only via CThreadPoolThreadCallback.

If you plan to pass SECURITY_ATTRIBUTES to the pool, they have to stay valid as long as the pool lives (at least until you call Close()), because the pool copies only the pointer, not the content of the SECURITY_ATTRIBUTES structure.

History

  • 6th May, 2010: Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)