Introduction
A common scenario in applications is having a pool of worker threads that has to process several objects waiting in a queue. The operation to be executed on the waiting objects is the same, but it can be carried out by different threads on different objects, as described in the next picture:
A single queue stores objects that need to be processed; a set of worker threads is running and each thread gets an object at a time from the queue and processes it by calling the same method.
This article presents an easy to use class that implements a ThreadPool and encapsulates a Queue.
Deriving your class from CMBThreadPool
The core of the sample project is the CMBThreadPool
class that encapsulates the System::Threading::Queue
class and implements the thread pool. This class uses the CMBThread
class to manage a single instance of a worker thread.
The CMBThreadPool
class provides Push
/Pop
methods to access its queue; using the Push
method, any System:Object
derived object can be inserted in the queue.
The threads can be controlled by the methods StartThreadPool
and StopThreadPool
.
Each worker thread (implemented by the CMBThread
class) gets an object at a time from the queue and calls the OnDoWork
virtual method.
To use this class, you just need to derive a new (managed) class from the CMBThreadPool
class in the following way:
__gc class CMyThreadPool :
public CMBThreadPool
{
public:
CMyThreadPool(void);
~CMyThreadPool(void);
void OnDoWork(System::Object* obj);
};
The CMyThreadPool
class overrides the OnDoWork
method that will be called each time an object can be processed.
In the sample, in the CMyThreadPool::OnDoWork
method, a Thread::Sleep
call simulates a slow I/O operation.
void CMyThreadPool::OnDoWork(System::Object* obj)
{
Thread::Sleep(2000);
Console::WriteLine(String::Format(S"OnDoWork {0}",
__try_cast�String*�(obj)));
}
Starting & stopping the thread pool
The thread pool is explicitly started by the StartThreadPool
method; the ThreadCount
parameter specifies how many threads are created, while nIdleTimeOut
and nProcessTimeOut
parameters are used to specify the time any thread of the pool has to wait when it finds the queue empty and the time it has to wait after each object process.
The thread can be stopped by the StopThreadPool
method; the thread can be stopped using one of the following options:
eStopAbort
- all threads are stopped immediately.
eStopHandleCurrent
- the thread pool is stopped after each thread ends handling the current object.
eStopHandleAll
- the thread pool is stopped the first time the queue is empty.
The last 2 options make the StopThreadPool
method waiting (and blocking the program execution) while the threads end processing objects in the queue.
In order to synchronize the threads' termination in a �safe� way, which is without aborting the threads when they are in the middle of something, a ManualResetEvent
object is used. Each thread managed by the CMBThread
class creates a ManualResetEvent
in the class constructor:
CMBThread::CMBThread(Int32 nIndex, CMBQueue* pMBQueue)
{
m_nIndex = nIndex;
m_pMBQueue = pMBQueue;
m_pEventStop = new ManualResetEvent(false);
m_pThreadStart =
new ThreadStart(this, &CMBThread::ThreadProc);
m_pThread =
new Thread(m_pThreadStart);
m_pThread->Start();
}
The event is created in the �not-signaled� state (meaning that any Wait call will be blocked until the object becomes signaled). Each thread calls the m_pEventStop->Set()
; to signal the event just before exiting.
In the StopThreadPool
method, an array of ManualResetEvent
is created and then a call to the WaitHandle::WaitAll
method blocks the execution until all event objects are signaled.
ManualResetEvent* manualEvents __gc [] =
new ManualResetEvent* __gc [m_arrayThreads->get_Count()];
for (i = 0; i < m_arrayThreads->get_Count(); i++ )
{
CMBThread* pMBThread =
__try_cast�CMBThread*�(m_arrayThreads->get_Item(i));
manualEvents[i] = pMBThread->m_pEventStop;
}
WaitHandle::WaitAll(manualEvents);