Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++11

C++ std::thread Event Loop with Message Queue and Timer

4.78/5 (41 votes)
13 Sep 2020CPOL4 min read 191.2K   4.4K  
Create a worker thread with an event loop, message queue and a timer using the C++11 thread support library
std::thread is available to spin off a thread but there is no thread-safe queue and no timers – services that most OS’s provide. In this article I’ll show how to use the C++ Standard Library to create these “missing” features and provide an event processing loop familiar to many programmers.

Introduction

An event loop, or sometimes called a message loop, is a thread that waits for and dispatches incoming events. The thread blocks waiting for requests to arrive and then dispatches the event to an event handler function. A message queue is typically used by the loop to hold incoming messages. Each message is sequentially dequeued, decoded, and then an action is performed. Event loops are one way to implement inter-process communication.

All operating systems provide support for multi-threaded applications. Each OS has unique function calls for creating threads, message queues and timers. With the advent of the C++11 thread support library, it’s now possible to create portable code and avoid the OS-specific function calls. This article provides a simple example of how to create a thread event loop, message queue and timer services while only relying upon the C++ Standard Library. Any C++11 compiler supporting the thread library should be able to compile the attached source.

CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.

See GitHub for latest source code:

Background

Typically, I need a thread to operate as an event loop. Incoming messages are dequeued by the thread and data is dispatched to an appropriate function handler based on a unique message identifier. Timer support capable of invoking a function is handy for low speed polling or to generate a timeout if something doesn’t happen in the expected amount of time. Many times, the worker thread is created at startup and isn’t destroyed until the application terminates.

A key requirement for the implementation is that the incoming messages must execute on the same thread instance. Whereas say std::async may use a temporary thread from a pool, this class ensures that all incoming messages use the same thread. For instance, a subsystem could be implemented with code that is not thread-safe. A single WorkerThread instance is used to safely dispatch function calls into the subsystem.

At first glance, the C++ thread support seems to be missing some key features. Yes, std::thread is available to spin off a thread but there is no thread-safe queue and no timers – services that most OS’s provide. I’ll show how to use the C++ Standard Library to create these “missing” features and provide an event processing loop familiar to many programmers.

WorkerThread

The WorkerThread class encapsulates all the necessary event loop mechanisms. A simple class interface allows thread creation, posting messages to the event loop, and eventual thread termination. The interface is shown below:

C++
class WorkerThread
{
public:
    /// Constructor
    WorkerThread(const char* threadName);

    /// Destructor
    ~WorkerThread();

    /// Called once to create the worker thread
    /// @return True if thread is created. False otherwise. 
    bool CreateThread();

    /// Called once a program exit to exit the worker thread
    void ExitThread();

    /// Get the ID of this thread instance
    /// @return The worker thread ID
    std::thread::id GetThreadId();

    /// Get the ID of the currently executing thread
    /// @return The current thread ID
    static std::thread::id GetCurrentThreadId();

    /// Add a message to the thread queue
    /// @param[in] data - thread specific message information
    void PostMsg(std::shared_ptr<UserData> msg);

private:
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    /// Entry point for the worker thread
    void Process();

    /// Entry point for timer thread
    void TimerThread();

    std::unique_ptr<std::thread> m_thread;
    std::queue<std::shared_ptr<ThreadMsg>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_timerExit;
    const char* THREAD_NAME;
};

The first thing to notice is that std::thread is used to create a main worker thread. The main worker thread function is Process().

C++
bool WorkerThread::CreateThread()
{
    if (!m_thread)
        m_thread = new thread(&WorkerThread::Process, this);
    return true;
}

Event Loop

The Process() event loop is shown below. The thread relies upon a std::queue<ThreadMsg*> for the message queue. std::queue is not thread-safe so all access to the queue must be protected by mutex. A std::condition_variable is used to suspend the thread until notified that a new message has been added to the queue.

