Introduction
Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my function call and event data from one thread to another safely. A portable and easy to use framework. No more monster switch
statements inside a thread loop that typecast OS message queue void*
values based upon an enumeration. Create a callback. Register a callback. And the framework automagically invokes the callback with data arguments on a user specified target thread is the goal.
On systems that use event loops (aka message loops), a message queue and switch
statement are sometimes employed to handle incoming messages. The loop waits for messages and dispatches a function call with data into the program. This solution aims to simplify and standardize the event loop in order to generalize function dispatching and the movement of data between threads.
The callback solution presented here provides the following features:
- Asynchronous callbacks – support asynchronous callbacks to and from any thread
- Thread targeting – specify the destination thread for the asynchronous callback
- Callbacks – invoke any C or C++ free/static function with a matching signature
- Type safe – user defined, type safe callback function data argument
- Multicast callbacks – store multiple callbacks within an array for sequential invocation
- Thread-safe – suitable for use on a multi-threaded system
- Compact – small, easy to maintain code base consuming minimal code space
- Portable – portable to an embedded or PC-based platform
- Any compiler – support for any C language compiler
- Any OS - easy porting to any operating system
- Elegant syntax – intuitive and easy to use
The asynchronous callback paradigm significantly eases multithreaded application development by placing the callback function pointer and callback data onto the thread of control that you specify. Exposing a callback interface for a single module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.
This article proposes an inter-thread communication mechanism utilizing asynchronous multicast callbacks. The attached source code implements all features above, as I'll demonstrate.
CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.
See GitHub for latest source code:
Callbacks Background
The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation is anonymous.
Now, on a multithreaded system, you need understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a break point inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.
If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher invoked callback can occur at any time completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe since you now have another thread calling into subscriber code base at some unknown interval.
One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.
Using the Code
I'll first present how to use the code, and then get into the implementation details.
A publisher uses the CB_DECLARE
macro to expose a callback interface to potential subscribers, typically within a header file. The first argument is the callback name. The second argument is the callback function argument type. In the example below, int*
is the callback function argument.
CB_DECLARE(TestCb, int*)
The publisher uses the CB_DEFINE
macro within a source file to complete the callback definition. The first argument is the callback name. The second argument is the callback function argument type. The third argument is the size of the data pointed to by the callback function argument. The last argument is the maximum number of subscribers that can register for callback notifications.
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)
To subscribe to a callback, create a function (static
class member or global) as shown. I’ll explain why the function signature argument requires a (int*, void*)
function signature shortly.
void TestCallback1(int* val, void* userData)
{
printf("TestCallback1 %d", *val);
}
The subscriber registers to receive callbacks using the CB_Register()
function macro. The first argument is the callback name. The second argument is a pointer to the callback function. The third argument is a pointer to a thread dispatch function or NULL
if a synchronous callback is desired. And the last argument is a pointer to optional user data passed during callback invocation. The framework internally does nothing with user data other than pass it back to the callback function. The user data value can be anything the caller wants or NULL
.
CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);
On C/C++ mixed projects, the userData
callback argument can be used to store a this
class instance pointer. Pass a class static
member function pointer for the callback function and a this
pointer for user data to CB_Register()
. Within the subscriber callback function, typecast userData
back to a class instance pointer. This provides an easy means of accessing class instance functions and data within a static
callback function.
Use CB_Invoke()
when a publisher needs to invoke the callback for all registered subscribers. The function dispatches the callback and data argument onto the destination thread of control. In the example below, TestCallback1()
is called on DispatchCallbackThread1
.
int data = 123;
CB_Invoke(TestCb, &data);
Use CB_Unregister()
to unsubscribe from a callback.
CB_Unregister(TestCb, TestCallback1, DispatchCallbackThread1);
Asynchronous callbacks are easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.
SysData Publisher Example
SysData
is a simple module showing how to expose an outgoing asynchronous interface. The module stores system data and provides asynchronous subscriber notifications when the mode changes. The module interface is shown below.
typedef enum
{
STARTING,
NORMAL,
SERVICE,
SYS_INOP
} SystemModeType;
typedef struct
{
SystemModeType PreviousSystemMode;
SystemModeType CurrentSystemMode;
} SystemModeData;
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)
void SD_Init(void);
void SD_Term(void);
void SD_SetSystemMode(SystemModeType systemMode);
The publisher callback interface is SystemModeChangedCb
. Calling SD_SetSystemMode()
saves the new mode into _systemMode
and notifies all registered subscribers.
void SD_SetSystemMode(SystemModeType systemMode)
{
LK_LOCK(_hLock);
SystemModeData callbackData;
callbackData.PreviousSystemMode = _systemMode;
callbackData.CurrentSystemMode = systemMode;
_systemMode = systemMode;
CB_Invoke(SystemModeChangedCb, &callbackData);
LK_UNLOCK(_hLock);
}
SysData Subscriber Example
The subscriber creates a callback function that conforms to the publisher's callback function signature.
void SysDataCallback(const SystemModeData* data, void* userData)
{
cout << "SysDataCallback: " << data->CurrentSystemMode << endl;
}
At runtime, CB_Register()
is used to register for SysData
callbacks on DispatchCallbackThread1
.
CB_Register(SystemModeChangedCb, SysDataCallback, DispatchCallbackThread1, NULL);
When SD_SetSystemMode()
is called, any client interested in the mode changes are notified asynchronously on their desired execution thread.
SD_SetSystemMode(STARTING);
SD_SetSystemMode(NORMAL);
SysDataNoLock Publisher Example
SysDataNoLock
is an alternate implementation that uses a private callback for setting the system mode asynchronously and without locks.
CB_DECLARE(SystemModeChangedNoLockCb, const SystemModeData*)
void SDNL_Init(void);
void SDNL_Term(void);
void SDNL_SetSystemMode(SystemModeType systemMode);
The initialize function registers with the private SetSystemModeCb
callback.
CB_DECLARE(SetSystemModeCb, SystemModeType*)
CB_DEFINE(SetSystemModeCb, SystemModeType*, sizeof(SystemModeType), 1)
void SDNL_Init(void)
{
CB_Register(SetSystemModeCb, SDNL_SetSystemModePrivate, DispatchCallbackThread1, NULL);
}
The SSNL_SetSystemMode()
function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but, under the hood, a private function call is invoked asynchronously. In this case, invoking SetSystemModeCb
causes SDNL_SetSystemModePrivate()
to be called on DispatchCallbackThread1
.
void SDNL_SetSystemMode(SystemModeType systemMode)
{
CB_Invoke(SetSystemModeCb, &systemMode);
}
Since this private function is always invoked asynchronously on DispatchCallbackThread1
it doesn't require locks.
static void SDNL_SetSystemModePrivate(SystemModeType* systemMode, void* userData)
{
SystemModeData callbackData;
callbackData.PreviousSystemMode = _systemMode;
callbackData.CurrentSystemMode = *systemMode;
_systemMode = *systemMode;
CB_Invoke(SystemModeChangedNoLockCb, &callbackData);
}
Callback Signature Limitations
This design has the following limitations imposed on all callback functions:
- Each callback handles a single user-defined argument type.
- The argument may be a
const
or non-const
pointer (e.g. const MyData*
or MyData*
). - The two callback function arguments are always:
MyData*
and void*
. - Each callback has a
void
return type.
For instance, if a callback is declared as:
CB_DECLARE(TestCb, const MyData*)
The callback function signature is:
void MyCallback(const MyData* data, void* userData);
The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:
- Dynamically create an instance to a struct or class and populate data
- Post a pointer to the data through an OS message as a
void*
- Get the data from the OS message queue and typecast the
void*
back to the original type - Delete the dynamically created data
In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.
Implementation
The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.
The implementation uses macros and token pasting to provide a unique type-safe interface for each callback. The token pasting operator (##
) is used to merge two tokens when the preprocessor expands the macro. The CB_DECLARE
macro is shown below:
#define CB_DECLARE(cbName, cbArg) \
typedef void(*cbName##CallbackFuncType)(cbArg cbData, void* cbUserData); \
BOOL cbName##_Register(cbName##CallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData); \
BOOL cbName##_IsRegistered(cbName##CallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc); \
BOOL cbName##_Unregister(cbName##CallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc); \
BOOL cbName##_Invoke(cbArg cbData); \
BOOL cbName##_InvokeArray(cbArg cbData, size_t num, size_t size);
In the SysData
example used above, the compiler preprocessor expands CB_DECLARE
to:
typedef void(*SystemModeChangedCbCallbackFuncType)
(const SystemModeData* cbData, void* cbUserData);
BOOL SystemModeChangedCb_Register(SystemModeChangedCbCallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData);
BOOL SystemModeChangedCb_IsRegistered(SystemModeChangedCbCallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc);
BOOL SystemModeChangedCb_Unregister(SystemModeChangedCbCallbackFuncType cbFunc,
CB_DispatchCallbackFuncType cbDispatchFunc);
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData);
BOOL SystemModeChangedCb_InvokeArray(const SystemModeData* cbData, size_t num, size_t size);
Notice every cbName##
location is replaced by the macro name argument, in this case, being SystemModeChangedCb
from the declaration below.
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)
Similarly, the CB_DEFINE
macro expands to create the callback function implementations. Notice the macro provides a thin, type-safe wrapper around private functions such as _CB_AddCallback()
and _CB_Dispatch()
. If attempting to register the wrong function signature, the compiler generates an error or warning. The macros automate the monotonous, boilerplate code that you’d normally write by hand.
The registered callbacks are stored in a static
array of CB_Info
instances. Calling CB_Invoke(SystemModeChangedCb, &callbackData)
executes SystemModeChangedCb_Invoke()
. Then _CB_Dispatch()
iterates over the CB_Info
array and dispatches one CB_CallbackMsg
message to each target thread. The message data is dynamically created to travel through an OS message queue.
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData)
{
return _CB_Dispatch(&SystemModeChangedCbMulticast[0], 2, cbData, sizeof(SystemModeData));
}
BOOL _CB_Dispatch(CB_Info* cbInfo, size_t cbInfoLen, const void* cbData,
size_t cbDataSize)
{
BOOL invoked = FALSE;
LK_LOCK(_hLock);
for (size_t idx = 0; idx<cbInfoLen; idx++)
{
if (cbInfo[idx].cbFunc)
{
if (CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize))
{
invoked = TRUE;
}
}
}
LK_UNLOCK(_hLock);
return invoked;
}
The target OS task event loop dequeues a CB_CallbackMsg*
and calls CB_TargetInvoke()
. The dynamic data created is freed before the function exits.
void CB_TargetInvoke(const CB_CallbackMsg* cbMsg)
{
ASSERT_TRUE(cbMsg);
ASSERT_TRUE(cbMsg->cbFunc);
cbMsg->cbFunc(cbMsg->cbData, cbMsg->cbUserData);
XFREE((void*)cbMsg->cbData);
XFREE((void*)cbMsg);
}
Asynchronous callbacks impose certain limitations because everything the callback destination thread needs must be created on the heap, packaged into a CB_CallbackMsg
structure, and placed into an OS message queue.
The insertion into an OS queue is platform specific. The CB_DispatchCallbackFuncType
function pointer typedef
provides the OS queue interface to be implemented for each thread event loop on the target platform. See the Porting section below for a more complete discussion.
typedef BOOL (*CB_DispatchCallbackFuncType)(const CB_CallbackMsg* cbMsg);
Once the message is placed into the message queue, platform specific code unpacks the message, calls the CB_TargetInvoke()
function and destroys dynamically allocated data. For this example, a simple WorkerThreadStd
class provides the thread event loop leveraging the C++ thread support library. While this example uses C++ threads, the callback modules are written in plain C. Abstracting the OS details from the callback implementation makes this possible.
void WorkerThread::Process()
{
while (1)
{
ThreadMsg* msg = 0;
{
std::unique_lock<std::mutex> lk(m_mutex);
while (m_queue.empty())
m_cv.wait(lk);
if (m_queue.empty())
continue;
msg = m_queue.front();
m_queue.pop();
}
switch (msg->GetId())
{
case MSG_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg->GetData() != NULL);
const CB_CallbackMsg* callbackMsg =
static_cast<const CB_CallbackMsg*>(msg->GetData());
CB_TargetInvoke(callbackMsg);
delete msg;
break;
}
}
}
}
Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void*
data, then calling a specific function. The framework supports all callbacks with a single WM_DISPATCH_DELEGATE
message. Once setup, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.
This is a huge benefit as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create during sending, destroy data when receiving, and call various OS services and typecasts. Here you do none of that. All the stuff in-between is neatly handled for users.
The two lists below show the call sequence required to asynchronously invoke the callback function TestCallback1()
on DispatchCallbackThread1
. For this example, TestCb
is created using CB_DECLARE
/CB_DEFINE
and the TestCallback1()
callback function is registered with TestCb
using CB_Register()
.
CB_DECLARE(TestCb, int*)
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)
CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);
Main Thread
CB_Invoke(TestCb, &data)
– macro function initiates all registered callbacks from the main thread TestCb_Invoke(&data)
– a type-safe macro wrapper function created by CB_DECLARE
_CB_Dispatch(&TestCbMulticast[0], MAX_REGISTER, &data, sizeof(int))
– private
function loops through all registered subscribers CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize)
– the cbData
callback data is dispatched to a registered subscriber DispatchCallbackThread1(cbMsg)
– the dynamically allocated cbMsg
is placed into the thread 1 message queue
Dispatch Callback Thread 1
WorkerThread::Process()
– the worker thread gets cbMsg
from the message queue CB_TargetInvoke(cbMsg)
– the callback is invoked on the target thread of control TestCallback1(&data, NULL)
– the target callback function is called with callback data
on thread 1
Heap
The dynamic data is required to send data structures through the message queue. Remember, the data pointed to by your callback argument is bitwise copied during a callback.
On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The x_allocator
implementation solves the dynamic storage issues and is much faster than the global heap. To use, just define USE_CALLBACK_ALLOCATOR
within callback.c. See the References section for more information on x_allocator
.
Porting
The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into four directories.
- Callback - core library implementation files
- Port – Windows-specific files (thread/lock)
- Examples – sample code showing usage
- VS2017 – Visual Studio 2017 project files
Porting to another platform requires implementing a dispatch function that accepts a const CB_CallbackMsg*
for each thread. The functions below show an example.
extern "C" BOOL DispatchCallbackThread1(const CB_CallbackMsg* cbMsg)
{
workerThread1.DispatchCallback(cbMsg);
return TRUE;
}
void WorkerThread::DispatchCallback(const CB_CallbackMsg* msg)
{
ASSERT_TRUE(m_thread);
ThreadMsg* threadMsg = new ThreadMsg(MSG_DISPATCH_DELEGATE, msg);
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
The thread event loop gets the message and calls the CB_TargetInvoke()
function. The data sent through the queue is deleted once complete.
case MSG_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg->GetData() != NULL);
const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());
CB_TargetInvoke(callbackMsg);
delete msg;
break;
}
Software locks are handled by the LockGuard
module. This file can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places. Define USE_LOCKS
within callback.c to use LockGuard
module locks.
Which Callback Implementation?
I’ve documented three different asynchronous multicast callback implementations here on CodeProject. Each version has its own unique features and advantages. The sections below highlight the main differences between each solution. See the References section below for links to each article.
Asynchronous Multicast Callbacks in C
- Implemented in C
- Callback function is a free or static member only
- One callback argument supported
- Callback argument must be a pointer type
- Callback argument data copied with
memcpy
- Type-safety provided by macros
- Static array holds registered subscriber callbacks
- Number of registered subscribers fixed at compile time
- Fixed block memory allocator in C
- Compact implementation
Asynchronous Multicast Callbacks with Inter-Thread Messaging
- Implemented in C++
- Callback function is a free or static member only
- One callback argument supported
- Callback argument must be a pointer type
- Callback argument data copied with copy constructor
- Type-safety provided by templates
- Minimal use of templates
- Dynamic list of registered subscriber callbacks
- Number of registered subscribers expands at runtime
- Fixed block memory allocator in C++
- Compact implementation
Asynchronous Multicast Delegates in C++
- Implemented in C++
- C++ delegate paradigm
- Any callback function type (member, static, free)
- Multiple callback arguments supported (up to 5)
- Callback argument any type (value, reference, pointer, pointer to pointer)
- Callback argument data copied with copy constructor
- Type-safety provided by templates
- Heavy use of templates
- Dynamic list of registered subscriber callbacks
- Number of registered subscribers expands at runtime
- Fixed block memory allocator in C++
- Larger implementation
References
Conclusion
There are many ways to design a publisher/subscriber callback system. This C language version incorporates unique features, standardizes the event loop, and eases generating asynchronous callbacks onto a client specified thread of control. The implementation was kept to a minimum to facilitate porting to any system embedded or otherwise.
This callback implementation works on C and C++ projects. However, if developing a strictly C++ project, you could consider using one of the C++ callback implementations listed within the References section.
I've used this technique on projects with great success. Each module or subsystem may expose one or more outgoing interfaces with CB_DECLARE
and CB_DEFINE
macros. Any code within the system is able to connect and receive asynchronous callbacks with worrying about cross-threading or the machinery to make it all work. A feature like this eases application design and architecturally standardizes inter-thread communication with a well-understood callback paradigm.
History
- 6th January, 2019
- 29th January, 2019