Introduction
Event-driven programming is a computer programming paradigm in which the flow of the program is determined by user actions (mouse clicks, key presses), other hardware events, or messages from other programs[1]. The alternative is batch programming, where the programmer writes a fixed control sequence determining the actions of the program. In the Microsoft Windows environment, event-driven programs contain an event loop that is registered with the operating system. The operating system passes messages describing events to the program; events might include GUI actions such as mouse and keyboard clicks, file system changes, or timers. For each different event, the program executes an event handler or trigger function to process it. Event-driven programming stresses flexibility and asynchrony as virtues, and tries to be as modeless as possible. Event-driven programming is a popular model for writing programs for graphical user interfaces, embedded systems, and sensor network nodes.
Background
On the Windows platform, you create a message loop for an independent thread by using the GetMessage and DispatchMessage functions. The GetMessage function retrieves a message from the calling thread's message queue and places it in a specified structure. This function can retrieve messages associated with a specified window and thread messages posted via the PostThreadMessage function. GetMessage suspends the thread and does not return until a new message is placed in the thread's message queue. PeekMessage is a non-blocking alternative: it checks the queue and returns immediately, whether or not a message is present. The DispatchMessage function dispatches a message to a window procedure.
Windows is unusual in this respect. Linux and UNIX do not provide these types of API functions.
In this article, I introduce a way of creating a cross-platform, thread-independent message loop using basic operating system objects such as mutexes, events/conditions, threads and processes, together with a message queue data structure.
At the end of the article, I provide a sample in which two independent player state machines run in separate threads, both based on the same state machine profile.
A Way to Create an Event Synchronization Mechanism
Every event-driven program has a loop somewhere that catches received events and processes them. The events may be generated by the operating system environment, or by other processes or threads. In the code that follows, I propose a set of APIs to construct a message loop with three requirements: the message loop should work for thread-independent programs; a thread may post a message to a queue which may be read by another thread; and the APIs must be cross-platform, working with both Win32 and Linux.
In general, in my approach, I allocate an independent message pool, an event/condition and a mutex for each event loop thread. When a new external event is posted, the event/condition is set to signalled. The mutex for a message pool synchronizes access to the pool between the event-sending thread and the receiving thread.
You may ask, "Why not use a counting semaphore instead of an event/condition, increasing the counter when an event is posted and decreasing it when an event is consumed?" There are two reasons. First, a semaphore is used to synchronize processes or threads, whereas an event/condition is used to synchronize threads within one process only, and the performance of an event/condition is better than that of a semaphore. Second, a semaphore has a built-in constraint. You have to specify the maximum count when you create a semaphore using the CreateSemaphore() Windows function. On the Linux platform, the maximum count for the semaphore is SEMVMX (32767). However, there should be no limitation on the number of events in the queue.
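The design described above can be sketched in a few lines of POSIX C. This is only an illustration under my own assumptions: the names mailbox_t, mailbox_post and mailbox_take are hypothetical and do not appear in the article's source. It shows a condition variable and a mutex guarding a plain counter of pending events, so the number of outstanding events is not capped by a semaphore maximum.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical mailbox owned by one event-loop thread. */
typedef struct {
    pthread_mutex_t lock;     /* guards 'pending' */
    pthread_cond_t  ready;    /* signalled whenever an event is posted */
    unsigned long   pending;  /* posted but not yet consumed events */
} mailbox_t;

static int mailbox_init(mailbox_t *mb)
{
    assert(mb != NULL);
    mb->pending = 0;
    if (pthread_mutex_init(&mb->lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&mb->ready, NULL) != 0) {
        pthread_mutex_destroy(&mb->lock);
        return -1;
    }
    return 0;
}

/* Sender side: record the event under the mutex, then wake the receiver. */
static void mailbox_post(mailbox_t *mb)
{
    pthread_mutex_lock(&mb->lock);
    mb->pending++;
    pthread_cond_signal(&mb->ready);
    pthread_mutex_unlock(&mb->lock);
}

/* Receiver side: sleep until at least one event is pending, then consume it.
   The loop re-checks the counter because a wakeup may be spurious. */
static void mailbox_take(mailbox_t *mb)
{
    pthread_mutex_lock(&mb->lock);
    while (mb->pending == 0)
        pthread_cond_wait(&mb->ready, &mb->lock);
    mb->pending--;
    pthread_mutex_unlock(&mb->lock);
}

static void mailbox_destroy(mailbox_t *mb)
{
    pthread_cond_destroy(&mb->ready);
    pthread_mutex_destroy(&mb->lock);
}
```

In a real event loop the counter would be replaced by the message queue itself, with "queue not empty" as the condition.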
In my approach, the types and prototypes for the event synchronization mechanism are defined as follows:
#if defined LINUX
typedef pthread_t XTHREADID;
typedef pthread_t XTHREADHANDLE;
typedef pthread_mutex_t XMUTEX;
typedef pthread_cond_t XEVENT;
#elif defined WIN32
typedef DWORD XTHREADID;
typedef HANDLE XTHREADHANDLE;
typedef HANDLE XMUTEX;
typedef DWORD XEVENT;
#endif
typedef int (*XTHREAD_SAFE_ACTION_T)(void*p);
typedef BOOL (*XIS_CONDITION_OK_T)(void*p);
int XCreateEvent(XEVENT *pEvent);
int XDestroyEvent(XEVENT *pEvent);
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MilliSec,
XIS_CONDITION_OK_T pIsConditionOK, void *pCondParam,
XTHREAD_SAFE_ACTION_T pAction, void *pActionParam);
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex,
XTHREAD_SAFE_ACTION_T pAction, void *pActionParam);
XCreateEvent()
The XCreateEvent() function creates a virtual event which a thread may post or wait for. On the Windows platform, the event is the ID of the calling thread; the XSignalEvent() function uses this ID to place messages in that thread's message queue. On the Linux platform, this function creates a condition variable. The associated condition is true when events are available in the queue and false otherwise.
XDestroyEvent()
The XDestroyEvent() function frees the resources allocated for the event.
XWaitForEvent()
The XWaitForEvent() function waits for an event and takes an action when the event occurs. The action is implemented as a thread-safe callback function which is protected by a mutex. The condition is true if events are available in the queue. The action could be a function that gets event data from the queue.
On Windows, this function calls GetMessage() to wait for a message posted to the thread queue through the Windows PostThreadMessage function.
On Linux, this function waits on the condition variable with pthread_cond_wait() or its timed variant pthread_cond_timedwait(), which atomically unlocks the input mutex (as per pthread_mutex_unlock) and waits for the condition variable to be signalled. The thread execution is suspended and does not consume any CPU time until the condition variable is signalled. The input mutex must be locked by the calling thread on entry to the wait, and it is locked again before the wait returns.
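The timed wait needs an absolute deadline rather than a relative timeout, which is easy to get wrong. The following is a minimal sketch, assuming a POSIX system; the helper names deadline_after_ms and wait_flag are hypothetical and are not part of the article's API. It converts milliseconds into a CLOCK_REALTIME deadline and re-checks the predicate after every wakeup.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <time.h>

/* Turn a relative timeout in milliseconds into the absolute deadline
   that pthread_cond_timedwait() expects (CLOCK_REALTIME by default). */
static int deadline_after_ms(struct timespec *ts, unsigned long ms)
{
    assert(ts != NULL);
    if (clock_gettime(CLOCK_REALTIME, ts) != 0)
        return -1;
    ts->tv_sec  += (time_t)(ms / 1000);
    ts->tv_nsec += (long)(ms % 1000) * 1000000L;
    if (ts->tv_nsec >= 1000000000L) {   /* carry into the seconds field */
        ts->tv_sec++;
        ts->tv_nsec -= 1000000000L;
    }
    return 0;
}

/* Wait until *flag becomes non-zero or the timeout expires.
   Returns 0 when the flag is set, ETIMEDOUT on timeout, -1 on clock error.
   The caller must not hold the mutex. */
static int wait_flag(pthread_cond_t *cond, pthread_mutex_t *mtx,
                     const int *flag, unsigned long ms)
{
    struct timespec deadline;
    int rc = 0;

    if (deadline_after_ms(&deadline, ms) != 0)
        return -1;
    pthread_mutex_lock(mtx);
    /* The mutex is released while sleeping and re-acquired on wakeup. */
    while (!*flag && rc == 0)
        rc = pthread_cond_timedwait(cond, mtx, &deadline);
    if (*flag)
        rc = 0;                         /* the predicate has the final say */
    pthread_mutex_unlock(mtx);
    return rc;
}
```

Because the deadline is absolute, repeated wakeups inside the loop do not extend the total waiting time.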
XSignalEvent()
The XSignalEvent() function calls the function specified in the input argument while holding the mutex, and then signals the waiting thread. The function might post event data to an event queue. On Windows, this function calls the PostThreadMessage function to post a Windows message to the thread message queue.
On Linux, this function locks the mutex, modifies the shared resources so that the condition is met, and signals the condition variable.
int XCreateEvent(XEVENT *pEvent)
{
if (pEvent==NULL)
return -1;
#ifdef WIN32
*pEvent = GetCurrentThreadId();
return 0;
#else
return pthread_cond_init(pEvent, NULL);
#endif
}
int XDestroyEvent(XEVENT *pEvent)
{
if (pEvent==NULL)
return -1;
#ifdef WIN32
return 0;
#else
return pthread_cond_destroy(pEvent);
#endif
}
int XWaitForEvent(XEVENT *pEvent, XMUTEX *pMutex, unsigned long MilliSec,
XIS_CONDITION_OK_T pIsConditionOK, void *pCondParam,
XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
if (pEvent==NULL || pMutex==NULL || pIsConditionOK==NULL)
return -1;
#ifdef WIN32
DWORD nRet=WAIT_OBJECT_0;
MSG WinMsg;
while (GetMessage(&WinMsg, NULL, 0, 0))
{
if (WinMsg.message == WM_EXT_EVENT_ID)
{
if (pAction)
{
XMutexLock(pMutex);
(*pAction)(pActionParam);
XMutexUnlock(pMutex);
}
return 0;
}
else
{
DispatchMessage(&WinMsg);
}
}
return 0;
#else
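struct timespec timeout;
int rc=0;
/* Build an absolute deadline, keeping the sub-second part of MilliSec. */
if (clock_gettime(CLOCK_REALTIME, &timeout) != 0)
return -1;
timeout.tv_sec += MilliSec / 1000;
timeout.tv_nsec += (long)(MilliSec % 1000) * 1000000L;
if (timeout.tv_nsec >= 1000000000L)
{
timeout.tv_sec++;
timeout.tv_nsec -= 1000000000L;
}
pthread_mutex_lock(pMutex);
/* Re-check the condition after every wakeup; a wakeup may be spurious. */
while (rc == 0 && !(*pIsConditionOK)(pCondParam))
rc = pthread_cond_timedwait(pEvent, pMutex, &timeout);
/* Run the action only when the condition holds; the mutex is still locked. */
if (rc == 0 && pAction)
(*pAction)(pActionParam);
pthread_mutex_unlock(pMutex);
if (rc == ETIMEDOUT)
return XWAIT_TIMEOUT;
return rc;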
#endif
}
int XSignalEvent(XEVENT *pEvent, XMUTEX *pMutex,
XTHREAD_SAFE_ACTION_T pAction, void *pActionParam)
{
int ret=0;
if (pEvent==NULL || pMutex==NULL)
return -1;
#ifdef WIN32
if (pAction)
{
XMutexLock(pMutex);
(*pAction)(pActionParam);
XMutexUnlock(pMutex);
}
if (0==*pEvent) return -1;
PostThreadMessage(*pEvent, WM_EXT_EVENT_ID, 0, 0);
return 0;
#else
if (pMutex==NULL)
return -1;
pthread_mutex_lock(pMutex);
if (pAction)
(*pAction)(pActionParam);
ret = pthread_cond_signal(pEvent);
pthread_mutex_unlock(pMutex);
return ret;
#endif
}
Thread-Independent Event Queue Creation and Event Overflow Prevention
The following code creates a thread-independent event queue based on the cross-platform APIs. Each thread has an independent message pool, an event/condition and a mutex.
When a new external event is posted, the event/condition is signalled. The mutex for a message pool synchronizes access to the pool between the event-sending thread and the receiving thread.
There is one issue that we must handle. If events are handled more slowly than they are triggered, the event buffer will overflow. Take timer events as an example: if too many timer events are triggered and the timer handler cannot consume them in time, timer events will crowd out other, more important events. We can solve this problem by discarding a timer event when an event from the same timer is already waiting in the queue.
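The duplicate-removal idea can be tried in isolation from the threading code. The sketch below is a simplified stand-in, not the article's implementation: the names ring_t, ring_push, ring_count, EV_TIMER and QUEUE_SLOTS are assumptions made for the example. It uses a circular buffer that keeps one slot empty to tell "full" from "empty", and refuses a timer event when one with the same sequence number is already queued.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_SLOTS 8   /* hypothetical capacity; one slot always stays empty */
#define EV_TIMER    1   /* hypothetical timer event ID */

typedef struct {
    int           id;         /* event ID */
    unsigned long timer_seq;  /* identifies the timer that fired */
} event_t;

typedef struct {
    int     head;             /* index of the oldest queued event */
    int     rear;             /* index of the next free slot */
    event_t slots[QUEUE_SLOTS];
} ring_t;

/* Number of events currently queued. */
static int ring_count(const ring_t *q)
{
    return (q->rear - q->head + QUEUE_SLOTS) % QUEUE_SLOTS;
}

/* Returns 1 if the event was queued, 0 if it was dropped because the
   queue is full or an event from the same timer is already waiting. */
static int ring_push(ring_t *q, event_t ev)
{
    int i;

    assert(q != NULL);
    if ((q->rear + 1) % QUEUE_SLOTS == q->head)
        return 0;                                   /* queue is full */
    if (ev.id == EV_TIMER) {
        for (i = q->head; i != q->rear; i = (i + 1) % QUEUE_SLOTS) {
            if (q->slots[i].id == EV_TIMER &&
                q->slots[i].timer_seq == ev.timer_seq)
                return 0;                           /* duplicate timer event */
        }
    }
    q->slots[q->rear] = ev;
    q->rear = (q->rear + 1) % QUEUE_SLOTS;
    return 1;
}
```

Only timer events are coalesced; two identical non-timer events are both kept, since each may carry meaning for the state machine.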
typedef struct tagEXTMSG
{
int nMsgID;
unsigned char nDataFormat;
unsigned char nCategory;
union SME_EVENT_DATA_T Data;
SME_APP_T *pDestApp;
unsigned long nSequenceNum;
SME_THREAD_CONTEXT_T* pDestThread;
} X_EXT_MSG_T;
#define MSG_BUF_SIZE 100
typedef struct tagEXTMSGPOOL
{
int nMsgBufHdr;
int nMsgBufRear;
X_EXT_MSG_T MsgBuf[MSG_BUF_SIZE];
XEVENT EventToThread;
XMUTEX MutextForPool;
} X_EXT_MSG_POOL_T;
BOOL XInitMsgBuf()
{
SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
X_EXT_MSG_POOL_T *pMsgPool;
if (NULL==pThreadContext)
return FALSE;
pThreadContext->pExtEventPool = (void*)malloc(sizeof(X_EXT_MSG_POOL_T));
pMsgPool =(X_EXT_MSG_POOL_T*)(pThreadContext->pExtEventPool);
if (NULL==pMsgPool)
return FALSE;
memset(pMsgPool, 0, sizeof(X_EXT_MSG_POOL_T));
XCreateMutext(&(pMsgPool->MutextForPool));
XCreateEvent(&(pMsgPool->EventToThread));
return TRUE;
}
BOOL XFreeMsgBuf()
{
SME_THREAD_CONTEXT_T* pThreadContext = XGetThreadContext();
if (NULL!=pThreadContext && NULL!=pThreadContext->pExtEventPool)
{
free(pThreadContext->pExtEventPool);
pThreadContext->pExtEventPool= NULL;
return TRUE;
}
return FALSE;
}
static BOOL XIsMsgAvailable(void *pArg)
{
SME_THREAD_CONTEXT_T* p = XGetThreadContext();
X_EXT_MSG_POOL_T *pMsgPool;
if (NULL==p || NULL== p->pExtEventPool)
return FALSE;
SME_UNUSED_VOIDP_PARAM(pArg);
pMsgPool = (X_EXT_MSG_POOL_T*)p->pExtEventPool;
if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
return FALSE;
return TRUE;
}
static void XAppendMsgToBuf(void *pArg)
{
X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
int nHdr;
X_EXT_MSG_POOL_T *pMsgPool;
if (NULL==pMsg || NULL==pMsg->pDestThread ||
NULL==pMsg->pDestThread->pExtEventPool)
return;
pMsgPool = (X_EXT_MSG_POOL_T*)(pMsg->pDestThread->pExtEventPool);
if (((pMsgPool->nMsgBufRear+1) % MSG_BUF_SIZE) == pMsgPool->nMsgBufHdr)
return; /* The queue is full; the event is dropped. */
nHdr = pMsgPool->nMsgBufHdr;
if (SME_EVENT_TIMER == pMsg->nMsgID)
{
while (nHdr != pMsgPool->nMsgBufRear)
{
if (SME_EVENT_TIMER == pMsgPool->MsgBuf[nHdr].nMsgID
&& pMsg->nSequenceNum == pMsgPool->MsgBuf[nHdr].nSequenceNum)
return;
nHdr = (nHdr+1) % MSG_BUF_SIZE;
}
}
memcpy(&(pMsgPool->MsgBuf[pMsgPool->nMsgBufRear]),
pMsg,sizeof(X_EXT_MSG_T));
pMsgPool->nMsgBufRear = (pMsgPool->nMsgBufRear+1)%MSG_BUF_SIZE;
}
static void XGetMsgFromBuf(void *pArg)
{
X_EXT_MSG_T *pMsg = (X_EXT_MSG_T*)pArg;
SME_THREAD_CONTEXT_T* p = XGetThreadContext();
X_EXT_MSG_POOL_T *pMsgPool;
if (NULL==pMsg || NULL==p || NULL==p->pExtEventPool)
return;
pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
if (pMsgPool->nMsgBufHdr==pMsgPool->nMsgBufRear)
return;
memcpy(pMsg,&(pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr]),
sizeof(X_EXT_MSG_T));
pMsgPool->MsgBuf[pMsgPool->nMsgBufHdr].nMsgID =0;
pMsgPool->nMsgBufHdr = (pMsgPool->nMsgBufHdr+1)%MSG_BUF_SIZE;
}
int XPostThreadExtIntEvent(SME_THREAD_CONTEXT_T* pDestThreadContext,
int nMsgID, int Param1, int Param2,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
X_EXT_MSG_T Msg;
X_EXT_MSG_POOL_T *pMsgPool;
if (nMsgID==0 || NULL== pDestThreadContext ||
NULL==pDestThreadContext->pExtEventPool)
return -1;
Msg.nMsgID = nMsgID;
Msg.pDestApp = pDestApp;
Msg.pDestThread = pDestThreadContext;
Msg.nSequenceNum = nSequenceNum;
Msg.nDataFormat = SME_EVENT_DATA_FORMAT_INT;
Msg.nCategory = nCategory;
Msg.Data.Int.nParam1 = Param1;
Msg.Data.Int.nParam2 = Param2;
pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
(XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
return 0;
}
int XPostThreadExtPtrEvent(SME_THREAD_CONTEXT_T* pDestThreadContext,
int nMsgID, void *pData, int nDataSize,
SME_APP_T *pDestApp, unsigned long nSequenceNum,unsigned char nCategory)
{
X_EXT_MSG_T Msg;
X_EXT_MSG_POOL_T *pMsgPool;
if (nMsgID==0 || pDestThreadContext==NULL ||
NULL==pDestThreadContext->pExtEventPool)
return -1;
Msg.nMsgID = nMsgID;
Msg.pDestApp = pDestApp;
Msg.pDestThread = pDestThreadContext;
Msg.nSequenceNum = nSequenceNum;
Msg.nCategory = nCategory;
Msg.nDataFormat = SME_EVENT_DATA_FORMAT_PTR;
if (pData!=NULL && nDataSize>0)
{
#if SME_CPP
Msg.Data.Ptr.pData = new char[nDataSize];
#else
Msg.Data.Ptr.pData = malloc(nDataSize);
if (NULL==Msg.Data.Ptr.pData)
return -1; /* Out of memory: nothing to post. */
#endif
memcpy(Msg.Data.Ptr.pData, pData, nDataSize);
Msg.Data.Ptr.nSize = nDataSize;
}
else
{
Msg.Data.Ptr.pData = NULL;
Msg.Data.Ptr.nSize = 0;
}
pMsgPool = (X_EXT_MSG_POOL_T *)(pDestThreadContext->pExtEventPool);
XSignalEvent(&(pMsgPool->EventToThread),&(pMsgPool->MutextForPool),
(XTHREAD_SAFE_ACTION_T)XAppendMsgToBuf,&Msg);
return 0;
}
BOOL XGetExtEvent(SME_EVENT_T* pEvent)
{
X_EXT_MSG_T NativeMsg;
int i=0;
int ret=0;
SME_THREAD_CONTEXT_T* p = XGetThreadContext();
X_EXT_MSG_POOL_T *pMsgPool;
if (NULL==pEvent || NULL==p || NULL==p->pExtEventPool)
return FALSE;
pMsgPool = (X_EXT_MSG_POOL_T*)(p->pExtEventPool);
memset(&NativeMsg,0,sizeof(NativeMsg));
while (TRUE)
{
while (i<10)
{
ret = XWaitForEvent(&(pMsgPool->EventToThread),
&(pMsgPool->MutextForPool), 10000,
(XIS_CONDITION_OK_T)XIsMsgAvailable, NULL,
(XTHREAD_SAFE_ACTION_T)XGetMsgFromBuf,&NativeMsg);
if (ret != XWAIT_TIMEOUT)
break;
i++;
}
if (ret == XWAIT_TIMEOUT)
return FALSE;
else
{
if (NativeMsg.nMsgID == SME_EVENT_EXIT_LOOP)
{
return FALSE;
}
#ifdef WIN32
#else
else if (SME_EVENT_TIMER == NativeMsg.nMsgID &&
SME_TIMER_CALLBACK == NativeMsg.Data.Int.nParam1)
{
SME_TIMER_PROC_T pfnCallback = (SME_TIMER_PROC_T)
(NativeMsg.Data.Int.nParam2);
(*pfnCallback)(NativeMsg.pDestApp, NativeMsg.nSequenceNum);
}
#endif
else
{
memset(pEvent,0,sizeof(SME_EVENT_T));
pEvent->nEventID = NativeMsg.nMsgID;
pEvent->pDestApp = NativeMsg.pDestApp;
pEvent->nSequenceNum = NativeMsg.nSequenceNum;
pEvent->nDataFormat = NativeMsg.nDataFormat;
pEvent->nCategory = NativeMsg.nCategory;
pEvent->bIsConsumed = FALSE;
memcpy(&(pEvent->Data),&(NativeMsg.Data),
sizeof(union SME_EVENT_DATA_T));
}
return TRUE;
}
}
}
BOOL XDelExtEvent(SME_EVENT_T *pEvent)
{
if (0==pEvent)
return FALSE;
if (pEvent->nDataFormat == SME_EVENT_DATA_FORMAT_PTR)
{
if (pEvent->Data.Ptr.pData)
{
#if SME_CPP
delete[] (char*)pEvent->Data.Ptr.pData; /* Allocated with new char[] in XPostThreadExtPtrEvent(). */
#else
free(pEvent->Data.Ptr.pData);
#endif
pEvent->Data.Ptr.pData=NULL;
}
}
return TRUE;
}
A Sample: Two Thread-Independent State Machines
Based on this approach, we provide a sample in which two independent player state machines run in separate threads, both based on the same state machine profile.
There are 3 running threads in this sample: the state machine application thread-1 for the Player-1 application, thread-2 for the Player-2 application, and the external event trigger thread. The Player-1 and Player-2 applications are both created from the Player state machine profile, and they run independently. Each player has its own control panel.
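The shape of this sample can be imitated with plain POSIX threads. The sketch below is not the StateWizard sample itself: the two-state player, the event IDs and every function name are assumptions made for illustration. Two player threads each own a private queue, mutex and condition variable, and a third thread (here the caller of run_two_players) acts as the external event trigger.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define SLOTS 16
enum { EV_PLAY = 1, EV_STOP, EV_EXIT };   /* hypothetical event IDs */
enum { ST_STOPPED, ST_PLAYING };          /* hypothetical player states */

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  ready;
    int queue[SLOTS];
    int head, rear;
    int state;          /* touched only by the owning player thread */
} player_t;

static void player_init(player_t *p)
{
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->ready, NULL);
    p->head = p->rear = 0;
    p->state = ST_STOPPED;
}

/* Called by the trigger thread: queue an event for one player. */
static void player_post(player_t *p, int ev)
{
    pthread_mutex_lock(&p->lock);
    if ((p->rear + 1) % SLOTS != p->head) {     /* drop when full */
        p->queue[p->rear] = ev;
        p->rear = (p->rear + 1) % SLOTS;
    }
    pthread_cond_signal(&p->ready);
    pthread_mutex_unlock(&p->lock);
}

/* Event loop of one player; both players run this same "profile". */
static void *player_loop(void *arg)
{
    player_t *p = (player_t *)arg;
    for (;;) {
        int ev;
        pthread_mutex_lock(&p->lock);
        while (p->head == p->rear)
            pthread_cond_wait(&p->ready, &p->lock);
        ev = p->queue[p->head];
        p->head = (p->head + 1) % SLOTS;
        pthread_mutex_unlock(&p->lock);

        if (ev == EV_EXIT)
            break;
        if (ev == EV_PLAY && p->state == ST_STOPPED)
            p->state = ST_PLAYING;
        else if (ev == EV_STOP && p->state == ST_PLAYING)
            p->state = ST_STOPPED;
    }
    return NULL;
}

/* Trigger thread: start both players, feed them different events,
   and report their final states. Returns 0 on success. */
static int run_two_players(int *state1, int *state2)
{
    player_t p1, p2;
    pthread_t t1, t2;

    assert(state1 != NULL && state2 != NULL);
    player_init(&p1);
    player_init(&p2);
    if (pthread_create(&t1, NULL, player_loop, &p1) != 0)
        return -1;
    if (pthread_create(&t2, NULL, player_loop, &p2) != 0) {
        player_post(&p1, EV_EXIT);
        pthread_join(t1, NULL);
        return -1;
    }
    player_post(&p1, EV_PLAY);
    player_post(&p2, EV_PLAY);
    player_post(&p2, EV_STOP);
    player_post(&p1, EV_EXIT);
    player_post(&p2, EV_EXIT);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    *state1 = p1.state;     /* safe to read: both threads have finished */
    *state2 = p2.state;
    return 0;
}
```

Because each player has its own queue and lock, events sent to one player never delay or disturb the other.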
Points of Interest
The source code also provides a set of cross-platform, OS-related functions, including:
- Process Management
- Thread Management
- Mutex
- Clock
- Built-in Timer
- Thread Local Storage
More Information
You can find more information at http://www.intelliwizard.com/, the official site of the UML StateWizard open source project, which is released under the LGPL license.
[1] http://en.wikipedia.org/wiki/Event_driven