Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Create Cross-platform Thread-Independent Event Loops

4.89/5 (5 votes)
12 May 20076 min read 1   690  
This article discloses what is behind the GetMessage() and PostThreadMessage() Windows API, and implements them on Linux and Windows platforms using basic operation system functions.

Introduction

Event-driven programming is a computer programming paradigm in which the flow of the program is determined by user actions (mouse clicks, key presses), other hardware events, or messages from other programs[1]. The alternative is batch programming, where the programmer writes a fixed control sequence determining the actions of the program. In the Microsoft Windows environment, event-driven programs contain an event loop that is registered with the operating system. The operating system passes messages describing events to the program; events might include GUI actions such as mouse and keyboard clicks, file system changes, or timers. For each different event, the program executes an event handler or trigger function to process it. Event-driven programming stresses flexibility and asynchrony as virtues, and tries to be as modeless as possible. Event-driven programming is a popular model for writing programs for graphical user interfaces, embedded systems, and sensor network nodes.

Background

On the Windows platform, you create a message loop for an independent thread by using the GetMessage and DispatchMessage functions. The GetMessage function retrieves a message from the calling thread's message queue and places it in a specified structure. This function can retrieve messages associated with a specified Window and thread messages posted via the PostThreadMessage function. GetMessage suspends the thread and does not return until a new message is placed in the thread's message queue. PeekMessage is a non-blocking alternative, merely checking for the presence of a message in the queue, returning the message immediately. The DispatchMessage function dispatches a message to a Window procedure.

Windows is rather unique in this respect. Linux and UNIX do not provide these types of API functions.

In this article, I introduce a way of creating a cross-platform, thread-independent message loop using basic operation system objects such as mutexes, events/conditions, threads, processes and a message queue data structure.

At the end of the article, I provide a sample that two independent player state machines run at separate threads based on the same state machine profile.

A Way to Creating Event Synchronization Mechanism

Every event-driven program has a loop somewhere that catches received events and processes them. The events may be generated by the operating system environment, or by other processes or threads. In the code that follows, I propose a set of APIs to construct a message loop with three requirements; the message loop should work for thread-independent programs, a thread may post a message to a queue which may be read by another thread, and the APIs must be cross-platform, working with both Win32 and Linux.

In general, in my approach, I allocate an independent message pool, an event/condition and a Mutex for an event loop thread. When a new external event posts, set the event/condition signalled. The Mutex for a thread pool is to synchronize the message pool access between the event sending thread and the receiving thread.

You may ask, "Why do I not utilize counting semaphore instead of event/condition, when an event posted increases the counter, and when an event consumed decreases the counter?" There are 2 reasons for this question. Semaphore is used to synchronize processes or threads and event/condition are used to synchronize threads in a process only. The performance of event/condition is better than semaphore. And there is a constraint in semaphore. You have to specify the maximum count when you create a semaphore using the CreateSemaphore() Windows function. On the Linux platform, the maximum count for the semaphore is SEMVMX(32767). However, there should be no limitation in the number of events in the queue.

In my approach, the prototypes for event synchronization mechanism are defined as below:

C++
#if defined LINUX
 typedef pthread_t         XTHREADID;
 typedef pthread_t         XTHREADHANDLE;
 typedef pthread_mutex_t   XMUTEX;
 typedef pthread_cond_t    XEVENT;
#elif defined WIN32
 typedef DWORD             XTHREADID;
 typedef HANDLE            XTHREADHANDLE;
 typedef HANDLE            XMUTEX;
 typedef DWORD             XEVENT;
#endif
typedef int (*XTHREAD_SAFE_ACTION_T)(void*p);
typedef BOOL (*XIS_CONDITION_OK_T)(void*p);
int XCreateEvent(XEVENT *pEvent);
int XDestroyEvent(XEVENT *pEvent)
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MiliSec, 
        XIS_CONDITION_OK_T pIsConditionOK,  void *pCondParam,
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex, 
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)

XCreateEvent()

