Introduction
Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, however, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my event data from one thread to another safely. I need a small, portable and easy to use framework. No more monster switch statements inside a thread loop that typecast OS message queue void* values based upon an enumeration. Create a callback. Register a callback. The goal is for the framework to automagically invoke the callback, with its data argument, on a user-specified target thread.
The callback solution presented here provides the following features:
- Asynchronous callbacks – support asynchronous callbacks to and from any thread
- Thread targeting – specify the destination thread for the asynchronous callback
- Callbacks – invoke any C or C++ free function with a matching signature
- Type safe – user defined, type safe callback function data arguments
- Member functions – call instance member functions
- Multicast callbacks – store multiple callbacks within a list for sequential invocation
- Thread-safe – suitable for use on a multi-threaded system
- Compact – small, easy to maintain code base consuming minimal code space
- Portable – portable to an embedded or PC-based platform
- Any compiler – no advanced C++ language features
- Any OS – easy porting to any operating system
- Elegant syntax – intuitive and easy to use
The callback paradigm significantly eases multithreaded application development by placing the callback and callback data onto the thread of control that you specify. Exposing an asynchronous callback interface for a single class, module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.
This article proposes an inter-thread communication mechanism utilizing asynchronous multicast callbacks. The attached source code implements all features above, as I'll demonstrate.
Three asynchronous multicast callback implementations are available: Two in C++ and one written in C. See the References section for the two other related articles.
CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.
See GitHub for latest source code:
Callbacks Background
The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation are anonymous.
Now, on a multithreaded system, you need to understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a breakpoint inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.
If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher invoked callback can occur at any time completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe since you now have another thread calling into subscriber code base at some unknown interval.
One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.
Callbacks are typically free functions, either a class static member or a global function. In C++, instance member functions are handled differently and have significant limitations when it comes to member function pointers. I won't go into all the sordid details, as the topic has been covered endlessly elsewhere, but suffice it to say you can't have a single pointer point to all function types. This framework invokes free functions only, but it provides a way to route the call back onto an instance member function.
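The static-member workaround mentioned above can be sketched as follows. This is an illustration only, not framework code: `Counter`, `OnValue` and `RunTrampolineDemo` are hypothetical names, and the call is made synchronously to keep the example short.

```cpp
#include <cassert>

// Free-function callback signature: data plus an opaque user pointer.
typedef void (*CallbackFunc)(const int& value, void* userData);

class Counter
{
public:
    Counter() : m_total(0) {}

    // A static member has ordinary function-pointer type, so it fits
    // CallbackFunc. userData carries the instance to forward to.
    static void OnValue(const int& value, void* userData)
    {
        assert(userData != 0);
        Counter* instance = static_cast<Counter*>(userData);
        instance->Add(value);
    }

    int Total() const { return m_total; }

private:
    // An instance member: &Counter::Add is a pointer-to-member and
    // cannot be stored in a CallbackFunc.
    void Add(int value) { m_total += value; }

    int m_total;
};

int RunTrampolineDemo()
{
    Counter counter;
    CallbackFunc func = &Counter::OnValue;   // plain function pointer
    func(5, &counter);
    func(7, &counter);
    return counter.Total();
}
```

The static function acts as a trampoline: the function pointer stays a plain free-function pointer, and the instance travels separately as a `void*`.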
Using the Code
I'll first present how to use the code, and then get into the implementation details.
A publisher uses the AsyncCallback<> class to expose a callback interface to potential subscribers. An instance is created with one template argument – the user data type for the function callback argument. In the example below, an int will become the callback function argument.
AsyncCallback<int> callback;
To subscribe to the callback, create a free function (static member or global) as shown. I’ll explain why the <int> argument requires a (const int&, void*) function signature shortly.
void SimpleCallback(const int& value, void* userData)
{
    cout << "SimpleCallback " << value << endl;
}
The subscriber registers to receive callbacks using the Register() function. The first argument is a pointer to the callback function. The second argument is a pointer to a thread the callback is to be invoked on.
callback.Register(&SimpleCallback, &workerThread1);
When the publisher needs to invoke the callback for all registered subscribers, use operator() or Invoke(). Neither function executes the callback synchronously; instead, each dispatches the callback onto the destination thread of control.
callback(123);
callback.Invoke(123);
Use Unregister() to unsubscribe a callback.
callback.Unregister(&SimpleCallback, &workerThread1);
Alternatively, to unregister all callbacks use Clear().
callback.Clear();
Always check if anyone is subscribed to the callback before invocation using one of these two methods:
if (callback)
    callback(123);
if (!callback.Empty())
    callback(123);
An AsyncCallback<> is easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.
SysData Example
SysData is a simple class showing how to expose an outgoing asynchronous interface. The class stores system data and provides asynchronous subscriber notifications when the mode changes. The class interface is shown below:
class SysData
{
public:
    AsyncCallback<SystemModeChanged> SystemModeChangedCallback;
    static SysData& GetInstance();
    void SetSystemMode(SystemMode::Type systemMode);
private:
    SysData();
    ~SysData();
    SystemMode::Type m_systemMode;
    LOCK m_lock;
};
The subscriber interface for receiving callbacks is SystemModeChangedCallback. Calling SetSystemMode() saves the new mode into m_systemMode and notifies all registered subscribers.
void SysData::SetSystemMode(SystemMode::Type systemMode)
{
    LockGuard lockGuard(&m_lock);
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;
    m_systemMode = systemMode;
    if (SystemModeChangedCallback)
        SystemModeChangedCallback(callbackData);
}
SysDataClient Example
SysDataClient is a callback subscriber and registers for notifications within the constructor. Notice the third argument to Register() is a this pointer. The pointer is passed back as userData on each callback. The framework internally does nothing with userData other than pass it back to the callback invocation. The userData value can be anything the caller wants.
SysDataClient() :
    m_numberOfCallbacks(0)
{
    SysData::GetInstance().SystemModeChangedCallback.Register(&SysDataClient::CallbackFunction,
        &workerThread1, this);
}
SysDataClient::CallbackFunction() is now called when the system mode changes. Note that the userData argument is typecast back to a SysDataClient instance. Since Register() provided a this pointer, the callback function is able to access any object instance or function during execution.
static void CallbackFunction(const SystemModeChanged& data, void* userData)
{
    SysDataClient* instance = static_cast<SysDataClient*>(userData);
    instance->m_numberOfCallbacks++;
    cout << "CallbackFunction " << data.CurrentSystemMode << endl;
}
When SetSystemMode() is called, anyone interested in the mode changes is notified asynchronously on their desired execution thread.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);
SysDataNoLock Example
SysDataNoLock is an alternate implementation that uses a private AsyncCallback<> for setting the system mode asynchronously and without locks.
class SysDataNoLock
{
public:
    AsyncCallback<SystemModeChanged> SystemModeChangedCallback;
    static SysDataNoLock& GetInstance();
    void SetSystemMode(SystemMode::Type systemMode);
private:
    SysDataNoLock();
    ~SysDataNoLock();
    AsyncCallback<SystemMode::Type> SetSystemModeCallback;
    static void SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData);
    SystemMode::Type m_systemMode;
};
The constructor registers SetSystemModePrivate() with the private SetSystemModeCallback.
SysDataNoLock::SysDataNoLock() :
    m_systemMode(SystemMode::STARTING)
{
    SetSystemModeCallback.Register(&SysDataNoLock::SetSystemModePrivate, &workerThread2, this);
    workerThread2.CreateThread();
}
The SetSystemMode() function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but under the hood, a private member call is invoked asynchronously. In this case, invoking SetSystemModeCallback causes SetSystemModePrivate() to be called on workerThread2.
void SysDataNoLock::SetSystemMode(SystemMode::Type systemMode)
{
    SetSystemModeCallback(systemMode);
}
Since this private function is always invoked asynchronously on workerThread2, it doesn't require locks.
void SysDataNoLock::SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData)
{
    SysDataNoLock* instance = static_cast<SysDataNoLock*>(userData);
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = instance->m_systemMode;
    callbackData.CurrentSystemMode = systemMode;
    instance->m_systemMode = systemMode;
    if (instance->SystemModeChangedCallback)
        instance->SystemModeChangedCallback(callbackData);
}
Timer Example
Once a callback framework is in place, creating a timer callback service is trivial. Many systems need a way to generate a callback based on a timeout. Maybe it's a periodic timeout for some low speed polling, or maybe an error timeout in case something doesn't occur within the expected time frame. Either way, the callback must occur on a specified thread of control. An AsyncCallback<> used inside a Timer class solves this nicely.
class Timer
{
public:
    AsyncCallback<TimerData> Expired;
    void Start(UINT32 timeout);
    void Stop();
};
Users create an instance of the timer and register for the expiration. In this case, MyCallback() is called in 1000 ms.
m_timer.Expired.Register(&MyClass::MyCallback, &myThread, this);
m_timer.Start(1000);
A Timer implementation isn't offered in the examples. However, the article "C++ State Machine with Threads" contains a Timer class that shows a complete multithreaded example of AsyncCallback<> integrated with a C++ state machine.
Callback Signature Limitations
This design has the following limitations imposed on all callback functions:
- Each callback handles a single user defined argument type (TData).
- The two callback function arguments are always: const TData& and void*.
- Each callback has a void return type.
For instance, if an AsyncCallback<> is declared as:
AsyncCallback<MyData> myCallback;
The callback function signature is:
void MyCallback(const MyData& data, void* userData);
The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:
- Dynamically create an instance of a struct or class and populate data.
- Post a pointer to the data through an OS message as a void*.
- Get the data from the OS message queue and typecast the void* back to the original type.
- Delete the dynamically created data.
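For comparison, the manual sequence above might look like the following. This is only a sketch of the pattern the framework replaces: a `std::queue` stands in for the OS message queue, everything runs on one thread for brevity, and `Msg`, `MsgId`, `SensorData`, `PostSensorData` and `ProcessOneMessage` are hypothetical names.

```cpp
#include <cassert>
#include <queue>

enum MsgId { MSG_SENSOR_DATA };

struct SensorData { int channel; int reading; };

// What typically travels through an OS queue: an ID plus an untyped pointer.
struct Msg { MsgId id; void* data; };

static std::queue<Msg> g_queue;   // stand-in for the OS message queue

// Sender side.
void PostSensorData(int channel, int reading)
{
    SensorData* data = new SensorData();   // step 1: create and populate
    data->channel = channel;
    data->reading = reading;
    Msg msg = { MSG_SENSOR_DATA, data };   // step 2: post as void*
    g_queue.push(msg);
}

// Receiver side: one case per message type in a real thread loop.
int ProcessOneMessage()
{
    assert(!g_queue.empty());
    Msg msg = g_queue.front();
    g_queue.pop();

    int result = 0;
    switch (msg.id)
    {
    case MSG_SENSOR_DATA:
        {
            // step 3: cast back; nothing verifies the type matches the ID
            SensorData* data = static_cast<SensorData*>(msg.data);
            result = data->channel * 100 + data->reading;
            delete data;                   // step 4: delete
            break;
        }
    }
    return result;
}
```

Every new message type adds another enumeration value, another case and another unchecked cast, which is the maintenance burden the callback approach aims to remove.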
In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.
Implementation
The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and what remains is maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.
AsyncCallback<> and AsyncCallbackBase form the basis for publishing a callback interface. The classes are thread-safe. The base version is non-templatized to reduce code space. AsyncCallbackBase provides the invocation list and thread safety mechanisms.
AsyncCallback::Invoke() iterates over the list and dispatches callback messages to each target thread. The data is dynamically created to travel through an OS message queue.
void Invoke(const TData& data)
{
    LockGuard lockGuard(GetLock());
    InvocationNode* node = GetInvocationHead();
    while (node != NULL)
    {
        const Callback* callback = new Callback(*node->CallbackElement);
        const TData* callbackData = new TData(data);
        CallbackMsg* msg = new CallbackMsg(this, callback, callbackData);
        callback->GetCallbackThread()->DispatchCallback(msg);
        node = node->Next;
    }
}
AsyncCallback::TargetInvoke() is called by the target thread to actually execute the callback. Dynamic data is deleted after the callback is invoked.
virtual void TargetInvoke(CallbackMsg** msg) const
{
    const Callback* callback = (*msg)->GetCallback();
    const TData* callbackData = static_cast<const TData*>((*msg)->GetCallbackData());
    CallbackFunc func = reinterpret_cast<CallbackFunc>(callback->GetCallbackFunction());
    (*func)(*callbackData, callback->GetUserData());
    delete callbackData;
    delete callback;
    delete *msg;
    *msg = NULL;
}
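To make the ownership rules concrete, here is a much-reduced, single-threaded model of the same Invoke()/TargetInvoke() hand-off. It is a sketch, not the framework source: `MiniMsg`, `MiniCallbackBase`, `MiniCallback`, `DrainQueue` and `RunMiniDemo` are hypothetical names, a `std::queue` stands in for the OS message queue, and there is a single subscriber with no locking.

```cpp
#include <queue>

class MiniCallbackBase;

// Untyped message: remembers who sent it and carries the data as void*.
struct MiniMsg
{
    const MiniCallbackBase* source;
    const void* data;
};

// Non-template base so a thread loop can handle every callback type.
class MiniCallbackBase
{
public:
    virtual ~MiniCallbackBase() {}
    virtual void TargetInvoke(MiniMsg* msg) const = 0;
};

template <typename TData>
class MiniCallback : public MiniCallbackBase
{
public:
    typedef void (*Func)(const TData&, void*);

    MiniCallback(Func func, void* userData, std::queue<MiniMsg*>* queue)
        : m_func(func), m_userData(userData), m_queue(queue) {}

    // Publisher side: copy the argument to the heap and enqueue it.
    void Invoke(const TData& data) const
    {
        MiniMsg* msg = new MiniMsg();
        msg->source = this;
        msg->data = new TData(data);
        m_queue->push(msg);
    }

    // Target side: recover the type, call the subscriber, free the copy.
    virtual void TargetInvoke(MiniMsg* msg) const
    {
        const TData* data = static_cast<const TData*>(msg->data);
        (*m_func)(*data, m_userData);
        delete data;
        delete msg;
    }

private:
    Func m_func;
    void* m_userData;
    std::queue<MiniMsg*>* m_queue;
};

// Generic loop body: needs no knowledge of TData.
void DrainQueue(std::queue<MiniMsg*>& queue)
{
    while (!queue.empty())
    {
        MiniMsg* msg = queue.front();
        queue.pop();
        msg->source->TargetInvoke(msg);
    }
}

void Accumulate(const int& value, void* userData)
{
    *static_cast<int*>(userData) += value;
}

int RunMiniDemo()
{
    std::queue<MiniMsg*> queue;
    int total = 0;
    MiniCallback<int> callback(&Accumulate, &total, &queue);

    int local = 10;
    callback.Invoke(local);
    local = 99;             // the queued copy is unaffected
    callback.Invoke(5);

    DrainQueue(queue);      // in the real design this runs on the target thread
    return total;
}
```

The point of the virtual `TargetInvoke()` is that only the templated class knows `TData`; the queue and the loop that drains it stay completely generic.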
Asynchronous callbacks impose certain limitations because everything the callback destination thread needs must be created on the heap, packaged into a class, and placed into an OS message queue.
The insertion into an OS queue is platform specific. The CallbackThread class provides the interface to be implemented on each target platform. See the Porting section below for a more complete discussion.
class CallbackThread
{
public:
    virtual void DispatchCallback(CallbackMsg* msg) = 0;
};
Once the message is placed into the message queue, platform specific code unpacks the message, calls the AsyncCallbackBase::TargetInvoke() function, and destroys dynamically allocated data.
unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;
    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_CALLBACK:
            {
                ASSERT_TRUE(msg.wParam != NULL);
                ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);
                CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData());
                callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);
                delete threadMsg;
                break;
            }
            case WM_EXIT_THREAD:
                return 0;
            default:
                ASSERT();
        }
    }
    return 0;
}
Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void* data, then calling a specific function. The framework supports all callbacks with a single WM_DISPATCH_CALLBACK message. Once set up, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.
This is a huge benefit as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create data when sending, destroy it when receiving, and call various OS services and typecasts. Here, you do none of that. All the stuff in-between is neatly handled for users.
Heap
The heap is used to create dynamic data. It stems from using an invocation list and needing to send data objects through the message queue. Remember, your callback data is copied and destroyed during a callback. Most times, the callback data is POD (Plain Old Data). If you have something fancier that can't be bitwise copied, be sure to implement a copy constructor for the callback data.
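As a sketch of the copy-constructor advice above, consider callback data that owns a heap buffer. `LogMessage` and `RunDeepCopyDemo` are hypothetical names, not part of the attached source; the demo imitates the copy-at-invoke, delete-after-callback lifecycle on a single thread.

```cpp
#include <cstring>

// Hypothetical callback data that owns a heap buffer, so a bitwise copy
// would leave two objects deleting the same memory.
class LogMessage
{
public:
    explicit LogMessage(const char* text) : m_text(Duplicate(text)) {}

    // Deep copy: the queued copy must not share the publisher's buffer.
    LogMessage(const LogMessage& other) : m_text(Duplicate(other.m_text)) {}

    LogMessage& operator=(const LogMessage& other)
    {
        if (this != &other)
        {
            char* copy = Duplicate(other.m_text);
            delete[] m_text;
            m_text = copy;
        }
        return *this;
    }

    ~LogMessage() { delete[] m_text; }

    const char* Text() const { return m_text; }

private:
    static char* Duplicate(const char* text)
    {
        char* copy = new char[std::strlen(text) + 1];
        std::strcpy(copy, text);
        return copy;
    }

    char* m_text;
};

// Mimics what happens to callback data: copied when the callback is
// invoked, deleted after the subscriber has run.
bool RunDeepCopyDemo()
{
    const LogMessage* queued = 0;
    {
        LogMessage original("mode changed");
        queued = new LogMessage(original);   // copy made at invocation time
    }                                        // publisher's object is gone here
    bool ok = std::strcmp(queued->Text(), "mode changed") == 0;
    delete queued;                           // target side frees the copy
    return ok;
}
```

With only the compiler-generated copy constructor, the queued copy would point at a buffer that the publisher's object had already freed.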
On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The xallocator implementation solves the dynamic storage issues and is much faster than the global heap. To use, just include xallocator.h and add the macro XALLOCATOR to the class declaration. An entire class hierarchy can use the fixed block allocator by placing XALLOCATOR in the base class.
#include "xallocator.h"
class Callback
{
    XALLOCATOR
};
With xallocator in place, calling operator new or delete allows the fixed block allocator to take over the storage duties. How objects are created and destroyed is exactly the same, only the source of the memory is different. For more information on xallocator, and to get the source code, see the article "Replace malloc/free with a Fast Fixed Block Memory Allocator". The only files needed are Allocator.h/cpp and xallocator.h/cpp.
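The general mechanism, class-level operator new and delete drawing from a pool of fixed-size blocks, can be sketched as below. This is not the xallocator source and makes no claim about how XALLOCATOR expands: `FixedPool`, `GetDataPool`, `PooledData` and `RunPoolDemo` are hypothetical names, the pool sizes are arbitrary, and the sketch is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Simplified fixed-block pool built on an intrusive free list.
template <std::size_t BlockSize, std::size_t BlockCount>
class FixedPool
{
public:
    FixedPool() : m_free(0), m_inUse(0)
    {
        for (std::size_t i = 0; i < BlockCount; ++i)
        {
            m_blocks[i].next = m_free;   // push every block onto the free list
            m_free = &m_blocks[i];
        }
    }

    void* Allocate()
    {
        if (m_free == 0)
            throw std::bad_alloc();      // pool exhausted
        Block* block = m_free;
        m_free = block->next;
        ++m_inUse;
        return block;
    }

    void Deallocate(void* memory)
    {
        Block* block = static_cast<Block*>(memory);
        block->next = m_free;            // return the block to the free list
        m_free = block;
        --m_inUse;
    }

    std::size_t InUse() const { return m_inUse; }

private:
    union Block
    {
        Block* next;                     // meaningful only while the block is free
        unsigned char storage[BlockSize];
        double align;                    // crude alignment for simple types
    };

    Block m_blocks[BlockCount];
    Block* m_free;
    std::size_t m_inUse;
};

const std::size_t DATA_BLOCK_SIZE = 16;
typedef FixedPool<DATA_BLOCK_SIZE, 8> DataPool;

DataPool& GetDataPool()
{
    static DataPool pool;
    return pool;
}

class PooledData
{
public:
    explicit PooledData(int v) : value(v) {}

    // Class-level operators route storage to the pool instead of the heap.
    static void* operator new(std::size_t size)
    {
        assert(size <= DATA_BLOCK_SIZE);
        return GetDataPool().Allocate();
    }

    static void operator delete(void* memory)
    {
        if (memory != 0)
            GetDataPool().Deallocate(memory);
    }

    int value;
};

bool RunPoolDemo()
{
    PooledData* a = new PooledData(1);   // same syntax as a heap allocation
    PooledData* b = new PooledData(2);
    bool ok = GetDataPool().InUse() == 2 && a->value + b->value == 3;
    delete a;
    delete b;
    return ok && GetDataPool().InUse() == 0;
}
```

Calling code keeps using plain `new` and `delete`; only the class declaration changes, which matches the usage described above.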
To use xallocator in the callback framework, place XALLOCATOR macros in the following class definitions:
Callback
CallbackMsg
InvocationNode
For the platform specific files, also include XALLOCATOR. In this example, these are:
ThreadMsg
SystemModeChanged
Porting
The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into five directories.
- AsyncCallback – core framework implementation files
- PortWin – Windows-specific files (thread/lock)
- Examples – sample code showing usage
- VS2008 – Visual Studio 2008 project files
- VS2015 – Visual Studio 2015 project files
The library has a single abstract class CallbackThread with a single pure virtual function:
virtual void DispatchCallback(CallbackMsg* msg) = 0;
On most projects, I wrap the underlying raw OS calls into a thread class to encapsulate and enforce the correct behavior. Here, I provide ThreadWin as a wrapper over the CreateThread() Windows API.
Once you have a thread class, just inherit the CallbackThread interface and implement the DispatchCallback() function. On Windows, a simple post to a message queue is all that is required:
void ThreadWin::DispatchCallback(CallbackMsg* msg)
{
    ThreadMsg* threadMsg = new ThreadMsg(WM_DISPATCH_CALLBACK, msg);
    PostThreadMessage(WM_DISPATCH_CALLBACK, threadMsg);
}
The Windows thread loop gets the message and calls the TargetInvoke() function for the incoming instance. The data sent through the queue is deleted once complete.
switch (msg.message)
{
    case WM_DISPATCH_CALLBACK:
    {
        ASSERT_TRUE(msg.wParam != NULL);
        ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);
        CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData());
        callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);
        delete threadMsg;
        break;
    }
    case WM_EXIT_THREAD:
        return 0;
    default:
        ASSERT();
}
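As an illustration of how small a port can be, the sketch below implements the CallbackThread interface on top of C++11 threads instead of the Windows message queue. It is not part of the attached source: `ThreadStd`, `AddTo` and `RunThreadStdDemo` are hypothetical names, the `CallbackMsg` shown is a simplified stand-in for the framework's class, and a null message is assumed to play the role of WM_EXIT_THREAD.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Simplified, hypothetical stand-in for the framework's CallbackMsg.
struct CallbackMsg
{
    void (*func)(int value, void* userData);
    int value;
    void* userData;
};

// Same shape as the interface shown in the article.
class CallbackThread
{
public:
    virtual ~CallbackThread() {}
    virtual void DispatchCallback(CallbackMsg* msg) = 0;
};

// Hypothetical port: assumes C++11 threads rather than the Win32 API.
class ThreadStd : public CallbackThread
{
public:
    ThreadStd() : m_thread(&ThreadStd::Process, this) {}

    ~ThreadStd()
    {
        DispatchCallback(0);    // null message acts as the exit request
        m_thread.join();
    }

    // Called from any thread: just enqueue and wake the worker.
    virtual void DispatchCallback(CallbackMsg* msg)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(msg);
        }
        m_cv.notify_one();
    }

private:
    void Process()
    {
        for (;;)
        {
            CallbackMsg* msg = 0;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return !m_queue.empty(); });
                msg = m_queue.front();
                m_queue.pop();
            }
            if (msg == 0)
                return;
            msg->func(msg->value, msg->userData);   // runs on the worker thread
            delete msg;                             // queued data freed when done
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<CallbackMsg*> m_queue;
    std::thread m_thread;   // last member: starts after the queue exists
};

void AddTo(int value, void* userData)
{
    *static_cast<int*>(userData) += value;
}

int RunThreadStdDemo()
{
    int total = 0;
    {
        ThreadStd worker;
        for (int i = 1; i <= 4; ++i)
        {
            CallbackMsg* msg = new CallbackMsg();
            msg->func = &AddTo;
            msg->value = i;
            msg->userData = &total;
            worker.DispatchCallback(msg);
        }
    }   // destructor drains the queue, then joins
    return total;
}
```

Because the queue is first-in, first-out, the exit request posted by the destructor is only seen after every earlier message has been processed.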
Software locks are handled by the LockGuard class. This class can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places.
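One possible way to back LockGuard on a platform with C++11 is sketched below. The mapping of LOCK to `std::mutex` is an assumption made for illustration, not what the Windows port does, and `GuardedCounter` and `RunLockGuardDemo` are hypothetical names.

```cpp
#include <mutex>
#include <thread>

// Assumption: on this hypothetical port, LOCK is simply a std::mutex.
typedef std::mutex LOCK;

// Scoped lock: acquires in the constructor, releases in the destructor.
class LockGuard
{
public:
    explicit LockGuard(LOCK* lock) : m_lock(lock) { m_lock->lock(); }
    ~LockGuard() { m_lock->unlock(); }

private:
    LockGuard(const LockGuard&);              // non-copyable
    LockGuard& operator=(const LockGuard&);

    LOCK* m_lock;
};

class GuardedCounter
{
public:
    GuardedCounter() : m_count(0) {}

    void Increment()
    {
        LockGuard guard(&m_lock);   // released automatically at scope exit
        ++m_count;
    }

    int Get()
    {
        LockGuard guard(&m_lock);
        return m_count;
    }

private:
    LOCK m_lock;
    int m_count;
};

int RunLockGuardDemo()
{
    GuardedCounter counter;
    std::thread a([&counter] { for (int i = 0; i < 1000; ++i) counter.Increment(); });
    std::thread b([&counter] { for (int i = 0; i < 1000; ++i) counter.Increment(); });
    a.join();
    b.join();
    return counter.Get();
}
```

The constructor takes a `LOCK*` so the call sites match the `LockGuard lockGuard(&m_lock);` usage shown earlier.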
Code Size
To gauge the cost of using this technique, the code was built for an ARM CPU using Keil. If deployed on a project, many AsyncCallback<> instances will be created, so the framework needs to be space efficient.
The incremental code size for each additional AsyncCallback<>, one subscriber, one registration call, and one callback invocation is around 120 bytes using full optimization. You’d certainly use at least this much code moving data from one thread to another manually.
Which Callback Implementation?
I’ve documented three different asynchronous multicast callback implementations here on CodeProject. Each version has its own unique features and advantages. The sections below highlight the main differences between each solution. See the References section below for links to each article.
Asynchronous Multicast Callbacks in C
- Implemented in C
- Callback function is a free or static member only
- One callback argument supported
- Callback argument must be a pointer type
- Callback argument data copied with memcpy
- Type-safety provided by macros
- Static array holds registered subscriber callbacks
- Number of registered subscribers fixed at compile time
- Fixed block memory allocator in C
- Compact implementation
Asynchronous Multicast Callbacks with Inter-Thread Messaging
- Implemented in C++
- Callback function is a free or static member only
- One callback argument supported
- Callback argument must be a pointer type
- Callback argument data copied with copy constructor
- Type-safety provided by templates
- Minimal use of templates
- Dynamic list of registered subscriber callbacks
- Number of registered subscribers expands at runtime
- Fixed block memory allocator in C++
- Compact implementation
Asynchronous Multicast Delegates in C++
- Implemented in C++
- C++ delegate paradigm
- Any callback function type (member, static, free)
- Multiple callback arguments supported (up to 5)
- Callback argument any type (value, reference, pointer, pointer to pointer)
- Callback argument data copied with copy constructor
- Type-safety provided by templates
- Heavy use of templates
- Dynamic list of registered subscriber callbacks
- Number of registered subscribers expands at runtime
- Fixed block memory allocator in C++
- Larger implementation
References
Conclusion
There are many ways to design a publisher/subscriber callback system. This version incorporates unique features I've never seen before, especially the ease with which asynchronous callbacks are generated onto a client-specified thread of control. The implementation was kept to a minimum to facilitate porting to any system, embedded or otherwise.
I've used this technique on projects with great success. Each class or subsystem may expose one or more outgoing interfaces with AsyncCallback<> instances. Any code within the system is able to connect and receive asynchronous callbacks without worrying about cross-threading or the machinery to make it all work. A feature like this eases application design and architecturally standardizes inter-thread communication with a well-understood callback paradigm.
History
- 15th April, 2016
- 17th April, 2016
  - Updated article to correct mistakes
- 18th April, 2016
  - Updated source code to include VS2008 and VS2015 project files to speed evaluation testing
  - Minor article corrections
- 22nd April, 2016
  - New References section
  - Minor code simplifications with updated source code
  - Updated article text
- 29th April, 2016
  - Eliminated ThreadWin heap usage
  - Updated attached source code
- 19th November, 2016
  - Minor bug fix with source code
  - Updated Timer and References sections
- 2nd December, 2016
  - Fix bug in Callback::operator==()
  - Updated attached source code
- 25th January, 2019
  - Added "Which Callback Implementation?" section
  - Updated References section