Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

A class to synchronise thread completions

0.00/5 (No votes)
16 Oct 2004 1  
Synchronising thread completion the easy way

Introduction

It can be very useful to break a complex task into a series of small sub-tasks; as software developers we do this all the time. Some sub-tasks must be performed in sequence whilst other sub-tasks can be processed in parallel. And sometimes there are tasks that can be performed in parallel in the lead up to some other task that cannot start until all the sub-tasks have been completed.

If this sounds like multithreading you'd be right. It can be tempting, especially if you have a multi-processor machine, to launch two or more background threads to perform particular tasks. But then comes the crunch! Because each thread runs independently of the main application thread it becomes necessary to synchronise the threads to ensure that all required sub-tasks have been completed before moving on to the next. This usually involves a bunch of code to track thread handles and a call to WaitForMultipleObjects() to wait until all threads have completed. The code itself is simple enough but it can become tedious writing it every time you need it. Hence these classes.

Basic threading API's

Before we look at the classes let's briefly review the basic threading API, _beginthreadex(), used by the classes. This API lets us launch a thread specifying the address of the thread procedure, a void pointer which can be used to pass data to the thread (typically a pointer to a struct containing the data the thread needs as defined by you the programmer), the stack size for the thread, and whether the thread should run immediately or if it should be suspended until sometime later. If suspended, it won't run at all until we give it the go ahead.

There are two other parameters I'm not particularly interested in, one which lets us specify whether the thread handle can be inherited by a child process, and another that optionally points to a 32 bit location that will recieve the thread identifer.   See MSDN for the details of those parameters.

What we get back from _beginthreadex() is a HANDLE we can later use to detect if and when the thread has terminated. It's an open handle so at some time in the future the handle needs to be closed. (Parenthetically, a very common error I've seen is programs that don't close thread handles when the thread they represent has terminated. You can detect this by opening Task Manager and adding the threads and handles columns to the processes display. As threads are created and terminate you see the handle count rise and keep on rising. Handles are a finite resource even on Windows NT and derived operating systems and if your application runs long enough without cleaning up its thread handles it will eventually bring the system to its knees).

We can detect if and when the thread has terminated by calling WaitForSingleObject() passing the handle returned by _beginthreadex. If the thread has terminated WaitForSingleObject() will return immediately with a return value of WAIT_OBJECT_0. If the thread is still running then WaitForSingleObject() waits until the thread terminates or until a timeout we get to specify in the call to WaitForSingleObject() has elapsed. If the timeout elapses WaitForSingleObject() returns WAIT_TIMEOUT. Simple stuff! It looks like this in code.

HANDLE hHandle = _beginthreadex(NULL, 0, MyTheadProc, NULL, 0, NULL);

switch (WaitForSingleObject(hHandle, 50000))
{
case WAIT_OBJECT_0:
    //  The thread has terminated - do something

    break;
    
case WAIT_TIMEOUT:
    //  The timeout has elapsed but the thread is still running

    //  do something appropriate for a timeout

    break;
}

CloseHandle(hHandle);
all of which is pretty straightforward. The 50000 is the timeout expressed in milliseconds. On the other hand, remember that you have to track the thread handle, make sure you close it and write a switch statement to handle the two cases.

It gets more complex if you have more than one thread to track. In this case you need to use WaitForMultipleObjects() passing it an array of thread handles. The return value from WaitForMultipleObjects() will be either WAIT_TIMEOUT, some error code or a value between WAIT_OBJECT_0 and WAIT_OBJECT_0 + number of handles being waited upon - 1. The only error I've ever encountered in this situation is an invalid handle error code which indicates that one of the handles I was waiting on had been closed. Of course the error code doesn't indicate which handle has been closed!

