Introduction
Recently, my brother asked me if there is an easy way to create a C++ class that facilitates object-oriented threading. I have written many multi-threaded libraries in the past; however, they are all in C. C has always been my language of choice for low-level programming; I use C++ for GUI development. Although there are many excellent examples of object-oriented threading on CodeProject, none of the classes introduced suited all of my brother's needs and my own curiosity. He wanted a thread class that had the following attributes:
- It supports both event driven and interval based asynchronous threading.
- It supports both homogeneous and specialized threading.
- It provides an FCFS (First Come First Served) queue for posting and handling multiple tasks.
- It is portable.
- It is simple to implement.
To support the new class, CThread, other supporting classes were also developed. These include the CMutexClass, CEventClass, and CTask classes. The CMutexClass and CEventClass classes provide resource management, while the CTask class is a base class for deriving classes that support homogeneous asynchronous threading.
What is Threading?
Every process has at least one thread of control, and every process can perform at least one task at a time. A process that has more than one thread of control defines a multi-threaded process. A multi-threaded process allows multiple tasks to run asynchronously from within the environment of the process.
Resource Management—Thread Synchronization
Because threads within a multi-threaded process share the same resources, OS level control mechanisms are necessary to ensure data integrity. A loss of data integrity occurs when one thread is modifying a variable while another thread is attempting to read it, or two threads are attempting to modify the same variable at the same time. To prevent this scenario, the OS provides a Mutual Exclusion Object, known in short as a mutex. In multi-threaded applications, mutexes, deployed programmatically, prevent multiple threads from accessing a single resource at the same time. When a thread needs access to a resource, it must first acquire a mutex. Once a thread has acquired a mutex, other threads attempting to acquire the same mutex are blocked, and placed in a low-CPU usage wait state. Once a thread has completed data access, it releases the corresponding mutex; this allows other threads to acquire it and access the corresponding data.
Poor implementations of mutexes can result in deadlock, a form of resource starvation. A deadlock occurs when two or more threads each hold a resource that another one needs, so that none of them can proceed.
Example:
Thread A | Thread B |
---|---|
Acquires mutex(1) to modify data item 1 | Acquires mutex(2) to modify data item 2 |
Wants mutex(2) to view data item 2 | Wants mutex(1) to view data item 1 |
A deadlock occurs in the example above because thread A is blocked trying to acquire mutex(2), which is held by thread B, while thread B is blocked trying to acquire mutex(1), which is held by thread A.
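One common way to avoid the situation in the table is to acquire both mutexes in a single step instead of one after the other. The following is a minimal sketch of that idea using the standard C++11 `std::mutex` and `std::lock` rather than the classes described in this article; `SharedItems`, `UpdateItem1`, `UpdateItem2`, and `RunBothThreads` are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical shared data: each item is guarded by its own mutex.
struct SharedItems {
    std::mutex mutex1;
    std::mutex mutex2;
    int item1 = 0;
    int item2 = 0;
};

// Thread A's work: modify item 1 after viewing item 2.
void UpdateItem1(SharedItems &s) {
    // std::lock acquires both mutexes using a deadlock-avoidance
    // algorithm, so the order in which they are named does not matter.
    std::lock(s.mutex1, s.mutex2);
    std::lock_guard<std::mutex> g1(s.mutex1, std::adopt_lock);
    std::lock_guard<std::mutex> g2(s.mutex2, std::adopt_lock);
    s.item1 = s.item2 + 1;
}

// Thread B's work: modify item 2 after viewing item 1.
void UpdateItem2(SharedItems &s) {
    std::lock(s.mutex2, s.mutex1);
    std::lock_guard<std::mutex> g2(s.mutex2, std::adopt_lock);
    std::lock_guard<std::mutex> g1(s.mutex1, std::adopt_lock);
    s.item2 = s.item1 + 1;
}

// Runs both updates concurrently. Both threads always finish because
// neither can hold one mutex while waiting for the other.
bool RunBothThreads(int iterations) {
    SharedItems s;
    std::thread a([&] { for (int i = 0; i < iterations; ++i) UpdateItem1(s); });
    std::thread b([&] { for (int i = 0; i < iterations; ++i) UpdateItem2(s); });
    a.join();
    b.join();
    return s.item1 > 0 && s.item2 > 0;
}
```

Another option, when both mutexes cannot be taken together, is to make every thread acquire them in the same fixed order.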
Like mutexes, condition variables are a synchronization mechanism in UNIX. Condition variables allow threads to rendezvous: one thread can notify another that a change has occurred. In Windows, the equivalent objects are events.
Operating System Calls
The following table is a list of the various functions used to implement threading in the CMutexClass, CEventClass, CTask, and CThread classes.
Function | OS | Description | Class Used in |
---|---|---|---|
CreateThread | Windows | Creates a Windows thread | CThread |
pthread_create | UNIX - POSIX THREADS | Creates a UNIX thread | CThread |
pthread_join | UNIX - POSIX THREADS | Waits for a UNIX thread to terminate | CThread |
pthread_attr_init | UNIX - POSIX THREADS | Sets a thread attribute structure to default | CThread |
pthread_attr_setstacksize | UNIX - POSIX THREADS | Sets the stack size value of the thread attribute structure | CThread |
WaitForSingleObject | Windows | Waits for an object to be signaled | CThread , CMutexClass , CEventClass |
CreateMutex | Windows | Creates a named or unnamed mutex | CMutexClass |
CloseHandle | Windows | Releases resources allocated to a Windows handle | CMutexClass , CEventClass , CThread |
ReleaseMutex | Windows | Releases a previously acquired mutex locked by WaitForSingleObject | CMutexClass , CEventClass |
pthread_mutexattr_init | UNIX - POSIX THREADS | Initializes a mutex attribute structure | CMutexClass , CEventClass |
pthread_mutex_init | UNIX - POSIX THREADS | Initializes a mutex using a provided attribute structure | CMutexClass , CEventClass |
pthread_mutex_lock | UNIX - POSIX THREADS | Locks a mutex | CMutexClass , CEventClass |
pthread_mutex_unlock | UNIX - POSIX THREADS | Unlocks a mutex previously locked by pthread_mutex_lock | CMutexClass , CEventClass |
pthread_mutex_destroy | UNIX - POSIX THREADS | Releases resources allocated to a mutex | CMutexClass , CEventClass |
CreateEvent | Windows | Creates a Windows event object | CEventClass |
SetEvent | Windows | Sets a Windows event object to signaled | CEventClass |
pthread_cond_signal | UNIX - POSIX THREADS | Unblocks a thread blocked on pthread_cond_wait | CEventClass |
pthread_cond_wait | UNIX - POSIX THREADS | Blocks on a condition variable | CEventClass |
pthread_cond_init | UNIX - POSIX THREADS | Initializes a condition variable | CEventClass |
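To show how the calls in the table can sit behind one interface, here is a minimal sketch of a mutex wrapper that selects the Windows or POSIX functions with conditional compilation. `PortableMutex` is a hypothetical name and this is not the source of CMutexClass; it only illustrates the pattern, and it assumes default mutex attributes on POSIX systems.

```cpp
#include <cassert>
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

// Hypothetical wrapper; not the article's CMutexClass implementation.
class PortableMutex {
public:
    PortableMutex() {
#ifdef _WIN32
        m_handle = CreateMutex(NULL, FALSE, NULL);  // unnamed, initially unowned
        m_ok = (m_handle != NULL);
#else
        m_ok = (pthread_mutex_init(&m_mutex, NULL) == 0);  // default attributes
#endif
    }
    ~PortableMutex() {
        if (!m_ok) return;
#ifdef _WIN32
        CloseHandle(m_handle);
#else
        pthread_mutex_destroy(&m_mutex);
#endif
    }
    // Blocks until the mutex is acquired; returns false on failure.
    bool Lock() {
        if (!m_ok) return false;
#ifdef _WIN32
        return WaitForSingleObject(m_handle, INFINITE) == WAIT_OBJECT_0;
#else
        return pthread_mutex_lock(&m_mutex) == 0;
#endif
    }
    // Releases a mutex previously acquired with Lock().
    bool Unlock() {
        if (!m_ok) return false;
#ifdef _WIN32
        return ReleaseMutex(m_handle) != 0;
#else
        return pthread_mutex_unlock(&m_mutex) == 0;
#endif
    }
    bool IsValid() const { return m_ok; }

private:
    PortableMutex(const PortableMutex &);             // non-copyable
    PortableMutex &operator=(const PortableMutex &);
#ifdef _WIN32
    HANDLE m_handle;
#else
    pthread_mutex_t m_mutex;
#endif
    bool m_ok;
};
```

Keeping the platform test inside the wrapper means calling code only ever sees `Lock` and `Unlock`.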
The CMutexClass Class
The CMutexClass class encapsulates the system level mutex functions and a mutex synchronization object. Mutex creation occurs during object instantiation, with the mutex created unlocked. The class provides two member functions, Lock and Unlock. The Lock member function locks a mutex, assigning it to the calling thread. The mutex remains locked to the calling thread until the calling thread releases it using the Unlock member function. Threads that attempt to acquire a locked mutex by calling the Lock member function are blocked, and placed into a low CPU consumption wait state until the owning thread releases the mutex.
Member Functions
Function | Description |
---|---|
CMutexClass() | Constructor |
void Lock() | Locks a mutex object, or waits if blocked |
void Unlock() | Unlocks/unblocks a previously blocked mutex |
Example
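int g_iStorage = 0;
CMutexClass MyMutex;

void StoreValue( int *pInt )
{
    MyMutex.Lock();       // blocks until the mutex is available
    g_iStorage = *pInt;   // protected write to the shared variable
    MyMutex.Unlock();     // lets a waiting thread proceed
}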
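If the code between Lock and Unlock returns early or throws, the mutex stays locked. A scope guard avoids this. The sketch below is an assumption-laden illustration: `CAutoLock` is a hypothetical helper that is not part of the library, and the `CMutexClass` shown here is only a stand-in with the same Lock/Unlock interface, built on `std::mutex` so that the sketch compiles on its own.

```cpp
#include <cassert>
#include <mutex>

// Stand-in with the same Lock/Unlock interface as CMutexClass, built on
// std::mutex so that this sketch is self-contained.
class CMutexClass {
public:
    void Lock()   { m_mutex.lock(); }
    void Unlock() { m_mutex.unlock(); }
private:
    std::mutex m_mutex;
};

// Hypothetical scope guard: locks in the constructor, unlocks in the
// destructor, so every exit path releases the mutex.
class CAutoLock {
public:
    explicit CAutoLock(CMutexClass &mutex) : m_mutex(mutex) { m_mutex.Lock(); }
    ~CAutoLock() { m_mutex.Unlock(); }
private:
    CAutoLock(const CAutoLock &);             // non-copyable
    CAutoLock &operator=(const CAutoLock &);
    CMutexClass &m_mutex;
};

int g_iStorage = 0;
CMutexClass MyMutex;

// Returns false without storing anything when pInt is null; the mutex
// is released on both return paths.
bool StoreValue(const int *pInt) {
    CAutoLock lock(MyMutex);
    if (pInt == nullptr) return false;
    g_iStorage = *pInt;
    return true;
}
```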
The CEventClass Class
The CEventClass class encapsulates the Windows event functions, a Windows event object, UNIX condition variable functions, and a UNIX condition variable. The functions incorporated into the CEventClass class are SetEvent and CreateEvent under Windows, and pthread_cond_init, pthread_cond_destroy, pthread_cond_signal, and pthread_cond_wait under UNIX. Event synchronization objects are called condition variables under UNIX, but for the purpose of simplification, I will refer to both condition variables and event objects as event objects.
Member Functions
Function | Description |
---|---|
void Set() | Sets an event state to signaled, notifying the blocked thread. |
BOOL Wait() | Places the calling thread in a blocked state until the event state is set to signaled. Returns TRUE on success, FALSE on failure. |
void Reset() | Resets a signaled event to unsignaled. |
Example of an event object being used by a receiving thread:
CEventClass event;
.
.
.
.
while(bContinueRunning)
{
event.Wait();
.
.
event.Reset();
}
.
.
Example of an event object used by one thread signaling another:
CEventClass event;
.
.
.
.
event.Set();
.
.
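The Set/Wait/Reset behavior shown in the two fragments above can be sketched with standard C++11 primitives. `SimpleEvent` and `ReceiveOneValue` are hypothetical names; this is an illustration of a manual-reset event built on a condition variable, not the CEventClass source.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical manual-reset event; not the article's CEventClass source.
class SimpleEvent {
public:
    SimpleEvent() : m_signaled(false) {}

    // Marks the event signaled and wakes any waiting thread.
    void Set() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_signaled = true;
        }
        m_cond.notify_all();
    }

    // Blocks until the event is signaled. The predicate guards against
    // spurious wakeups and against a Set that happened before Wait.
    void Wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_signaled; });
    }

    // Returns the event to the unsignaled state.
    void Reset() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_signaled = false;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_signaled;
};

// One thread publishes a value and signals; the caller waits, reads the
// value, and resets the event.
int ReceiveOneValue(int valueToSend) {
    SimpleEvent event;
    int shared = 0;
    std::thread sender([&] {
        shared = valueToSend;  // written before Set, read after Wait
        event.Set();
    });
    event.Wait();
    int received = shared;
    event.Reset();
    sender.join();
    return received;
}
```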
The CTask Class and Non-Specialized Threads
In many of the thread programming examples that I have seen, data for thread processing is stored in a global variable, protected by a mutex. Instructions for operating on the data are integrated into the thread function. I define this form of threading as Specialized Asynchronous Threading (SAT). Ideally, the data and the corresponding functionality for processing the data should be encapsulated into the same object. I define this form of threading as Homogeneous Asynchronous Threading (HAT). Under HAT, threads are not specialized. For example, there would not be a printing thread and an I/O thread in a HAT solution. Instead, a single thread could perform both types of tasks because the tasks are implemented as complete objects; that is, they contain both the data and the functionality necessary to process the data. The CTask class is a base class that facilitates HAT-based threading.
typedef enum {
    TaskStatusNotSubmitted,
    TaskStatusWaitingOnQueue,
    TaskStatusBeingProcessed,
    TaskStatusCompleted } TaskStatus_t;

class CTask
{
private:
    TaskStatus_t m_state;
    ThreadId_t m_dwThread;
protected:
    CMutexClass m_mutex;   // accessible to derived tasks so they can guard their own data
public:
    void SetTaskStatus(TaskStatus_t state)
    {
        m_mutex.Lock();
        m_state=state;
        m_mutex.Unlock();
    }
    void SetId(ThreadId_t *pid)
    {
        memcpy(&m_dwThread,pid,sizeof(ThreadId_t));
    }
    BOOL Wait(int timeoutSeconds)
    {
        int timeoutMilli = timeoutSeconds * 1000;
        // Poll every 100 ms until the task completes or the timeout expires.
        while( Status() != TaskStatusCompleted &&
               timeoutMilli > 0 )
        {
            Sleep(100);
            timeoutMilli = timeoutMilli - 100;
        }
        if( Status() == TaskStatusCompleted ) return TRUE;
        return FALSE;
    }
    TaskStatus_t Status()
    {
        TaskStatus_t state ;
        m_mutex.Lock();
        state = m_state;
        m_mutex.Unlock();
        return state;
    }
    void Thread(ThreadId_t *pId)
    {
        memcpy(pId,&m_dwThread,sizeof(ThreadId_t));
    }
    CTask(){m_state=TaskStatusNotSubmitted;
            memset(&m_dwThread,0,sizeof(ThreadId_t)); }   // memset(dest, value, size)
    virtual ~CTask(){}     // virtual: tasks are handled through CTask pointers
    virtual BOOL Task()=0;
};
Member Functions
Function | Description |
---|---|
m_mutex | Mutex synchronization object. |
virtual BOOL Task() | Called by a CThread object to perform the task. |
TaskStatus_t Status() | Determines the task status: TaskStatusNotSubmitted , TaskStatusWaitingOnQueue , TaskStatusBeingProcessed , or TaskStatusCompleted . |
void Thread(ThreadId_t *pid) | Returns the thread ID of the processing thread. |
BOOL Wait(int iTimeInSeconds) | Places a calling thread into a wait state until the task completes or iTimeInSeconds elapses. If a task does not complete within iTimeInSeconds , FALSE is returned; otherwise, TRUE is returned. |
I have not yet described the CThread class; however, its definition is not necessary to understand how it interacts with a CTask object. The list below presents an outline of how the two object types interact.
The procedure for processing a CTask object:
- A CTask object is passed to a CThread object to be processed.
- The CThread object places the CTask object in a First Come First Served queue.
- The CThread object sets the CTask object's state to TaskStatusWaitingOnQueue.
- The CThread object pops the CTask object off of the wait queue.
- The CThread object changes the CTask object's state to TaskStatusBeingProcessed.
- The CThread object calls the CTask object's member function Task to perform the task.
- The CThread object changes the CTask object's state to TaskStatusCompleted.
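The steps above can be sketched without any threading to make the state transitions visible. In this illustration only the `TaskStatus_t` values come from the article; `SketchTask`, `SketchQueue`, and `AddTask` are hypothetical stand-ins for CTask, the queue-handling part of CThread, and a derived task.

```cpp
#include <cassert>
#include <deque>
#include <vector>

enum TaskStatus_t {
    TaskStatusNotSubmitted,
    TaskStatusWaitingOnQueue,
    TaskStatusBeingProcessed,
    TaskStatusCompleted
};

// Hypothetical stand-in for CTask: records every status it passes through.
class SketchTask {
public:
    SketchTask() : m_state(TaskStatusNotSubmitted) { m_history.push_back(m_state); }
    virtual ~SketchTask() {}
    void SetTaskStatus(TaskStatus_t s) { m_state = s; m_history.push_back(s); }
    TaskStatus_t Status() const { return m_state; }
    const std::vector<TaskStatus_t> &History() const { return m_history; }
    virtual bool Task() = 0;
private:
    TaskStatus_t m_state;
    std::vector<TaskStatus_t> m_history;
};

// Hypothetical stand-in for the queue-handling part of CThread.
class SketchQueue {
public:
    // Steps 1-3: accept the task, queue it, mark it as waiting.
    void Event(SketchTask *pTask) {
        m_queue.push_back(pTask);
        pTask->SetTaskStatus(TaskStatusWaitingOnQueue);
    }
    // Steps 4-7: pop in arrival order, run the task, mark it completed.
    void ProcessAll() {
        while (!m_queue.empty()) {
            SketchTask *pTask = m_queue.front();
            m_queue.pop_front();
            pTask->SetTaskStatus(TaskStatusBeingProcessed);
            pTask->Task();
            pTask->SetTaskStatus(TaskStatusCompleted);
        }
    }
private:
    std::deque<SketchTask *> m_queue;
};

// A task that carries both its data and the code that processes it.
class AddTask : public SketchTask {
public:
    AddTask(int a, int b) : m_a(a), m_b(b), m_sum(0) {}
    bool Task() { m_sum = m_a + m_b; return true; }
    int Sum() const { return m_sum; }
private:
    int m_a, m_b, m_sum;
};
```

In the real classes the queue is drained by the CThread object's own thread, and the status is read through the mutex-protected Status member function.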
The CThread Class, Putting It All Together
Member Functions
Function | Description |
---|---|
CThread() | The constructor initializes object data and starts the thread. |
~CThread() | Terminates the thread if it is running, and frees resources. |
BOOL Event(LPVOID lpvData) | Places a data block on the event stack/queue, and notifies the object's thread that data is waiting to be processed. |
BOOL Event(CTask *pTask) | Places a CTask object on the event stack/queue, and notifies the object's thread that a task is waiting to be performed. |
int GetEventsPending() | Returns the number of events waiting on the event stack. |
ThreadId_t GetId() | Returns the object's thread ID. |
DWORD GetErrorFlags() | Returns the object's error flags. If there are no errors, a value of 0 is returned (NO_ERRORS ). If there are errors, one or more of the following flags will be set: MUTEX_CREATION (a mutex object could not be created), EVENT_CREATION (an event object could not be created), THREAD_CREATION (the object's thread could not be created), ILLEGAL_USE_OF_EVENT (the Event member function was called for an interval based thread). |
BOOL PingThread(DWORD dwTimeoutMilli) | Determines whether the object's thread is running. Returns TRUE if the thread is running, FALSE if it is not. Timeout is in seconds. |
SetPriority(DWORD dwPriority) | Sets the thread's priority, Windows only. |
BOOL Start() | Starts the object's thread. |
BOOL Stop() | Stops the object's thread. |
void SetIdle(DWORD dwIdle) | Changes a thread's idle time in milliseconds, used with interval-based threading. |
SetThreadType(ThreadType_t typ,DWORD dwIdle) | Changes the thread type between ThreadTypeEventDriven and ThreadTypeIntervalDriven . |
m_mutex | A mutex object used for synchronization, see CMutexClass . |
ThreadState_t ThreadState() | Returns the state of a thread: ThreadStateBusy (the thread is processing an event), ThreadStateWaiting (the thread is waiting for a new event), ThreadStateDown (the thread is not running), ThreadStateShutingDown (the thread is in the process of shutting down). |
Now that you have seen the supporting classes, it's time to look at the main class, CThread, the workhorse. The CThread class supports two types of threads: Event Driven and Interval Driven. An Event Driven thread remains in a wait state, blocked on an event object, until the event object's state changes from unsignaled to signaled. A new event occurs when a different thread places a task in a CThread object's queue and notifies the object's thread by setting its event object to signaled. Once signaled, the thread wakes up and pops tasks from its event queue until the queue is empty.
The CThread object invokes the OnTask member function for each task. Tasks are processed in First Come First Served (FCFS) order: the first task placed in a CThread object's queue is processed first, followed by the second, and so on. A mutex object synchronizes queue access, allowing additional events to be placed on the queue while the thread is processing older ones. Once the queue is empty, the thread resets the event object to unsignaled and returns to waiting on the event object. The CThread class supports two types of Event Driven threads: specialized and unspecialized (see CTask).
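The wait, wake, and drain cycle described above can be sketched with standard C++11 threads. This is an illustration under stated assumptions, not the CThread implementation: `SketchWorker` and `PostAndCollect` are hypothetical names, events are plain integers, and a condition variable with a predicate plays the role of the event object.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical event-driven worker; a sketch, not the article's CThread.
class SketchWorker {
public:
    SketchWorker() : m_stop(false), m_thread(&SketchWorker::Run, this) {}
    ~SketchWorker() { Stop(); }

    // Posts one event and wakes the worker thread.
    void Event(int value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(value);
        }
        m_cond.notify_one();
    }

    // Lets the worker drain the queue, then joins it.
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cond.notify_one();
        if (m_thread.joinable()) m_thread.join();
    }

    // Events in the order they were processed; call only after Stop().
    const std::vector<int> &Processed() const { return m_processed; }

private:
    void OnTask(int value) { m_processed.push_back(value); }

    void Run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            // Block until there is work or a stop request.
            m_cond.wait(lock, [this] { return m_stop || !m_queue.empty(); });
            if (m_queue.empty()) return;  // only reached once m_stop is set
            int value = m_queue.front();  // oldest event first (FCFS)
            m_queue.pop_front();
            lock.unlock();  // new events may be posted while this one runs
            OnTask(value);
            lock.lock();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::deque<int> m_queue;
    std::vector<int> m_processed;
    bool m_stop;
    std::thread m_thread;  // declared last so the other members exist before Run starts
};

// Posts count events and returns them in processing order.
std::vector<int> PostAndCollect(int count) {
    SketchWorker worker;
    for (int i = 0; i < count; ++i) worker.Event(i);
    worker.Stop();
    return worker.Processed();
}
```

Releasing the lock while OnTask runs is what allows other threads to keep posting events during processing.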
To implement a specialized thread, derive a new class from the CThread class. The derived class should override OnTask to process the object's data types.
Example
#include <stdio.h>
#include "Thread.h"

class CIncrementThread : public CThread
{
public:
    int counter;

    // Handles events posted with Event(LPVOID): adds *lpv to the counter.
    virtual BOOL OnTask( LPVOID lpv )
    {
        ThreadId_t id;
        GetId(&id);
        if( lpv )
        {
            int *pInt = (int *)lpv;
            m_mutex.Lock();    // counter is shared with the posting thread
            printf("\tthread(%ld, counter+%d=%d, counter incremented\n",
                   id,*pInt,(counter+=*pInt));
            m_mutex.Unlock();
        }
        return TRUE;
    }

    // Handles events posted with Event(): adds one to the counter.
    virtual BOOL OnTask()
    {
        ThreadId_t id;
        GetId(&id);
        m_mutex.Lock();
        printf("\tthread(%ld, counter++= %d, counter incremented)\n",
               id,(++counter));
        m_mutex.Unlock();
        return TRUE;
    }

    int GetValue()
    {
        int counterValue = 0;
        m_mutex.Lock();
        counterValue = counter;
        m_mutex.Unlock();
        return counterValue;   // return the copy taken under the lock
    }

    void Reset()
    {
        m_mutex.Lock();
        counter = 0;
        m_mutex.Unlock();
    }

    CIncrementThread(){counter=0;}
    ~CIncrementThread(){}
};

int main( int argc,
          char *argv[])
{
    CIncrementThread MyThread;
    int two=2;

    // Post parameterless events until the counter reaches 20.
    while( MyThread.GetValue() < 20 )
    {
        MyThread.Event();
        Sleep(100);
    }
    MyThread.Reset();

    // Post events that carry a pointer to the increment value.
    while( MyThread.GetValue() < 40 )
    {
        MyThread.Event(&two);
        Sleep(100);
    }
    return 0;
}
OUTPUT:
thread(5220, counter++= 1, counter incremented)
thread(5220, counter++= 2, counter incremented)
thread(5220, counter++= 3, counter incremented)
thread(5220, counter++= 4, counter incremented)
thread(5220, counter++= 5, counter incremented)
thread(5220, counter++= 6, counter incremented)
thread(5220, counter++= 7, counter incremented)
thread(5220, counter++= 8, counter incremented)
thread(5220, counter++= 9, counter incremented)
thread(5220, counter++= 10, counter incremented)
thread(5220, counter++= 11, counter incremented)
thread(5220, counter++= 12, counter incremented)
thread(5220, counter++= 13, counter incremented)
thread(5220, counter++= 14, counter incremented)
thread(5220, counter++= 15, counter incremented)
thread(5220, counter++= 16, counter incremented)
thread(5220, counter++= 17, counter incremented)
thread(5220, counter++= 18, counter incremented)
thread(5220, counter++= 19, counter incremented)
thread(5220, counter++= 20, counter incremented)
thread(5220, counter+2=2, counter incremented
thread(5220, counter+2=4, counter incremented
thread(5220, counter+2=6, counter incremented
thread(5220, counter+2=8, counter incremented
thread(5220, counter+2=10, counter incremented
thread(5220, counter+2=12, counter incremented
thread(5220, counter+2=14, counter incremented
thread(5220, counter+2=16, counter incremented
thread(5220, counter+2=18, counter incremented
thread(5220, counter+2=20, counter incremented
thread(5220, counter+2=22, counter incremented
thread(5220, counter+2=24, counter incremented
thread(5220, counter+2=26, counter incremented
thread(5220, counter+2=28, counter incremented
thread(5220, counter+2=30, counter incremented
thread(5220, counter+2=32, counter incremented
thread(5220, counter+2=34, counter incremented
thread(5220, counter+2=36, counter incremented
thread(5220, counter+2=38, counter incremented
thread(5220, counter+2=40, counter incremented
In the example above, I derived a CIncrementThread class from the CThread class. In the class definition, I redefined both the OnTask() and OnTask(LPVOID) virtual member functions. The OnTask() implementation adds one to the object's counter variable. The other OnTask member function takes a pointer to an integer value and adds the pointed-to value to the counter member variable. This example illustrates the two types of events that a thread can process. Because the counter variable can be accessed by more than one thread, I use the CThread::m_mutex object to ensure that only one thread accesses it at a time.
HAT (Homogeneous Asynchronous Threading) threads are implemented using both the CThread and the CTask classes.
Example
#include <stdio.h>
#include "Thread.h"

class CTaskIncrementer: public CTask
{
private:
    int counter;
    int incr;
public:
    void SetIncr(int iValue)
    {
        m_mutex.Lock();
        incr = iValue;
        m_mutex.Unlock();
    }

    int GetIncrementValue()
    {
        int incrValue;
        m_mutex.Lock();
        incrValue=incr;
        m_mutex.Unlock();
        return incrValue;
    }

    int GetValue()
    {
        int counterValue = 0;
        m_mutex.Lock();
        counterValue = counter;
        m_mutex.Unlock();
        return counterValue;   // return the copy taken under the lock
    }

    // Called by the CThread object: the task carries its own data and logic.
    BOOL Task()
    {
        ThreadId_t id;
        Thread(&id);
        m_mutex.Lock();
        printf("\tthread(%ld, counter+%d=%d, counter incremented\n",
               id,incr,(counter+=incr));
        m_mutex.Unlock();
        return TRUE;
    }

    CTaskIncrementer(){counter=0; incr=0;}
    ~CTaskIncrementer(){}
};

int
main(int argc,
     char *argv[])
{
    CTaskIncrementer incr;
    CThread thr;

    incr.SetIncr(2);
    // Keep posting the same task object until the counter reaches 40.
    while( incr.GetValue() < 40 ) thr.Event(&incr);
    return 0;
}
OUTPUT:
thread(5700, counter+2=2, counter incremented
thread(5700, counter+2=4, counter incremented
thread(5700, counter+2=6, counter incremented
thread(5700, counter+2=8, counter incremented
thread(5700, counter+2=10, counter incremented
thread(5700, counter+2=12, counter incremented
thread(5700, counter+2=14, counter incremented
thread(5700, counter+2=16, counter incremented
thread(5700, counter+2=18, counter incremented
thread(5700, counter+2=20, counter incremented
thread(5700, counter+2=22, counter incremented
thread(5700, counter+2=24, counter incremented
thread(5700, counter+2=26, counter incremented
thread(5700, counter+2=28, counter incremented
thread(5700, counter+2=30, counter incremented
thread(5700, counter+2=32, counter incremented
thread(5700, counter+2=34, counter incremented
thread(5700, counter+2=36, counter incremented
thread(5700, counter+2=38, counter incremented
thread(5700, counter+2=40, counter incremented
An Interval Driven thread is a thread that wakes up at predefined intervals, checks to see whether there is a change in the environment, processes the changes in the environment, sleeps for the next interval, and then wakes up and does it all over again. To implement an interval driven thread, you derive a class from CThread that redefines OnTask(), as the example below does. Once the thread has been instantiated, you call the SetThreadType member function with the parameter ThreadTypeIntervalDriven and an interval in milliseconds.
Example
#include <stdio.h>
#include "Thread.h"

class CIncrementThread : public CThread
{
public:
    int counter;

    // Called once per interval by the thread.
    virtual BOOL OnTask()
    {
        ThreadId_t id;
        GetId(&id);
        m_mutex.Lock();
        printf("\tthread(%ld, counter++= %d, counter incremented)\n",
               id,(++counter));
        m_mutex.Unlock();
        return TRUE;
    }

    int GetValue()
    {
        int counterValue = 0;
        m_mutex.Lock();
        counterValue = counter;
        m_mutex.Unlock();
        return counterValue;   // return the copy taken under the lock
    }

    void Reset()
    {
        m_mutex.Lock();
        counter = 0;
        m_mutex.Unlock();
    }

    CIncrementThread(){counter=0;}
    ~CIncrementThread(){}
};

int
main( int argc,
      char *argv[] )
{
    CIncrementThread thr;

    // thr is an object, not a pointer, so use '.' rather than '->'.
    thr.SetThreadType(ThreadTypeIntervalDriven,100);
    Sleep(500);
    return 0;
}
OUTPUT:
thread(6104, counter++= 12, counter incremented)
thread(6104, counter++= 13, counter incremented)
thread(6104, counter++= 14, counter incremented)
thread(6104, counter++= 15, counter incremented)
thread(6104, counter++= 16, counter incremented)
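The wake, work, and sleep cycle of an interval driven thread can also be sketched with standard C++11 primitives. `IntervalWorker` and `CountTicks` are hypothetical names and the sketch is not the CThread implementation; it assumes the work is done once per interval and that a stop request should cut the current sleep short.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical interval-driven worker; a sketch, not the article's CThread.
class IntervalWorker {
public:
    explicit IntervalWorker(std::chrono::milliseconds idle)
        : m_idle(idle), m_stop(false), m_ticks(0),
          m_thread(&IntervalWorker::Run, this) {}
    ~IntervalWorker() { Stop(); }

    // Requests shutdown and waits for the thread to exit.
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cond.notify_one();
        if (m_thread.joinable()) m_thread.join();
    }

    // Number of times OnTask has run so far.
    int Ticks() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_ticks;
    }

private:
    void OnTask() { ++m_ticks; }  // called with m_mutex held

    void Run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            OnTask();
            // Sleep for one interval, but wake early if Stop() is called.
            // wait_for returns the predicate's value, so true means "stop".
            if (m_cond.wait_for(lock, m_idle, [this] { return m_stop; })) break;
        }
    }

    std::chrono::milliseconds m_idle;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_stop;
    int m_ticks;
    std::thread m_thread;  // declared last so the other members exist before Run starts
};

// Runs a worker for roughly runMs milliseconds and reports how often it fired.
int CountTicks(int idleMs, int runMs) {
    IntervalWorker worker{std::chrono::milliseconds(idleMs)};
    std::this_thread::sleep_for(std::chrono::milliseconds(runMs));
    worker.Stop();
    return worker.Ticks();
}
```

Waiting on a condition variable with a timeout, rather than calling a plain sleep, keeps shutdown from being delayed by a full interval.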
Conclusion
There you have it, a fully functional thread object. I have tested on Linux, and the classes work fine. I have yet to test on SunOS or any of the other UNIX platforms that these classes should support. When compiling on Windows, be sure to specify /MT or /MTd for code generation; this identifies your application as a multi-threaded application. For Linux, the following makefile works:
CC=g++
LIBS=-lpthread -lrt
CFLAGS=-DLINUX -DNANO_SECOND_SLEEP
OBJS=Thread.cpp EventClass.cpp MutexClass.cpp main.cpp
EXECS = thread
all: $(EXECS)
thread: $(OBJS)
	$(CC) $(CFLAGS) -o thread $(OBJS) $(LIBS)
clean:; rm -f *.o $(EXECS)
History
- (31 Oct 2007)
  - Added support for beginthreadex. To compile with beginthreadex, define USE_BEGIN_THREAD. The default is to use CreateThread for thread creation. The attached project defines USE_BEGIN_THREAD in the preprocessor section.
  - I forgot that CreateThread is prone to leaks if used with the C runtime functions.
- (1 Nov 2007)
  Made a lot of improvements:
  - I introduced more logic and ASSERTIONs to ensure the integrity of CThread objects. Both the Homogeneous and Specialized thread types can be explicitly set using the SetThreadType member function. If the thread type is not set, the thread will determine its type based on calls to member functions; however, this does not apply to interval-based threads. Interval-based threads must be explicitly identified using the SetThreadType member function. The new integrity tests are implemented to ensure usage consistency with a CThread object.
  - New member functions AtCapacity and PercentCapacity were added to determine if a thread is truly busy. AtCapacity will return TRUE under one of two conditions: the thread is processing an event and its queue is full, or the thread is not running. PercentCapacity returns the percentage full for an object's queue. These new functions allow thread objects to be placed in arrays and tasked based on their workloads.
  - New member function SetQueueSize allows a CThread object's queue to be resized. The function will return false if the number of elements pending on a queue is greater than the requested queue size.
  - The Event member function has been modified to verify that a thread is running before posting an event. This eliminates the need to call PingThread when passing an event for the first time.
  - Error flags are automatically reset when certain member functions are called; this isolates error occurrences to specific call sequences.
- (2 Nov 2007)
  - Removed unnecessary code, for UNIX platforms, in the CEventClass class per On Freund's comments, see message below.
- (2 Nov 2007)
  - Added try/catch blocks to ensure proper use of all classes per On's message.
- (8 Nov 2007)
  This is the last installment of changes for this article. I am moving on to my next article. Since it will be using these classes, additional fixes will be posted there.
  - I forgot to remove a couple of sequential calls to m_event.Reset that were meant for testing the CEventClass's try-catch block. This will only affect non-Windows platforms. Also, relocated m_event.Reset to immediately after m_event.Wait. Since we are dealing with a queue, this could cause a bottleneck. I commented out the old code, just in case someone prefers it the old way.
  - The example, main.cpp, contained a couple of sequential calls to m_mutex.Lock which would throw an exception; these were meant for testing the CMutexClass's try-catch block. They have been removed.