std::thread is available to spin off a thread, but there is no thread-safe queue and no timers, services that most operating systems provide. In this article I’ll show how to use the C++ Standard Library to create these “missing” features and provide an event processing loop familiar to many programmers.
Introduction
An event loop, sometimes called a message loop, is a thread that waits for and dispatches incoming events. The thread blocks until a request arrives and then dispatches the event to an event handler function. A message queue is typically used by the loop to hold incoming messages. Each message is sequentially dequeued and decoded, and then an action is performed. Event loops are one way to implement inter-process communication.
All operating systems provide support for multi-threaded applications. Each OS has unique function calls for creating threads, message queues and timers. With the advent of the C++11 thread support library, it’s now possible to create portable code and avoid the OS-specific function calls. This article provides a simple example of how to create a thread event loop, message queue and timer services while only relying upon the C++ Standard Library. Any C++11 compiler supporting the thread library should be able to compile the attached source.
CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.
See GitHub for latest source code:
Background
Typically, I need a thread to operate as an event loop. Incoming messages are dequeued by the thread and data is dispatched to an appropriate function handler based on a unique message identifier. Timer support capable of invoking a function is handy for low speed polling or to generate a timeout if something doesn’t happen in the expected amount of time. Many times, the worker thread is created at startup and isn’t destroyed until the application terminates.
A key requirement for the implementation is that all incoming messages must execute on the same thread instance. Whereas std::async, for example, may run a task on a temporary thread from a pool, this class ensures that every incoming message is handled on the same thread. For instance, a subsystem could be implemented with code that is not thread-safe. A single WorkerThread instance can then be used to safely dispatch function calls into the subsystem.
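The same-thread guarantee can be illustrated with a minimal sketch that is separate from the WorkerThread class presented in this article. The SingleThreadRunner name and its Post() and Stop() methods are hypothetical, chosen only for this illustration. Every posted task records the id of the thread it runs on, so the number of distinct ids shows whether all work stayed on one thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <utility>

// Hypothetical helper (not part of the article's source): runs every posted
// task on one dedicated thread, in FIFO order.
class SingleThreadRunner
{
public:
    SingleThreadRunner() : m_thread(&SingleThreadRunner::Run, this) {}
    ~SingleThreadRunner() { Stop(); }

    void Post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

    // Queue an empty task as the exit marker, then wait for the thread.
    void Stop()
    {
        if (!m_thread.joinable())
            return;
        Post(std::function<void()>());
        m_thread.join();
    }

private:
    void Run()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return !m_tasks.empty(); });
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            if (!task)
                return;   // an empty task means "exit"
            task();
        }
    }

    std::queue<std::function<void()>> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::thread m_thread;   // declared last so the members above exist first
};

// Posts `count` tasks and returns how many distinct threads executed them.
std::size_t CountDistinctThreads(int count)
{
    std::set<std::thread::id> ids;   // only touched on the runner thread
    {
        SingleThreadRunner runner;
        for (int i = 0; i < count; ++i)
            runner.Post([&ids] { ids.insert(std::this_thread::get_id()); });
    }   // the destructor drains the queue and joins the thread
    return ids.size();
}
```

Because the tasks never run concurrently, the std::set needs no lock of its own. That is the same property that lets a non-thread-safe subsystem sit behind a single worker thread.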
At first glance, the C++ thread support seems to be missing some key features. Yes, std::thread is available to spin off a thread, but there is no thread-safe queue and no timers, services that most operating systems provide. I’ll show how to use the C++ Standard Library to create these “missing” features and provide an event processing loop familiar to many programmers.
WorkerThread
The WorkerThread class encapsulates all the necessary event loop mechanisms. A simple class interface allows thread creation, posting messages to the event loop, and eventual thread termination. The interface is shown below:
class WorkerThread
{
public:
    WorkerThread(const char* threadName);
    ~WorkerThread();

    // Create the worker thread (call once)
    bool CreateThread();

    // Ask the worker thread to exit and wait for it to finish
    void ExitThread();

    std::thread::id GetThreadId();
    static std::thread::id GetCurrentThreadId();

    // Queue a message for processing on the worker thread
    void PostMsg(std::shared_ptr<UserData> msg);

private:
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    // Entry points for the worker thread and the timer thread
    void Process();
    void TimerThread();

    std::unique_ptr<std::thread> m_thread;
    std::queue<std::shared_ptr<ThreadMsg>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_timerExit;
    const char* THREAD_NAME;
};
The first thing to notice is that std::thread is used to create a main worker thread. The main worker thread function is Process().
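bool WorkerThread::CreateThread()
{
    // Start the worker only once; Process() is the thread entry point.
    // A raw pointer cannot be assigned to std::unique_ptr, so wrap it explicitly.
    if (!m_thread)
        m_thread = std::unique_ptr<std::thread>(new std::thread(&WorkerThread::Process, this));
    return true;
}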
Event Loop
The Process() event loop is shown below. The thread relies upon a std::queue<std::shared_ptr<ThreadMsg>> for the message queue. std::queue is not thread-safe, so all access to the queue must be protected by a mutex. A std::condition_variable is used to suspend the thread until it is notified that a new message has been added to the queue.
void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue. The loop
            // re-checks the queue after every wakeup, so a spurious
            // wakeup simply waits again.
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            msg = m_queue.front();
            m_queue.pop();
        }

        // The lock is released here, so handlers run without holding the mutex
        switch (msg->id)
        {
            case MSG_POST_USER_DATA:
            {
                ASSERT_TRUE(msg->msg != NULL);

                auto userData = std::static_pointer_cast<UserData>(msg->msg);
                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
                break;
            }

            case MSG_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case MSG_EXIT_THREAD:
            {
                // Stop the timer thread before leaving the event loop
                m_timerExit = true;
                timerThread.join();
                return;
            }

            default:
                ASSERT();
        }
    }
}
PostMsg() creates a new ThreadMsg on the heap, adds the message to the queue, and then notifies the worker thread using a condition variable.
void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));

    // Add the message to the queue and notify the worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}
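The ThreadMsg and UserData types are not listed in this article. Judging only from how the snippets use them (msg->id, msg->msg, userData->msg and userData->year), they might look roughly like the sketch below. The enumerator values, member types and the RoundTripYear() helper are assumptions made for illustration; consult the attached source for the real definitions.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Message identifiers named in the article's switch statement.
// The enumerator values are assumed.
enum MessageId
{
    MSG_EXIT_THREAD,
    MSG_POST_USER_DATA,
    MSG_TIMER
};

// Assumed payload, inferred from userData->msg.c_str() and userData->year.
struct UserData
{
    std::string msg;
    int year;
};

// Assumed envelope: a message id plus a type-erased payload.
struct ThreadMsg
{
    ThreadMsg(int i, std::shared_ptr<void> m) : id(i), msg(std::move(m)) {}
    int id;
    std::shared_ptr<void> msg;
};

// Hypothetical helper: sends a UserData through the type-erased envelope
// and reads it back, mirroring what PostMsg() and Process() do.
int RoundTripYear(int year)
{
    std::shared_ptr<UserData> data(new UserData());
    data->msg = "Hello world";
    data->year = year;

    // shared_ptr<UserData> converts implicitly to shared_ptr<void>
    ThreadMsg envelope(MSG_POST_USER_DATA, data);
    assert(envelope.msg != nullptr);

    // The receiver must know the real type to cast back safely
    auto userData = std::static_pointer_cast<UserData>(envelope.msg);
    return userData->year;
}
```

With a layout like this, std::shared_ptr<void> keeps the payload alive while the message sits in the queue and still destroys it with the correct type. This is why the std::thread version needs no explicit delete, unlike the Win32 version.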
The loop will continue to process messages until the MSG_EXIT_THREAD message is received, at which point the thread exits.
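void WorkerThread::ExitThread()
{
    if (!m_thread)
        return;

    // Create the exit message; it carries no payload
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, nullptr));

    // Queue the exit message and wake the worker thread
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }

    // Wait for the worker to finish, then release the thread object
    m_thread->join();
    m_thread = nullptr;
}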
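The queue, mutex and condition variable used by Process() and PostMsg() can also be factored into a small reusable class. The sketch below is one possible way to do that and is not part of the article's source: ThreadSafeQueue, Push(), WaitAndPop() and SumThroughQueue() are hypothetical names. It uses the predicate overload of std::condition_variable::wait, which is equivalent to the explicit while loop in Process().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical reusable wrapper around std::queue.
template <typename T>
class ThreadSafeQueue
{
public:
    void Push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(value));
        }
        m_cv.notify_one();   // wake one waiting consumer
    }

    // Blocks until an item is available, then removes and returns it.
    T WaitAndPop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate is re-checked after every wakeup, so a spurious
        // wakeup just goes back to waiting.
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        T value = std::move(m_queue.front());
        m_queue.pop();
        return value;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

// One producer thread pushes 1..count; the calling thread pops and sums them.
int SumThroughQueue(int count)
{
    ThreadSafeQueue<int> queue;
    std::thread producer([&queue, count] {
        for (int i = 1; i <= count; ++i)
            queue.Push(i);
    });

    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += queue.WaitAndPop();

    producer.join();
    return sum;
}
```

Keeping the lock inside the class means callers cannot forget to take it, at the cost of one lock acquisition per Push() or WaitAndPop() call.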
Event Loop (Win32)
The code snippet below contrasts the std::thread event loop above with a similar Win32 version using the Windows API. Notice that the GetMessage() API is used in lieu of the std::queue. Messages are posted to the OS message queue using PostThreadMessage(). And finally, timeSetEvent() is used to place WM_USER_TIMER messages into the queue. All of these services are provided by the OS. The std::thread WorkerThread implementation presented here avoids the raw OS calls, yet its functionality is the same as the Win32 version while relying only upon the C++ Standard Library.
unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;

    // Start a periodic 250 ms timer. The user-data argument is a DWORD_PTR,
    // which is wide enough to hold a pointer on 64-bit builds.
    MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired,
        reinterpret_cast<DWORD_PTR>(this), TIME_PERIODIC);

    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg.wParam != 0);

                // wParam is an integer type, so reinterpret_cast is required
                const UserData* userData = reinterpret_cast<const UserData*>(msg.wParam);
                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                // The receiver owns the raw pointer and must free it
                delete userData;
                break;
            }

            case WM_USER_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case WM_EXIT_THREAD:
                timeKillEvent(timerId);
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}
Timer
A low-resolution periodic timer message is inserted into the queue using a secondary private thread. The timer thread is created inside Process().
void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);
    ...
The timer thread’s sole responsibility is to insert a MSG_TIMER message every 250 ms. In this implementation, there’s no protection against the timer thread injecting more than one timer message into the queue. This could happen if the worker thread falls behind and can’t service the message queue fast enough. Depending on the worker thread, processing load, and how fast the timer messages are inserted, additional logic could be employed to prevent flooding the queue.
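void WorkerThread::TimerThread()
{
    while (!m_timerExit)
    {
        // Sleep for 250 ms, then put a MSG_TIMER message into the queue.
        // std::chrono::milliseconds compiles as C++11; the 250ms literal needs C++14.
        std::this_thread::sleep_for(std::chrono::milliseconds(250));

        std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_TIMER, nullptr));

        // Add the timer message to the queue and notify the worker thread
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }
}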
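One possible form of the flood protection mentioned above is a flag that allows at most one timer message to be outstanding at a time. The sketch below is not part of the article's source: TimerGate, TryArm(), Disarm() and SimulateStalledWorker() are hypothetical names, and the simulation is single-threaded so its result is deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <queue>

// Hypothetical helper: permits only one pending timer message at a time.
class TimerGate
{
public:
    TimerGate() : m_pending(false) {}

    // Timer thread: returns true if the caller should queue a timer message.
    // exchange() sets the flag and reports its previous value atomically.
    bool TryArm() { return !m_pending.exchange(true); }

    // Worker thread: call when the timer message has been handled.
    void Disarm() { m_pending = false; }

private:
    std::atomic<bool> m_pending;
};

// Simulates `ticks` timer expirations while the worker is stalled and
// returns how many timer messages ended up in the queue.
int SimulateStalledWorker(int ticks)
{
    const int MSG_TIMER = 1;   // stand-in id for this simulation
    TimerGate gate;
    std::queue<int> queue;     // stand-in for the message queue

    for (int i = 0; i < ticks; ++i)
    {
        if (gate.TryArm())     // skip the tick if one is already queued
            queue.push(MSG_TIMER);
    }
    return static_cast<int>(queue.size());
}
```

In WorkerThread, the timer thread would call TryArm() before pushing MSG_TIMER, and the MSG_TIMER case in Process() would call Disarm(). The trade-off is that ticks are dropped rather than queued when the worker falls behind, which suits polling but not code that counts timer expirations.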
Usage
The main() function below shows how to use the WorkerThread class. Two worker threads are created and a message is posted to each one. After a short delay, both threads exit.
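// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");

int main(void)
{
    // Create worker threads
    workerThread1.CreateThread();
    workerThread2.CreateThread();

    // Create message to send to worker thread 1
    std::shared_ptr<UserData> userData1(new UserData());
    userData1->msg = "Hello world";
    userData1->year = 2017;

    // Post the message to worker thread 1
    workerThread1.PostMsg(userData1);

    // Create message to send to worker thread 2
    std::shared_ptr<UserData> userData2(new UserData());
    userData2->msg = "Goodbye world";
    userData2->year = 2017;

    // Post the message to worker thread 2
    workerThread2.PostMsg(userData2);

    // Give the worker threads time to process messages and timer events
    std::this_thread::sleep_for(std::chrono::seconds(1));

    workerThread1.ExitThread();
    workerThread2.ExitThread();

    return 0;
}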
Conclusion
The C++ thread support library offers a platform-independent way to write multi-threaded application code without relying on OS-specific APIs. The WorkerThread class presented here is a bare-bones implementation of an event loop, yet all the basics are there, ready to be expanded upon.
History
- 5th February, 2017
- 7th February, 2017
- Updated article to provide clarifications on an event loop and contrast the implementation with a Win32 event loop
- 13th September, 2020
- Minor modernization updates to simplify the implementation. New source code and article updates.