The XCreateEvent() function creates a virtual event which a thread may post or wait for. On the Windows platform, this function creates a thread-independent message queue and returns a thread ID that the XSignalEvent() function uses for posting messages. XSignalEvent() places messages in the queue for the identified thread. On the Linux platform, this function creates a Linux thread condition variable. When events are available in the queue it will be true, otherwise the condition is false.

XDestroyEvent()

XDestroyEvent() function frees the allocated resources for the event queue.

XWaitForEvent()

XWaitForEvent() function waits for an event and takes an action when the event occurs. The action is implemented as a thread-safe call back function which is protected by a Mutex. The condition will be true if events are available in the queue. The action could be a function to get an event data from the queue.

On Windows, this function calls GetMessage() to wait for a message posted to the thread queue through the Windows PostThreadMessage function.

On Linux, this function calls the pthread_cond_wait() function to automatically unlock the input mutex (as per pthread_unlock_mutex) and waits for the condition variable condition to be signalled. The thread execution is suspended and does not consume any CPU time until the condition variable is signalled. The input mutex is locked by the calling thread on entrance to pthread_cond_wait.

XSignalEvent()

XSignalEvent() function posts an event to the message queue and then calls the function specified in the input argument. The function might post event data to an event queue. On Windows, this function calls PostThreadMessage function to post a Windows message to the thread message queue.

On Linux, this function modifies the shared resources, which are protected by the mutex, meets the condition and signal the condition.

C++
int XCreateEvent(XEVENT *pEvent)
{
    if (pEvent==NULL)
    return -1;
    #ifdef WIN32
    *pEvent = GetCurrentThreadId();
    return 0;
    #else
    return pthread_cond_init(pEvent, NULL);
    #endif
}
int XDestroyEvent(XEVENT *pEvent)
{
    if (pEvent==NULL)
    return -1;
    #ifdef WIN32
    return 0;
    #else
    return pthread_cond_destroy(pEvent);
    #endif
}
// Wait for an event signalled and then take some thread-safe actions.
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MilliSec, 
                XIS_CONDITION_OK_T pIsConditionOK, void *pCondParam,
        XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
    if (pEvent==NULL || pMutex==NULL || pIsConditionOK==NULL)
    return -1;
    #ifdef WIN32
    DWORD nRet=WAIT_OBJECT_0;

    MSG WinMsg;
    // GetMessage()
    // If the function retrieves a message other than WM_QUIT, 
    // the return value is nonzero.
    // If the function retrieves the WM_QUIT message, 
    // the return value is zero.
    while (GetMessage(&WinMsg, NULL, 0, 0))
    {
        if (WinMsg.message == WM_EXT_EVENT_ID)
        {
            // External event is triggered. App go to running state.
            if (pAction)
            {
                XMutexLock(pMutex);
                (*pAction)(pActionParam);
                XMutexUnlock(pMutex);
            }
            return 0;
        }
        // Handle Windows messages in MMI thread, for example audio messages.
        else //if (handle_win_msg_in_mmi_thread(&WinMsg) == FALSE)
        {
            DispatchMessage(&WinMsg);
        };
    }
    return 0;
    #else
    // pthread_cond_wait() atomically unlocks the mutex 
    // (as per pthread_unlock_mutex) and waits for the 
    // condition variable cond to be signalled. The thread
    // execution is suspended and does not consume any CPU time 
    // until the condition variable is signalled. 
    // The mutex must be locked by the calling thread on
    // entrance to pthread_cond_wait. Before returning to the calling thread, 
    // pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
    pthread_mutex_lock(pMutex);
    struct timespec timeout;
    time_t now;
    int rc=0;
    if (!(*pIsConditionOK)(pCondParam))
    {
        if (time(&now) < 0)
        return -1;
        timeout.tv_sec = now + (MilliSec / 1000);
        timeout.tv_nsec = 0;
        rc = pthread_cond_timedwait(pEvent, pMutex, &timeout);
    }
    if (rc != ETIMEDOUT)
    {
        if (pAction)
        (*pAction)(pActionParam);
    }
    pthread_mutex_unlock(pMutex);
    if (rc == ETIMEDOUT)
        return XWAIT_TIMEOUT; // Actually XWAIT_TIMEOUT is ETIMEDOUT
    else
        return rc;