C++
void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->id)
        {
            case MSG_POST_USER_DATA:
            {
                ASSERT_TRUE(msg->msg != NULL);

                auto userData = std::static_pointer_cast<UserData>(msg->msg);
                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                break;
            }

            case MSG_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case MSG_EXIT_THREAD:
            {
                m_timerExit = true;
                timerThread.join();
                return;
            }

            default:
                ASSERT();
        }
    }
}

PostMsg() creates a new ThreadMsg on the heap, adds the message to the queue, and then notifies the worker thread using a condition variable.

C++
void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));

    // Add user data msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

The loop will continue to process messages until the MSG_EXIT_THREAD is received and the thread exits.

C++
void WorkerThread::ExitThread()
{
    if (!m_thread)
        return;

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));

    // Put exit thread message into the queue
    {
        lock_guard<mutex> lock(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }

    m_thread->join();
    m_thread = nullptr;
}

Event Loop (Win32)

The code snippet below contrasts the std::thread event loop above with a similar Win32 version using the Windows API. Notice GetMessage() API is used in lieu of the std::queue. Messages are posted to the OS message queue using PostThreadMessage(). And finally, timerSetEvent() is used to place WM_USER_TIMER messages into the queue. All of these services are provided by the OS. The std::thread WorkerThread implementation presented here avoids the raw OS calls yet the implementation functionality is the same as the Win32 version while relying only upon only the C++ Standard Library.

C++
unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;

    // Start periodic timer
    MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired, 
                       reinterpret_cast<DWORD>(this), TIME_PERIODIC);

    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg.wParam != NULL);

                // Convert the ThreadMsg void* data back to a UserData*
                const UserData* userData = static_cast<const UserData*>(msg.wParam);

                cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;

                // Delete dynamic data passed through message queue
                delete userData;
                break;
            }

            case WM_USER_TIMER:
                cout << "Timer expired on " << THREAD_NAME << endl;
                break;

            case WM_EXIT_THREAD:
                timeKillEvent(timerId);
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}

Timer

A low-resolution periodic timer message is inserted into the queue using a secondary private thread. The timer thread is created inside Process().

C++
void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

...

The timer thread’s sole responsibility is to insert a MSG_TIMER message every 250ms. In this implementation, there’s no protection against the timer thread injecting more than one timer message into the queue. This could happen if the worker thread falls behind and can’t service the message queue fast enough. Depending on the worker thread, processing load, and how fast the timer messages are inserted, additional logic could be employed to prevent flooding the queue.

C++
void WorkerThread::TimerThread()
{
    while (!m_timerExit)
    {
        // Sleep for 250mS then put a MSG_TIMER into the message queue
        std::this_thread::sleep_for(250ms);

        std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));

        // Add timer msg to queue and notify worker thread
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }
}

Usage

The main() function below shows how to use the WorkerThread class. Two worker threads are created and a message is posted to each one. After a short delay, both threads exit.

C++
// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");

int main(void)
{    
    // Create worker threads
    workerThread1.CreateThread();
    workerThread2.CreateThread();

    // Create message to send to worker thread 1
    std::shared_ptr<UserData> userData1(new UserData());
    userData1->msg = "Hello world";
    userData1->year = 2017;

    // Post the message to worker thread 1
    workerThread1.PostMsg(userData1);

    // Create message to send to worker thread 2
    std::shared_ptr<UserData> userData2(new UserData());
    userData2->msg = "Goodbye world";
    userData2->year = 2017;

    // Post the message to worker thread 2
    workerThread2.PostMsg(userData2);

    // Give time for messages processing on worker threads
    this_thread::sleep_for(1s);

    workerThread1.ExitThread();
    workerThread2.ExitThread();

    return 0;
}

Conclusion

The C++ thread support library offers a platform independent way to write multi-threaded application code without reliance upon OS-specific API’s. The WorkerThread class presented here is a bare-bones implementation of an event loop, yet all the basics are there ready to be expanded upon.

History

  • 5th February, 2017
    • Initial release
  • 7th February, 2017
    • Updated article to provide clarifications on an event loop and contrast the implementation with a Win32 event loop
  • 13th September, 2020
    • Minor moderization updates to simplify the implementation. New source code and article updates. 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)