Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Threading is easy (A simple thread, thread pool, object pool, and more)

4.55/5 (10 votes)
9 Oct 2007CPOL4 min read 1   687  
This simple library provides an implementation of almost all aspects of multithreaded programming.

Introduction

This simple library provides an implementation of almost all aspects of multithreaded programming, and it does not require a deep understanding of the multithreading concept. When I wrote this library, I was keeping in mind that the library must be easy to use for any programmer with a basic understanding of C++ and templates.

This library removes all the extra work related to the management of threads, so you can spend your time concentrating on your business logic implementation.

Using the code

There are five most important classes and interfaces in this library that you need to be aware of:

  1. CSafeThread is a template class which describes an abstract thread. I decided to define this class as a template class, because of the unknown parameter for the thread and most importantly, because I want to call the Stop() virtual method in the destructor of the class and this is appropriate only if CSafeThread is a concrete class.
  2. CThreadInterface is an abstract class (interface). This class is the base class for your concrete implementation of a thread. There are two steps to implement a thread. First, define your thread class and derive it from the CThreadInterface interface.
  3. C++
    class CMyThread: public mb_thread_lib::CThreadInterface<_bstr_t> 
    { 
        protected: 
            virtual void Thread(mb_thread_lib::smart_ptr<_bstr_t> sp)     
            {
            //this is our thread, sp is a parameter we are passing in 
    
            // implement your thread code here 
    
            } 
    };

    Now, instantiate your thread:

    C++
    mb_thread_lib::CSafeThread< CMyThread, _bstr_t> Thread;

    Start the thread:

    C++
    Thread.Start( mb_thread_lib::smart_ptr<_bstr_t>(
                  new _bstr_t("parameter for thread")) );

    Wait for this thread to finish:

    C++
    Thread.Wait();

    You can explicitly stop the thread by calling the Stop() method of the CSafeThread instance, or by destroying the instance. If you want to be notified of the thread events, you need to implement the Notify() method in your thread class.

    C++
    void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
    {
        switch(evt)
        {
            case mb_thread_lib::BEGIN_THREAD_EVENT:
                m_bStop = false;
                break;
            case mb_thread_lib::TERMINATE_THREAD_EVENT:
                m_bStop = true;
                break;
            default:
                break;
        }
    };

    There are three types of events:

    • BEGIN_THREAD_EVENT – occurs when a thread begins its execution,
    • END_THREAD_EVENT – occurs when a thread is finished by exiting the Thread() method,
    • TERMINATE_THREAD_EVENT – occurs when the Stop() method is invoked.
  4. CSequentialObjectPool is a template class which implements a work item queue processor. The object pool processes objects from a queue one by one in a single thread. A work item is user defined concrete class derived from CWorkItemInterface.
  5. Let's look at the TestSequentialObjectPool example in the source code. First, I define the CTestWorkItem class, this is a work item class. This class implements a task that needs to be executed. You must derive this class from CWorkItemInterface which is an abstract class. You can create a different work item type to work with the same CSequentialObjectPool instance.

    C++
    class CTestWorkItem: public mb_thread_lib::CWorkItemInterface
    {
        bool bPrcessing;
        public:
        CTestWorkItem()
        {
        }
        ~CTestWorkItem()
        {
        }
        void ProcessWorkItem()
        {
            //execute our code here
            //…
    
        }
        void AbortWorkItem()
        {
            //abort our item
            //…
    
        }
    };

    Now, I define the CPushThread class. This is a helper thread class to push work items for processing to the CSequentialObjectPool instance. This class implements the CSafeThread type of class (you already know how to create threads, see above).

    C++
    class CPushThread: public mb_thread_lib::CThreadInterface
         <mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>, 
         _bstr_t> >
    {
        volatile bool m_bStop;
        protected:
            void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
            {
                switch(evt)
                {
                    case mb_thread_lib::BEGIN_THREAD_EVENT:
                        m_bStop = false;
                        break;
                    case mb_thread_lib::TERMINATE_THREAD_EVENT:
                        m_bStop = true;
                        break;
                    default:
                        break;
                }
            };
        public:
            void Thread(mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
                 <mb_thread_lib::CSequentialObjectPool<_bstr_t>, 
                 _bstr_t> > spWorkQueue)
            {
                for(int i = 0; ((!m_bStop)&&(i < MY_MAX_ITER*2)); ++i)
                {
                  _bstr_t bstr("Val: ");
                  _variant_t var;
                  var = i;
                  var.ChangeType(VT_BSTR);
                  bstr = bstr + var.bstrVal;CTestWorkItem *pWorkItem = new CTestWorkItem();
                  pWorkItem->m_bstr = bstr;
                  mb_thread_lib::smart_ptr< mb_thread_lib::CWorkItemInterface > 
                   spWorkItem(dynamic_cast<mb_thread_lib::CWorkItemInterface*>(pWorkItem));
    
                  if(!spWorkQueue->QueueWorkItem(spWorkItem))
                      break;
                }
            }
    };

    Now, I create an instance of the CSequentialObjectPool class, and because it's a thread based class, I invoke the Start() method to begin a thread to process the work items. I the invoke the Start() method of my push thread, which is an instance of the CPushThread class, with passes a smart pointer to the CSequentialObjectPool instance.

    C++
    //create sequental object pool (work queue)
    
    mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
        <mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t> > 
        spWorkQueue(new mb_thread_lib::CSafeThread
        <mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t>());
    //create a push thread (this thread feeds work queue
    // with objects for sequental processing)
    
    mb_thread_lib::CSafeThread<CPushThread, 
      mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>, 
      _bstr_t> > PushThread; 
    //start work queue
    
    spWorkQueue->Start(mb_thread_lib::smart_ptr<_bstr_t>(NULL));
    //start push thread
    
    PushThread.Start(spWorkQueue);
    Sleep(1000);
  6. The last most important class is the CThreadPool class. In order to create a pool of threads in your application, you need to define a work item class that is derived from the CWorkItemInterface interface, then instantiate and initialize the CThreadPool class. And finally, queue a work item for processing in the thread pool.
  7. C++
    //create thread pool
    
    mb_thread_lib::CThreadPool ThreadPool;
    //initialize thread pool with 2 MIN threads, 
    //15 MAX threads and thread idle time 10 sec
    
    ThreadPool.InitPool(2, 15, 10000);
    _variant_t var;
    int i;
    //creaet 50000 work items
    
    for(i = 0; i < 50000; ++i)
    {
        var = i;
        var.ChangeType(VT_BSTR);
        CTestWorkItem* p = new CTestWorkItem();
        p->m_bstr = var.bstrVal;
        mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
        //push work items into the thread pool
    
        ThreadPool.QueueWorkItem(spWorkItem);
    }

    Another helpful class is CTimer. In order to use the CTimer class, you must define a work item class that is derived from CWorkItemInterface, create an instance of the CTimer class, and start the timer with a reference to a work item and a due time.

    C++
    CTestWorkItem* p = new CTestWorkItem();
    p->m_bstr = L"Timer work item.";
    mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
    mb_thread_lib::CTimer Timer;
    DWORD d = 2000;
    printf("execute handler every %d second(s)\n", d/1000);
    Timer.Start(spWorkItem,d/* 2 sec. */);
    Sleep(10000);
    Timer.Stop();

In addition to these main classes, there are several synchronization classes and a smart pointer implementation. There are two types of critical section classes, CLocalCriticalSec and CGlobalCriticalSec. The CLocalCriticalSec class is a wrapper of the critical section Win32 APIs, and CGlobalCriticalSec is a wrapper of the mutex Win32 APIs. In order to instantiate a named critical section, local or global, there is the CCriticalSecFactory class in the library. To dispose a named critical section, use the CCriticalSecDisposer class. You can see an example in the "ThreadTest.spp" under the following comments: "test critical sections". The CAutoCriticalSec class allows you to automatically protect part of your code by instantiating the class with a reference to a critical section instance.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)