#endif
}
// Take some thread-safe actions before signal the event.
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex, 
            XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
    int ret=0;
    if (pEvent==NULL || pMutex==NULL)
    return -1;
    #ifdef WIN32
    if (pAction)
    {
        XMutexLock(pMutex);
        (*pAction)(pActionParam); 
        // At this time, (*pIsConditionOK)() should return TRUE.
        XMutexUnlock(
        pMutex);
    }
    if (0==*pEvent) return -1;
        PostThreadMessage(*pEvent, WM_EXT_EVENT_ID, 0, 0);
        return 0;
    #else
    if (pMutex==NULL)
        return -1;
        pthread_mutex_lock(pMutex);
        // Modifications on the shared resources, 
        // which are protected by the mutex, 
        // meet the condition and should signal the if needed.
    if (pAction)
        (*pAction)(pActionParam);
        // pthread_cond_signal restarts one of the threads 
        // that are waiting on the condition variable condition.
        ret = pthread_cond_signal(pEvent);
        pthread_mutex_unlock(pMutex);
        return ret;
    #endif
}

Thread-Independent Event Queue Creation and Event Overflow Prevention

The following code creates a thread-independent event queue based on the cross-platform APIs. Each thread has an independent message pool, an event/condition and a Mutex.
When a new external event post, the event/condition will be signalled. The Mutex for a thread pool is to synchronize the message pool access between the event sending thread and the receiving thread.

There is an issue that we should handle. If the speed of the event handling is later than the event triggering, the event buffer will be overflow. For example, timer events, if two many timer events are triggered and timer handler cannot consume them on time, timer event will overwhelm other more important events. We could solve this problem through removing the duplicate timer events which trigger from the same timer, before these events are put into the event queue.