How could the handle have been closed? Either by the programmer (me) explicitly closing the handle prematurely, or by calling _beginthread(), which is an initially attractive thread API inasmuch as it involves rather less typing. You call _beginthread() specifying the thread procedure address, the stacksize and the aforementioned void pointer to data the thread needs to know about. After a call to _beginthread() the thread is running (no suspension is possible). The gotcha is that _beginthread() closes the thread handle before it returns to your code - which means that you cannot wait on the handle. If you try then either of the wait functions (WaitForSingleObject() or WaitForMultipleObjects() will immediately return with the invalid error code!

Finally, why use _beginthreadex() instead of CreateThread()? Quite simply, because _beginthreadex() manages thread local storage for us, both on creating the thread and later on, when ending the thread.

With that basic review of threading API's out of the way let's look at the classes.

CBaseThread

This class simply encapsulates a single thread. Later classes use CBaseThread to synchronously or asynchronously wait until a given set of threads have terminated. The class is a thin wrapper around the _beginthreadex() function. It takes care of the details of tracking the thread handle returned by _beginthreadex(), ensuring the handle is eventually closed, and provides some member functions to access the thread handle and wait on it. The class looks like this:

class CBaseThread : public CObject
{
    DECLARE_DYNAMIC(CBaseThread);
public:
                CBaseThread(HANDLE hStopEvent, volatile bool *bStop, 
                            unsigned(__stdcall *thread)(void *),
                            bool bWait = false, LPVOID data = NULL);
                ~CBaseThread();

    bool        IsWaiting() const       { return m_bWaiting; }
    volatile bool Stop() const          { return *m_bStop; }
    HANDLE      StopEvent() const       { return m_hStopEvent; }
    HANDLE      ThreadHandle() const    { return m_hThreadHandle; }
    LPVOID      UserData() const        { return m_pvUserData; }

    virtual bool Wait(DWORD dwTimeout = INFINITE) const;
    bool        Run() const             
                { return ResumeThread(m_hThreadHandle) == 1; }

    UINT        ThreadID() const        { return m_uiThreadID; }

private:
    LPVOID      m_pvUserData;
    HANDLE      m_hStopEvent,
                m_hThreadHandle;
    volatile bool *m_bStop,
                m_bWaiting;
    UINT        m_uiThreadID;
};
In the constructor the hStopEvent handle and the bStop variable are used to provide a way for the outside world to signal the thread that it ought to stop running. I'll discuss these a bit later in the article. The third parameter is the address of the thread procedure. The fourth parameter, the bWait parameter, defaults to false, indicating that the thread should run as soon as it's created. If bWait is true the thread is created but suspended and it won't run until the Run() or the Wait() methods are called. The data parameter is the aforementioned void pointer to data the thread needs. The class places no interpretation on this parameter.

Run() lets the thread run (if it had been created suspended). Wait() runs the thread if it had been created suspended and then waits until either the thread has terminated or until the timeout expires.

The destructor closes the thread handle.

There is a new class (added October 16th 2004) called CUserThread which is described at the end of the article.

CSyncRendevouz

is a class that encapsulates the work of creating multiple threads, tracking their thread handles and ensuring all threads have completed before allowing the next step in processing to commence. This class performs synchronously. In other words, once you've used the class to launch a set of threads and called the Wait() method, execution in the thread that called Wait() stops until all the threads launched via the class instance have terminated.

The class looks like this:

class CSyncRendevouz : public CObject
{
    DECLARE_DYNAMIC(CSyncRendevouz);
public:
                CSyncRendevouz(void);
                ~CSyncRendevouz(void);

    void        Stop()                  
                { m_bStop = TRUE; SetEvent(m_hStopEvent); }
    virtual bool Wait(DWORD dwTimeout = INFINITE);

    void        AddThread(unsigned(__stdcall *thread)(void *), 
                          bool bWait = false, 
                          LPVOID data = NULL);
    bool        AddHandle(HANDLE hHandle);

protected:
    CArray<HANDLE, HANDLE> m_handleArray;
    CList<CBaseThread*, CBaseThread *> m_threads;
    HANDLE      m_hStopEvent;
    volatile bool m_bStop;
};
and usage might look like this:
CSyncRendevouz rendevouz;

rendevouz.AddThread(Thread1);
rendevouz.AddThread(Thread2);

rendevouz.Wait(50000);
which creates a CSyncRendevouz object, adds Thread1 and Thread2 to the object and then calls Wait(). In the example it waits up to 50 seconds for the threads to execute. If both threads terminate before the timeout has elapsed the Wait() call returns true, otherwise it returns false.

I said above that the code snippet adds Thread1 and Thread2 to the object. A close examination of the function prototype should reveal that what's actually passed to the AddThread() method is the address of the thread procedure, implying that the AddThread() call is where the thread is created. This is exactly what happens, via the creation of one CBaseThread object per thread. Let's look at the code for AddThread().

bool CSyncRendevouz::AddThread(unsigned(__stdcall *thread)(void *), 
                               volatile bool bWait, 
                               LPVOID data)
{
    if (m_handleArray.GetCount() > MAXIMUM_WAIT_OBJECTS - 1)
        return false;

    ASSERT(thread);

    CBaseThread *pThread = new CBaseThread(m_hStopEvent, &m_bStop, thread, 
                                           bWait, 
                                           data);

    ASSERT(pThread);
    ASSERT_KINDOF(CBaseThread, pThread);

    m_threads.AddTail(pThread);
    m_handleArray.Add(pThread->ThreadHandle());
    return true;
}

This first checks how many handles are being monitored by this instance of CSyncRendevouz, returning false if we've reached the limit which is currently 64. (The limit is set by Windows). If we're not yet at the limit the function creates a new CBaseThread object passing it a combination of user data, user parameters and a couple of objects that exist as part of the CSyncRendevouz object. The newly created CBaseThread object saves the handles and pointers and launches a new thread, passing its own address as the thread data. We then add the new object to a list of objects (for later deletion) and add the thread handle to an array.

When all the threads have been created it's time to monitor them. Remember from our previous discussion that the threads may be created suspended, or may be already running by the time we're ready to call the Wait() method. The Wait() method must, therefore, traverse the list of thread objects and allow all suspended threads to run first. This is accomplished by this code.

bool CSyncRendevouz::Wait(DWORD dwTimeout)
{
    CBaseThread *pThread;
    POSITION    pos = m_threads.GetHeadPosition();

    while (pos != POSITION(NULL))
    {
        pThread = m_threads.GetNext(pos);

        ASSERT(pThread);
        ASSERT_KINDOF(CBaseThread, pThread);

        if (pThread->IsWaiting())
            pThread->Run();
    }

    return WaitForMultipleObjects(m_handleArray.GetCount(),
                                  m_handleArray.GetData(), 
                                  TRUE, dwTimeout) != WAIT_TIMEOUT;
}

Notice that there's also an AddHandle() member which lets you add any waitable handle to the wait list.

CAsyncRendevouz

This class is derived from CSyncRendevouz. When you call the Wait() method on this class it creates another thread which performs the Wait() function  and returns immediately.  When the wait has terminated (either by all threads terminating or by the timeout elapsing), the worker thread sends a message to a window. The message and the target window handle are both specified when creating the CAsyncRendevouz object. This class looks like this:

class CAsyncRendevouz : public CSyncRendevouz
{
    DECLARE_DYNAMIC(CAsyncRendevouz);
public:
                CAsyncRendevouz(HWND wndTarget, UINT uiMsg, 
                                LPVOID pvUserData = NULL);
                ~CAsyncRendevouz();

    virtual bool Wait(DWORD dwTimeout = INFINITE);

private:
    static unsigned __stdcall WaitProc(LPVOID data);

    HWND        m_wndTarget;
    UINT        m_uiMsg;
    DWORD       m_dwTimeout;
    LPVOID      m_pvUserData;
    CBaseThread *m_pThread;
};

This looks pretty straightforward except, perhaps, for that LPVOID parameter in the constructor. This is not to be confused with the parameter of the same name and type in the CSyncRendevouz::AddThread() method. The parameter in the CSyncRendevouz::AddThread() call is user data that's passed to the thread procedure. The parameter in the CAsyncRendevouz constructor is data that's passed as the LPARAM data in the windows message posted to the specified window.

CAsyncRendevouz::Wait() looks like this.

bool CAsyncRendevouz::Wait(DWORD dwTimeout)
{
    m_dwTimeout = dwTimeout;
    m_pThread = new CBaseThread(m_hStopEvent, &m_bStop, WaitProc, 0, 
                                LPVOID(this));
    return TRUE;
}
which creates a CBaseThread object passing the CAsyneRendevouz object as the user data. The thread procedure looks like this.
unsigned __stdcall CAsyncRendevouz::WaitProc(LPVOID data)
{
    {
        DEREF(data);

        CAsyncRendevouz *pThis = (CAsyncRendevouz *) pThread->UserData();

        ASSERT(pThis);
        ASSERT_KINDOF(CAsyncRendevouz, pThis);

        bool bResult = pThis->CSyncRendevouz::Wait(pThis->m_dwTimeout);

        if (IsWindow(pThis->m_wndTarget))
            ::PostMessage(pThis->m_wndTarget, pThis->m_uiMsg, 
                          WPARAM(bResult), LPARAM(pThis->m_pvUserData));
    }

    _endthreadex(0);

    //  Not reached

    return 0;
}
which has a couple of things that are not immediately obvious. The first is the extra pair of braces. Strictly speaking they are not necessary in this procedure but I've learned to always use them. Why? Because I'm terminating the thread with a call to _endthreadex(). Go look at the MSDN documentation on the function. No wiser? I'm not surprised. What they don't tell you in that documentation is that calling _endthreadex() instantly terminates the thread and doesn't run destructors for any objects created at the same scope as the call to _endthreadex((). I found this out the hard way. So I always enclose the working code of a thread with a pair of braces and ensure the _endthreadex() call is outside the scope of the working code.

CAsyncRendevouz::Wait() calls CSyncRendevouz::Wait() and posts the user defined message value to the designated window once the wait call exits. The exit status (timeout or all threads terminated) is passed as the WPARAM value and the user data is passed as the LPARAM value.

The other non-obvious thing is the DEREF macro. It's defined thusly.

#define DEREF(data) \
    rendevouz::CBaseThread *pThread = (rendevouz::CBaseThread *) data; \
    ASSERT(pThread); \
    ASSERT_KINDOF(CBaseThread, pThread);

All the macro does is define a variable called pThread which is a pointer to a CBaseThread object. Remember that I said earlier that CBaseThread creates the thread and passes it's own address as the thread data? You're about to find out why.

Controlling a thread

There comes a time when it's necessary to terminate a thread before it's finished it's normal execution. Perhaps it's a thread processing image data that takes minutes to process. Or perhaps it's a thread monitoring a remote connection that sends data once per hour. Either way, you've followed good practice and thrown up a dialog box showing progress and a cancel button. The user clicks the cancel button. How to terminate the thread?

A very bad way is to use the TerminateThread() API.

Here's some of the MSDN documentation on TerminateThread.

TerminateThread is used to cause a thread to exit. When this occurs, the target thread has no chance to execute any user-mode code and its initial stack is not deallocated. DLLs attached to the thread are not notified that the thread is terminating.

TerminateThread is a dangerous function that should only be used in the most extreme cases. You should call TerminateThread only if you know exactly what the target thread is doing, and you control all of the code that the target thread could possibly be running at the time of the termination. For example, TerminateThread can result in the following problems:

  • If the target thread owns a critical section, the critical section will not be released.

  • If the target thread is allocating memory from the heap, the heap lock will not be released.

  • If the target thread is executing certain kernel32 calls when it is terminated, the kernel32 state for the thread's process could be inconsistent.

  • If the target thread is manipulating the global state of a shared DLL, the state of the DLL could be destroyed, affecting other users of the DLL.

Sounds pretty bad. In fact it's so bad that I never use TerminateThread(). Frankly, I'd rather throw up a panic error messagebox and terminate the entire process. Just as one example, if you happen to terminate a thread when it's aquired a lock on the programs global heap all subsequent memory allocation attempts will block waiting for a thread that's no longer running to release the lock. It ain't running so it's never going to release that lock!

Thus the classes include support for the termination of threads. Hands up everyone who remember Windows 3.0 and cooperative multitasking? Hands down. We're about to re-enter that world.

The only way to make a thread safely terminable is to make it aware of the outside world and to make it either poll an external variable or to include an external event object in the list of objects it's waiting on. Many threads don't need to know about the outside world. They do some work, wait a short time if necessary, timeout and exit. But other threads may be performing work that might take minutes or hours. Even if you don't give the user the option to directly terminate work in progress you still have to allow for the possibility that the user may want to terminate the entire program. The program, however, won't terminate if one of it's threads fails to terminate. Even if the user interface is torn down the program is still running and still shows in Task Manager.

In One use for Overlapped IO[^] I explained how to use an event handle to signal a thread from the outside world that it was time to terminate. That's the purpose of the event handle that's passed from CSyncRendevouz into CBaseThread! Your thread should execute a WaitForMultipleObjects waiting on handles it knows about as part of the task it's performing plus the handle returned from a call to CBaseThread::StopEvent(). When the handle from the CBaseThread::StopEvent() is signalled it's time to cleanup and exit.  Example code in your thread might look like this:

unsigned __stdcall MyThreadProc(LPVOID data)
{
    {
        DEREF(data);

        //  Create some handle we're going to wait on as part

        //  of normal processing

        HANDLE hFile = CreateFile(PARAMETERS);
        HANDLE hWaitArray[2];
        bool   bExit = false;
        
        hWaitArray[0] = hFile;
        hWaitArray[1] = pThread->StopHandle();
        
        while (bExit == false)
        {
            //  Wait on two handles. The first handle is our file handle,

            //  the second handle is the stop handle which is part of 

            //  the CBaseThread object controlling us.

            switch (WaitForMultipleObjects(2, hWaitArray, FALSE, dwTimeout))
            {
            case WAIT_TIMEOUT:
                //  Do something relevant

                break
                
            case WAIT_OBJECT_0:
                //  Something happened on the file so do something relevant;

                break;
                
            case WAIT_OBJECT_1:
                //  The stop event was signalled, it's time to exit.

                CloseHandle(hFile);
                bExit = true;
                break;
            }
        }
    }

    _endthreadex(0);

    //  Not reached

    return 0;
}
On the other hand, it's entirely possible that your thread never waits on some external event but, instead, is executing a loop. In that case it should periodically check if it should terminate by calling CBaseThread::Stop(). If a call to CBaseThread::Stop() returns true it's time to cleanup and exit! Example code might look like this:
unsigned __stdcall MyThreadProc(LPVOID data)
{
    {
        DEREF(data);

        while (pThread->Stop() == false)
        {
            //  Do some work..

        }
    }

    _endthreadex(0);

    //  Not reached

    return 0;
}
Of course, all of this works only if you write your threads to actually use the stop mechanisms. It also only works if code elsewhere in your program has a chance to call the CSyncRendevouz::Stop() method. It would be a mistake to create the CSyncRendevouz object in your main thread and then call the Wait() method from that thread with the expectation that you could call the Stop() method. Remember that once a thread calls Wait() that thread stops running until all the threads controlled by the CSyncRendevouz object have terminated! The demo project doesn't have this problem (the demo project uses threads that sleep for a maximum of 5 seconds) but real world use of the classes will. Thus most usage of the classes will be via CAsyncRendevouz where an extra thread is created within the object to perform the wait. The main thread continues to execute and can call the Stop() method at will. The main thread would then create a bunch of threads (via the CSyncRendevouz::AddThread()) method to perform work and wait for the message that all threads have terminated before it schedules the next piece of work.

CUserThread

This is a class I've added for version 2 of the rendevouz library. It doesn't actually have anything to do with thread synchronisation; I added the class because I found myself using CBaseThread in other projects (and other articles) and the original design of the class was somewhat awkward when one didn't have related threads. The awkwardness arises from the need to pass a stop handle and a pointer to a bool variable to the constructor. If you're controlling multiple threads it makes sense to share the stop handle et al but if you're using CBaseThread to control just one thread it becomes a bit of a pain to have to declare those variables within every consumer class. On the other hand, I've found the class makes it so much easier to control threads that I don't want to stop using it.  So after about the fifteenth new project using CBaseThread where I found myself adding the stop handle and bool I found myself thinking there must be an easier way. The class is extremely simple and looks like this:
class CUserThread : public CBaseThread
{
    DECLARE_DYNAMIC(CUserThread);
public:
                CUserThread(unsigned(__stdcall *thread)(void *), 
bool bWait = false, LPVOID data = NULL); virtual ~CUserThread(); void TerminateThread(); private: volatile bool m_bStopVar; };
The constructor omits the stop handle and the bool variable in the parameter list but makes sure they're set up correctly for CBaseThread thusly;
CUserThread::CUserThread(unsigned(__stdcall *thread)(void *), 
bool bWait, LPVOID data) : CBaseThread(NULL, &m_bStopVar, thread, bWait, data) { m_bStopVar = false; m_hStopEvent = CreateEvent(NULL, FALSE, FALSE, NULL); }
The destructor ensures the class cleans up after itself and the TerminateThread() method sets both the event handle and the bool variable to ensure the thread controlled by the object is signalled to stop.

History

  • July 16 2004 - Initial version.
  • July 17 2004 - Changed the wait thread in the CAsyncRendevouz class to pass the result of the wait (timeout or all threads terminated) as the WPARAM member of the windows message.
  • October 16 2004 - Added the CUserThread class.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here