Queue & ThreadPool class in Managed C++

9 Apr 2004
The article describes a simple class that implements a ThreadPool based on an object Queue.

Introduction

A common scenario in applications is a pool of worker threads that processes objects waiting in a queue. The same operation is applied to every waiting object, but different threads carry it out on different objects, as shown in the following picture:

Sample screenshot

A single queue stores the objects that need to be processed. A set of worker threads is running, and each thread takes one object at a time from the queue and processes it by calling the same method.
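The pattern described above can be sketched in standard C++ with std::thread. This is only a portable illustration of the idea, not the article's Managed C++ class: the ProcessAll function, its parameters, and the "double the value" work step are all assumptions made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: processes every item with `threadCount` worker
// threads and returns the sum of the processed values.
long ProcessAll(const std::vector<int>& items, std::size_t threadCount)
{
    assert(threadCount > 0);

    std::queue<int> pending;              // the single shared queue
    for (int item : items) pending.push(item);

    std::mutex guard;                     // protects `pending` and `total`
    long total = 0;

    // Every worker runs the same loop: take one object, process it, repeat.
    auto worker = [&]() {
        for (;;) {
            int item;
            {
                std::lock_guard<std::mutex> lock(guard);
                if (pending.empty()) return;      // nothing left to do
                item = pending.front();
                pending.pop();
            }
            int result = item * 2;                // stand-in for the real work
            std::lock_guard<std::mutex> lock(guard);
            total += result;
        }
    };

    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threadCount; ++i) pool.emplace_back(worker);
    for (std::thread& t : pool) t.join();         // wait for all workers
    return total;
}
```

The lock is held only while the queue or the total is touched, so the work itself runs in parallel on different objects.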

This article presents an easy-to-use class that implements a ThreadPool and encapsulates a Queue.

Deriving your class from CMBThreadPool

The core of the sample project is the CMBThreadPool class, which encapsulates the System::Collections::Queue class and implements the thread pool. This class uses the CMBThread class to manage each worker thread.

The CMBThreadPool class provides Push/Pop methods to access its queue; using the Push method, any System::Object derived object can be inserted in the queue.

The threads can be controlled by the methods StartThreadPool and StopThreadPool.

Each worker thread (implemented by the CMBThread class) takes one object at a time from the queue and calls the OnDoWork virtual method.

To use this class, you just need to derive a new (managed) class from the CMBThreadPool class in the following way:

__gc class CMyThreadPool :
    public CMBThreadPool
{
public:
    CMyThreadPool(void);
    ~CMyThreadPool(void);

    void OnDoWork(System::Object* obj);
};

The CMyThreadPool class overrides the OnDoWork method that will be called each time an object can be processed.

In the sample, in the CMyThreadPool::OnDoWork method, a Thread::Sleep call simulates a slow I/O operation.

void CMyThreadPool::OnDoWork(System::Object* obj)
{
    Thread::Sleep(2000); // simulate a slow I/O operation

    Console::WriteLine(String::Format(S"OnDoWork {0}",
                       __try_cast<String*>(obj)));
}
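The derive-and-override idea can also be shown in standard C++, without threads, to make the division of responsibilities visible: the base class owns the queue and decides when work happens, the derived class decides what the work is. The SimplePool and MyPool classes below are hypothetical stand-ins, not the article's CMBThreadPool, and they process the queue on the calling thread for brevity.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical base class: it knows that each queued object must be
// handed to OnDoWork, but not what the work actually is.
class SimplePool
{
public:
    virtual ~SimplePool() = default;

    void Push(const std::string& obj) { m_queue.push_back(obj); }

    // Hands each queued object to the overridable hook, in FIFO order.
    void ProcessAll()
    {
        for (const std::string& obj : m_queue) OnDoWork(obj);
        m_queue.clear();
    }

protected:
    virtual void OnDoWork(const std::string& obj) = 0;

private:
    std::vector<std::string> m_queue;
};

// Derived class: supplies only the per-object work.
class MyPool : public SimplePool
{
public:
    std::vector<std::string> log;

protected:
    void OnDoWork(const std::string& obj) override
    {
        log.push_back("OnDoWork " + obj);
    }
};

// Usage: push two objects, process them, and return what was logged.
std::vector<std::string> RunDemo()
{
    MyPool pool;
    pool.Push("first");
    pool.Push("second");
    pool.ProcessAll();
    assert(pool.log.size() == 2);
    return pool.log;
}
```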

Starting & stopping the thread pool

The thread pool is started explicitly with the StartThreadPool method. The ThreadCount parameter specifies how many threads are created. The nIdleTimeOut parameter specifies how long a thread waits when it finds the queue empty, and the nProcessTimeOut parameter specifies how long it waits after processing each object.
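How the two timeouts could shape a worker loop is sketched below in standard C++. The PoolSettings struct, the RunPool function, and the use of milliseconds are assumptions for illustration; the article does not show the actual loop or state the units of its parameters.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical settings that mirror the article's parameter names.
struct PoolSettings
{
    int threadCount;                           // number of worker threads
    std::chrono::milliseconds idleTimeOut;     // wait when the queue is empty
    std::chrono::milliseconds processTimeOut;  // wait after each processed object
};

// Runs the workers until every queued object has been processed and
// returns the number of processed objects.
int RunPool(const PoolSettings& settings, std::queue<int> work)
{
    assert(settings.threadCount > 0);

    const int expected = static_cast<int>(work.size());
    std::mutex guard;                          // protects `work`
    std::atomic<int> processed{0};
    std::atomic<bool> stop{false};

    auto worker = [&]() {
        while (!stop) {
            bool gotItem = false;
            {
                std::lock_guard<std::mutex> lock(guard);
                if (!work.empty()) { work.pop(); gotItem = true; }
            }
            if (gotItem) {
                ++processed;                   // stand-in for the real work
                std::this_thread::sleep_for(settings.processTimeOut);
            } else {
                std::this_thread::sleep_for(settings.idleTimeOut);
            }
        }
    };

    std::vector<std::thread> pool;
    for (int i = 0; i < settings.threadCount; ++i) pool.emplace_back(worker);

    // Wait until everything is processed, then ask the workers to exit.
    while (processed < expected)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    stop = true;
    for (std::thread& t : pool) t.join();
    return processed;
}

// Usage: three threads, six queued objects.
int RunPoolDemo()
{
    std::queue<int> work;
    for (int i = 0; i < 6; ++i) work.push(i);
    PoolSettings settings{3, std::chrono::milliseconds(5),
                          std::chrono::milliseconds(1)};
    return RunPool(settings, work);
}
```

A longer idle timeout lowers CPU use while the queue is empty, at the cost of a slower reaction when a new object arrives.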

The thread pool is stopped with the StopThreadPool method, using one of the following options:

  • eStopAbort - all threads are stopped immediately.
  • eStopHandleCurrent - the thread pool is stopped after each thread ends handling the current object.
  • eStopHandleAll - the thread pool is stopped the first time the queue is empty.

The last two options make the StopThreadPool method wait, blocking program execution, while the threads finish processing objects from the queue.
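The difference between the three options can be summarized with a small model in standard C++. It is a simplification under stated assumptions: a single worker, no new objects pushed after the stop request, and an enum whose values are not taken from the article's source. The ObjectsStillHandled function is hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Option names follow the article; the declaration itself is an assumption.
enum StopMode { eStopAbort, eStopHandleCurrent, eStopHandleAll };

// Returns how many objects one worker still finishes after a stop request.
// `queued` is the number of objects waiting in the queue, `busy` tells
// whether the worker is in the middle of an object when the request arrives.
std::size_t ObjectsStillHandled(StopMode mode, std::size_t queued, bool busy)
{
    const std::size_t current = busy ? 1 : 0;
    switch (mode) {
    case eStopAbort:         return 0;                 // even the current object is abandoned
    case eStopHandleCurrent: return current;           // finish the current object only
    case eStopHandleAll:     return current + queued;  // drain the queue first
    }
    assert(false && "unknown stop mode");
    return 0;
}
```

With five queued objects and a busy worker, the model gives 0, 1, and 6 finished objects for the three options respectively.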

To synchronize the threads' termination in a "safe" way, that is, without aborting the threads while they are in the middle of something, a ManualResetEvent object is used. Each CMBThread instance creates a ManualResetEvent in its constructor:

CMBThread::CMBThread(Int32 nIndex, CMBQueue* pMBQueue)
{
    m_nIndex = nIndex;
    m_pMBQueue = pMBQueue;

    m_pEventStop = new ManualResetEvent(false); // initially not-signaled


    m_pThreadStart = 
        new ThreadStart(this, &CMBThread::ThreadProc);
    m_pThread = 
        new Thread(m_pThreadStart);
    
    m_pThread->Start();
}

The event is created in the "not-signaled" state (meaning that any Wait call will block until the event becomes signaled). Each thread calls m_pEventStop->Set() to signal the event just before exiting.

In the StopThreadPool method, an array of ManualResetEvent objects is created, and a call to the WaitHandle::WaitAll method then blocks execution until all of the events are signaled.

ManualResetEvent* manualEvents __gc [] = 
    new ManualResetEvent* __gc [m_arrayThreads->get_Count()];

for (i = 0; i < m_arrayThreads->get_Count(); i++ )
{
    CMBThread* pMBThread = 
        __try_cast<CMBThread*>(m_arrayThreads->get_Item(i));
    manualEvents[i] = pMBThread->m_pEventStop;
}

WaitHandle::WaitAll(manualEvents);
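The same "signal before exit, wait for all" handshake can be approximated in standard C++ with a mutex and a condition variable. The StopEvent class, the WaitAll function, and StopAndCount below are hypothetical stand-ins for ManualResetEvent and WaitHandle::WaitAll, not reimplementations of them: this WaitAll simply waits on each event in turn, which has the same effect here because a set event stays signaled.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Minimal stand-in for a manual-reset event: once set, it stays signaled.
class StopEvent
{
public:
    void Set()
    {
        {
            std::lock_guard<std::mutex> lock(m_guard);
            m_signaled = true;
        }
        m_changed.notify_all();
    }

    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_guard);
        m_changed.wait(lock, [this] { return m_signaled; });
    }

    bool IsSet()
    {
        std::lock_guard<std::mutex> lock(m_guard);
        return m_signaled;
    }

private:
    std::mutex m_guard;
    std::condition_variable m_changed;
    bool m_signaled = false;               // initially not-signaled
};

// Blocks until every event in the list has been signaled.
void WaitAll(const std::vector<std::unique_ptr<StopEvent>>& events)
{
    for (const auto& e : events) e->Wait();
}

// Starts `threadCount` threads, waits for all stop events, and returns
// how many events were signaled when WaitAll returned.
std::size_t StopAndCount(std::size_t threadCount)
{
    assert(threadCount > 0);

    std::vector<std::unique_ptr<StopEvent>> events;
    for (std::size_t i = 0; i < threadCount; ++i)
        events.push_back(std::unique_ptr<StopEvent>(new StopEvent));

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < threadCount; ++i)
        threads.emplace_back([&events, i] {
            // ... the thread's work would go here ...
            events[i]->Set();              // signal just before exiting
        });

    WaitAll(events);

    std::size_t signaled = 0;
    for (const auto& e : events)
        if (e->IsSet()) ++signaled;

    for (std::thread& t : threads) t.join();
    return signaled;
}
```

In plain C++, joining the threads would already be enough to wait for them; the events are kept only to mirror the structure used in the article.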

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