C++
typedef struct tagEXTMSG
{
    int nMsgID; // 0: stand for empty entity
    unsigned char nDataFormat ; 
    /* Flag for this event. 
    SME_EVENT_DATA_FORMAT_INT=0, SME_EVENT_DATA_FORMAT_PTR*/
    unsigned char nCategory ; /* Category of this event. */
    union SME_EVENT_DATA_T Data;
    SME_APP_T *pDestApp;
    unsigned long nSequenceNum;
    SME_THREAD_CONTEXT_T* pDestThread;
} X_EXT_MSG_T;
#define MSG_BUF_SIZE 100
typedef struct tagEXTMSGPOOL
{
    int nMsgBufHdr;
    int nMsgBufRear;
    X_EXT_MSG_T MsgBuf[MSG_BUF_SIZE];
    XEVENT EventToThread;
    XMUTEX MutextForPool;
} X_EXT_MSG_POOL_T;
/* Initialize the external event buffer at the current thread. */
BOOL XInitMsgBuf()
{
    SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    pThreadContext->pExtEventPool = (void*)malloc(sizeof(X_EXT_MSG_POOL_T));
    pMsgPool =(X_EXT_MSG_POOL_T*)(pThreadContext->pExtEventPool);
    if (NULL==pMsgPool)
        return FALSE;
        memset(pMsgPool, 0, sizeof(X_EXT_MSG_POOL_T));
        XCreateMutext(&(pMsgPool->MutextForPool));
        XCreateEvent(&(pMsgPool->EventToThread));
    return TRUE;
}
/* Free the external event buffer at the current thread. */
BOOL XFreeMsgBuf()
{
    SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
    if (NULL!=pThreadContext && NULL!=pThreadContext->pExtEventPool)
    {
        free(pThreadContext->pExtEventPool);
        pThreadContext->pExtEventPool= NULL;
        return TRUE;
    }
    return FALSE;
}
/* Is message available at the current thread event pool?*/
static BOOL XIsMsgAvailable(void *pArg)
{
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==p || NULL== p->pExtEventPool)
        return FALSE;
        SME_UNUSED_VOIDP_PARAM(pArg);
        pMsgPool = (X_EXT_MSG_POOL_T*)p->pExtEventPool;
        if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
            return FALSE; // empty buffer.
        return TRUE;
}
/* Thread-safe action to append an external event to the rear 
of the queue at the destination thread.
Timer event overflow prevention.
*/
static void XAppendMsgToBuf(void *pArg)
{
    X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
    int nHdr;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pMsg || NULL==pMsg->pDestThread || 
                NULL==pMsg->pDestThread->pExtEventPool)
    return;
    pMsgPool = (X_EXT_MSG_POOL_T*)(pMsg->pDestThread->pExtEventPool);

    if (((pMsgPool->nMsgBufRear+1) % MSG_BUF_SIZE) == pMsgPool->nMsgBufHdr)
        return; // buffer full.
        // Prevent duplicate SME_EVENT_TIMER event 
        // triggered by a timer in the queue.
        nHdr = pMsgPool->nMsgBufHdr;
        if (SME_EVENT_TIMER == pMsg->nMsgID)
        {
            while (nHdr != pMsgPool->nMsgBufRear)
            {
                if (SME_EVENT_TIMER == pMsgPool->MsgBuf[nHdr].nMsgID
                && pMsg->nSequenceNum == pMsgPool->MsgBuf[nHdr].nSequenceNum)
                return;
                nHdr = (nHdr+1) % MSG_BUF_SIZE;
            }
        }
    memcpy(&(pMsgPool->MsgBuf[pMsgPool->nMsgBufRear]),
                    pMsg,sizeof(X_EXT_MSG_T));
    pMsgPool->nMsgBufRear = (pMsgPool->nMsgBufRear+1)%MSG_BUF_SIZE;
}
/* Thread-safe action to remove an external event from the 
current thread event pool.*/
static void XGetMsgFromBuf(void *pArg)
{
    X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pMsg || NULL==p || NULL==p->pExtEventPool)
    return;
    pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
    if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
    return; // empty buffer.
    memcpy(pMsg,&(pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr]),
                        sizeof(X_EXT_MSG_T));
    pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr].nMsgID =0;
    pMsgPool->nMsgBufHdr = (pMsgPool->nMsgBufHdr+1)%MSG_BUF_SIZE;
}
int XPostThreadExtIntEvent(SME_THREAD_CONTEXT_T* pDestThreadContext, 
                    int nMsgID, int Param1, int Param2,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
    X_EXT_MSG_T Msg;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (nMsgID==0 || NULL== pDestThreadContext || 
                NULL==pDestThreadContext->pExtEventPool)
    return -1;
    Msg.nMsgID = nMsgID;
    Msg.pDestApp = pDestApp;
    Msg.pDestThread = pDestThreadContext;
    Msg.nSequenceNum = nSequenceNum;
    Msg.nDataFormat = SME_EVENT_DATA_FORMAT_INT;
    Msg.nCategory = nCategory;
    Msg.Data.Int.nParam1 = Param1;
    Msg.Data.Int.nParam2 = Param2;
    pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
    XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
                (XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
    return 0;
}
int XPostThreadExtPtrEvent(SME_THREAD_CONTEXT_T* pDestThreadContext, 
                int nMsgID, void *pData, int nDataSize,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
    X_EXT_MSG_T Msg;
    X_EXT_MSG_POOL_T *pMsgPool;
    if (nMsgID==0 || pDestThreadContext==NULL || 
                NULL==pDestThreadContext->pExtEventPool)
    return -1;
    Msg.nMsgID = nMsgID;
    Msg.pDestApp = pDestApp;
    Msg.pDestThread = pDestThreadContext;
    Msg.nSequenceNum = nSequenceNum;
    Msg.nCategory = nCategory;
    Msg.nDataFormat = SME_EVENT_DATA_FORMAT_PTR;
    if (pData!=NULL && nDataSize>0)
    {
        #if SME_CPP
        Msg.Data.Ptr.pData = new char[nDataSize];
        #else
        Msg.Data.Ptr.pData = malloc(nDataSize);
        #endif
        memcpy(Msg.Data.Ptr.pData, pData, nDataSize);
        Msg.Data.Ptr.nSize = nDataSize;
    }
    else
    {
        Msg.Data.Ptr.pData = NULL;
        Msg.Data.Ptr.nSize = 0;
    }
    pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
    XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
                (XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
    return 0;
}
BOOL XGetExtEvent(SME_EVENT_T* pEvent)
{
    X_EXT_MSG_T NativeMsg;
    int i=0;
    int ret=0;
    SME_THREAD_CONTEXT_T* p = XGetThreadContext();
    X_EXT_MSG_POOL_T *pMsgPool;
    if (NULL==pEvent || NULL==p || NULL==p->pExtEventPool)
    return FALSE;
    pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
    memset(&NativeMsg,0,sizeof(NativeMsg));
    while (TRUE)
    {
        while (i<10)
        {
            ret = XWaitForEvent(&(pMsgPool->EventToThread), 
            &(pMsgPool->MutextForPool), 10000, 
            (XIS_CODITION_OK_T)XIsMsgAvailable, NULL, 
            (XTHREAD_SAFE_ACTION_T)XGetMsgFromBuf,&NativeMsg);

            if (ret != XWAIT_TIMEOUT)
            break;
            i++;
        }
        if (ret == XWAIT_TIMEOUT)
            return FALSE;
        else
        {
            if (NativeMsg.nMsgID == SME_EVENT_EXIT_LOOP)
            {
                return FALSE; //Request Exit
            }
        #ifdef WIN32
        #else
        // Built-in call back timer on Linux
        else if (SME_EVENT_TIMER == NativeMsg.nMsgID && 
            SME_TIMER_CALLBACK == NativeMsg.Data.Int.nParam1)
        {
            // Invoke the call back function.
            SME_TIMER_PROC_T pfnCallback = (SME_TIMER_PROC_T)
                        (NativeMsg.Data.Int.nParam2);
            (*pfnCallback)(NativeMsg.pDestApp, NativeMsg.nSequenceNum);
        }
        #endif
        else
        {
            // Translate the native message to SME event.
            memset(pEvent,0,sizeof(SME_EVENT_T));
            pEvent->nEventID = NativeMsg.nMsgID;
            pEvent->pDestApp = NativeMsg.pDestApp;
            pEvent->nSequenceNum = NativeMsg.nSequenceNum;
            pEvent->nDataFormat = NativeMsg.nDataFormat;
            pEvent->nCategory = NativeMsg.nCategory;
            pEvent->bIsConsumed = FALSE;
            memcpy(&(pEvent->Data),&(NativeMsg.Data), 
                    sizeof(union SME_EVENT_DATA_T));
        }
        //printf("External message received. \n");
        return TRUE;
    }
}; // while (TRUE)
}
BOOL XDelExtEvent(SME_EVENT_T *pEvent)
{
    if (0==pEvent)
    return FALSE;
    if (pEvent->nDataFormat == SME_EVENT_DATA_FORMAT_PTR)
    {
        if (pEvent->Data.Ptr.pData)
        {
            #if SME_CPP
            delete pEvent->Data.Ptr.pData;
            #else
            free(pEvent->Data.Ptr.pData);
            #endif
            pEvent->Data.Ptr.pData=NULL;
        }
    }
    return TRUE;
}

A Sample: Two Thread-Independent State Machines

Based on this approach, we provide a sample that two independent player state machines run at separate threads based on the same state machine profile.

There are 3 running threads in this sample: the state machine application thread-1 for Player-1 application, thread-2 for Player-2 application and the external event trigger thread. Player-1 and Player-2 applications are created based on the Player state machine profile, they run independently. Player-1 or Player-2 has an independent control panel.

Points of Interest

A set of cross-platform OS-related functions are present in the source code including:

  • Process Management
  • Thread Management
  • Mutex
  • Clock
  • Built-in Timer
  • Thread Local Storage

More Information

You may download more information at http://www.intelliwizard.com/ the official site of the UML StateWizard open source project under LGPL license.

[1] http://en.wikipedia.org/wiki/Event_driven

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here