Asynchronous Multicast Callbacks with Inter-Thread Messaging

25 Jan 2019 | CPOL | 14 min read
Simplify passing data between threads using this multithreaded, portable C++ asynchronous callback framework

Introduction

Callbacks are a powerful concept used to reduce the coupling between two pieces of code. On a multithreaded system, however, callbacks have limitations. What I've always wanted was a callback mechanism that crosses threads and handles all the low-level machinery to get my event data from one thread to another safely. I need a small, portable and easy-to-use framework. No more monster switch statements inside a thread loop that typecast OS message queue void* values based upon an enumeration. Create a callback. Register a callback. The goal is for the framework to automatically invoke the callback with data arguments on a user-specified target thread.

The callback solution presented here provides the following features:

  1. Asynchronous callbacks – support asynchronous callbacks to and from any thread
  2. Thread targeting – specify the destination thread for the asynchronous callback
  3. Callbacks – invoke any C or C++ free function with a matching signature
  4. Type safe – user defined, type safe callback function data arguments
  5. Member functions – call instance member functions
  6. Multicast callbacks – store multiple callbacks within a list for sequential invocation
  7. Thread-safe – suitable for use on a multi-threaded system
  8. Compact – small, easy to maintain code base consuming minimal code space
  9. Portable – portable to an embedded or PC-based platform
  10. Any compiler – no advanced C++ language features
  11. Any OS – easy porting to any operating system
  12. Elegant syntax – intuitive and easy to use

The callback paradigm significantly eases multithreaded application development by placing the callback and callback data onto the thread of control that you specify. Exposing an asynchronous callback interface for a single class, module or an entire subsystem is extremely easy. The framework is no more difficult to use than a standard C callback but with more features.

This article proposes an inter-thread communication mechanism utilizing asynchronous multicast callbacks. The attached source code implements all features above, as I'll demonstrate.

Three asynchronous multicast callback implementations are available: Two in C++ and one written in C. See the References section for the two other related articles.

CMake is used to create the build files. CMake is free and open-source software. Windows, Linux and other toolchains are supported. See the CMakeLists.txt file for more information.

See GitHub for latest source code:

Callbacks Background

The idea of a function callback is very useful. In callback terms, a publisher defines the callback signature and allows anonymous registration of a callback function pointer. A subscriber creates a function implementation conforming to the publisher's callback signature and registers a callback function pointer with the publisher at runtime. The publisher code knows nothing about the subscriber code – the registration and the callback invocation is anonymous.
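As a point of reference, a plain synchronous callback might look like the minimal sketch below. The names Thermometer, TemperatureCallback and StoreReading are hypothetical and are not part of the framework; the sketch only shows a publisher that knows nothing about its subscriber beyond the function signature.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names throughout; this is not the framework's API.
typedef void (*TemperatureCallback)(int degrees, void* userData);

// Publisher: defines the signature and stores one anonymous subscriber
class Thermometer
{
public:
    Thermometer() : m_callback(NULL), m_userData(NULL) {}

    void Register(TemperatureCallback callback, void* userData)
    {
        assert(callback != NULL);
        m_callback = callback;
        m_userData = userData;
    }

    // Runs the subscriber's function synchronously on the caller's thread
    void Publish(int degrees)
    {
        if (m_callback != NULL)
            (*m_callback)(degrees, m_userData);
    }

private:
    TemperatureCallback m_callback;
    void* m_userData;
};

// Subscriber: a free function conforming to the publisher's signature
static void StoreReading(int degrees, void* userData)
{
    *static_cast<int*>(userData) = degrees;
}

// Returns the value the subscriber received
int RunSyncCallbackDemo()
{
    int lastReading = 0;
    Thermometer thermometer;
    thermometer.Register(&StoreReading, &lastReading);
    thermometer.Publish(21);
    return lastReading;
}
```

Everything here happens on one thread, which is exactly the case that stops being safe once the publisher owns a thread of its own.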

Now, on a multithreaded system, you need to understand synchronous vs. asynchronous callback invocations. If the callback is synchronous, the callback is executed on the caller's thread of control. If you put a breakpoint inside the callback, the stack frame will show the publisher function call and the publisher callback all synchronously invoked. There are no multithreaded issues with this scenario as everything is running on a single thread.

If the publisher code has its own thread, it may invoke the callback function on its thread of control and not the subscriber's thread. A publisher-invoked callback can occur at any time, completely independent of the subscriber’s thread of control. This cross-threading can cause problems for the subscriber if the callback code is not thread-safe, since you now have another thread calling into the subscriber code base at some unknown interval.

One solution for making a callback function thread-safe is to post a message to the subscriber's OS queue during the publisher's callback. The subscriber's thread later dequeues the message and calls an appropriate function. Since the callback implementation only posts a message, the callback, even if done asynchronously, is thread-safe. In this case, the asynchrony of a message queue provides the thread safety in lieu of software locks.
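The post-a-message approach can be sketched as follows. This is an illustration under stated assumptions, not the framework's code: MessageQueue is a hypothetical stand-in for an OS queue built on C++11 std::mutex, and the demo calls the callback directly where a real publisher would call it from its own thread.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical stand-in for an OS message queue
class MessageQueue
{
public:
    void Post(int value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_messages.push(value);
    }

    // Returns false when the queue is empty
    bool TryGet(int& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_messages.empty())
            return false;
        value = m_messages.front();
        m_messages.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::queue<int> m_messages;
};

class Subscriber
{
public:
    Subscriber() : m_total(0) {}

    // Invoked by the publisher, possibly on its thread: only posts a message
    static void OnValue(const int& value, void* userData)
    {
        assert(userData != NULL);
        static_cast<Subscriber*>(userData)->m_queue.Post(value);
    }

    // Run by the subscriber's own thread: the only code that touches m_total
    void ProcessMessages()
    {
        int value;
        while (m_queue.TryGet(value))
            m_total += value;
    }

    int Total() const { return m_total; }

private:
    MessageQueue m_queue;
    int m_total;
};

// Single-threaded walk-through of the two halves
int RunQueueDemo()
{
    Subscriber subscriber;
    Subscriber::OnValue(1, &subscriber);   // publisher side
    Subscriber::OnValue(2, &subscriber);
    subscriber.ProcessMessages();          // subscriber side
    return subscriber.Total();
}
```

The subscriber's state (m_total) needs no lock of its own because only one thread ever reads or writes it; the queue is the single shared object.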

Callbacks are typically free functions, either a class static member or a global function. In C++, instance member functions are handled differently and have significant limitations when it comes to member function pointers. I won't go into all the sordid details, as the topic has been covered endlessly elsewhere, but suffice it to say you can't have a single pointer type that points to all function types. This framework supports calling free functions, but offers support to get the call back onto an instance member function.
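The difference between the two pointer kinds, and the usual way around it, can be shown with a short sketch. Counter, AddThunk and FreeFunc are hypothetical names chosen for illustration; the pattern is a static member that forwards to an instance passed through a void* argument.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical types for illustration only
typedef void (*FreeFunc)(const int& value, void* userData);

class Counter
{
public:
    Counter() : m_sum(0) {}

    // Instance member: its type is void (Counter::*)(const int&), which
    // cannot be stored in a FreeFunc pointer
    void Add(const int& value) { m_sum += value; }

    // Static member: has the free function type and forwards the call to
    // the instance carried in userData
    static void AddThunk(const int& value, void* userData)
    {
        assert(userData != NULL);
        static_cast<Counter*>(userData)->Add(value);
    }

    int Sum() const { return m_sum; }

private:
    int m_sum;
};

int RunThunkDemo()
{
    Counter counter;
    FreeFunc func = &Counter::AddThunk;    // OK: static member function
    // FreeFunc bad = &Counter::Add;       // does not compile: member function pointer
    (*func)(5, &counter);
    (*func)(7, &counter);
    return counter.Sum();
}
```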

Using the Code

I'll first present how to use the code, and then get into the implementation details.

A publisher uses the AsyncCallback<> class to expose a callback interface to potential subscribers. An instance is created with one template argument – the user data type for the callback function argument. In the example below, an int will become the callback function argument.

C++
AsyncCallback<int> callback;

To subscribe to the callback, create a free function (static member or global) as shown. I’ll explain why the <int> argument requires a (const int&, void*) function signature shortly.

C++
void SimpleCallback(const int& value, void* userData)
{
    cout << "SimpleCallback " << value << endl;
}

The subscriber registers to receive callbacks using the Register() function. The first argument is a pointer to the callback function. The second argument is a pointer to a thread the callback is to be invoked on.

C++
callback.Register(&SimpleCallback, &workerThread1);

When the publisher needs to invoke the callback for all registered subscribers, use operator() or Invoke(). Neither function executes the callback synchronously; instead it dispatches each callback onto the destination thread of control.

C++
callback(123);
callback.Invoke(123);

Use Unregister() to unsubscribe a callback.

C++
callback.Unregister(&SimpleCallback, &workerThread1);

Alternatively, to unregister all callbacks use Clear().

C++
callback.Clear();

Always check if anyone is subscribed to the callback before invocation using one of these two methods:

C++
if (callback)
    callback(123);
if (!callback.Empty())
    callback(123);

An AsyncCallback<> is easily used to add asynchrony to both incoming and outgoing API interfaces. The following examples show how.

SysData Example

SysData is a simple class showing how to expose an outgoing asynchronous interface. The class stores system data and provides asynchronous subscriber notifications when the mode changes. The class interface is shown below:

C++
class SysData
{
public:
    /// Clients register with AsyncCallback to get callbacks when system mode changes
    AsyncCallback<SystemModeChanged> SystemModeChangedCallback;

    /// Get singleton instance of this class
    static SysData& GetInstance();

    /// Sets the system mode and notify registered clients via SystemModeChangedCallback.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemMode(SystemMode::Type systemMode);    

private:
    SysData();
    ~SysData();

    /// The current system mode data
    SystemMode::Type m_systemMode;

    /// Lock to make the class thread-safe
    LOCK m_lock;
};

The subscriber interface for receiving callbacks is SystemModeChangedCallback. Calling SetSystemMode() saves the new mode into m_systemMode and notifies all registered subscribers.

C++
void SysData::SetSystemMode(SystemMode::Type systemMode)
{
    LockGuard lockGuard(&m_lock);

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedCallback)
        SystemModeChangedCallback(callbackData);
}

SysDataClient Example

SysDataClient is a callback subscriber and registers for notifications within the constructor. Notice the third argument to Register() is a this pointer. The pointer is passed back as userData on each callback. The framework internally does nothing with userData other than pass it back to the callback invocation. The userData value can be anything the caller wants.

C++
// Constructor
SysDataClient() :
    m_numberOfCallbacks(0)
{
    // Register for async callbacks
    SysData::GetInstance().SystemModeChangedCallback.Register(&SysDataClient::CallbackFunction, 
        &workerThread1, this);    
}

SysDataClient::CallbackFunction() is now called when the system mode changes. Note that the userData argument is typecast back to a SysDataClient instance. Since Register() provided a this pointer, the callback function is able to access any object instance or function during execution.

C++
static void CallbackFunction(const SystemModeChanged& data, void* userData)
{
    // The user data pointer originates from the 3rd argument in the Register() function
    // Typecast the void* to SysDataClient* to access object instance data/functions.
    SysDataClient* instance = static_cast<SysDataClient*>(userData);
    instance->m_numberOfCallbacks++;

    cout << "CallbackFunction " << data.CurrentSystemMode << endl;
}

When SetSystemMode() is called, anyone interested in the mode change is notified asynchronously on their desired execution thread.

C++
// Set new SystemMode values. Each call will invoke callbacks to all 
// registered client subscribers.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);

SysDataNoLock Example

SysDataNoLock is an alternate implementation that uses a private AsyncCallback<> for setting the system mode asynchronously and without locks.

C++
class SysDataNoLock
{
public:
    /// Clients register with AsyncCallback to get callbacks when system mode changes
    AsyncCallback<SystemModeChanged> SystemModeChangedCallback;

    /// Get singleton instance of this class
    static SysDataNoLock& GetInstance();

    /// Sets the system mode and notify registered clients via SystemModeChangedCallback.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemMode(SystemMode::Type systemMode);    

private:
    SysDataNoLock();
    ~SysDataNoLock();

    /// Private callback to get the SetSystemMode call onto a common thread
    AsyncCallback<SystemMode::Type> SetSystemModeCallback;

    /// Sets the system mode and notify registered clients via SystemModeChangedCallback.
    /// @param[in] systemMode - the new system mode. 
    /// @param[in] userData - a 'this' pointer to SysDataNoLock.  
    static void SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData);    

    /// The current system mode data
    SystemMode::Type m_systemMode;
};

The constructor registers SetSystemModePrivate() with the private SetSystemModeCallback.

C++
SysDataNoLock::SysDataNoLock() :
    m_systemMode(SystemMode::STARTING)
{
    SetSystemModeCallback.Register(&SysDataNoLock::SetSystemModePrivate, &workerThread2, this);
    workerThread2.CreateThread();
}

The SetSystemMode() function below is an example of an asynchronous incoming interface. To the caller, it looks like a normal function, but under the hood, a private member call is invoked asynchronously. In this case, invoking SetSystemModeCallback causes SetSystemModePrivate() to be called on workerThread2.

C++
void SysDataNoLock::SetSystemMode(SystemMode::Type systemMode)
{
    // Invoke the private callback. SetSystemModePrivate() will be called on workerThread2.
    SetSystemModeCallback(systemMode);
}

Since this private function is always invoked asynchronously on workerThread2 it doesn't require locks.

C++
void SysDataNoLock::SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData)
{
    SysDataNoLock* instance = static_cast<SysDataNoLock*>(userData);

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = instance->m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    instance->m_systemMode = systemMode;

    // Callback all registered subscribers
    if (instance->SystemModeChangedCallback)
        instance->SystemModeChangedCallback(callbackData);
}

Timer Example

Once a callback framework is in place, creating a timer callback service is trivial. Many systems need a way to generate a callback based on a timeout. Maybe it's a periodic timeout for some low speed polling, or maybe an error timeout in case something doesn't occur within the expected time frame. Either way, the callback must occur on a specified thread of control. An AsyncCallback<> used inside a Timer class solves this nicely.

C++
class Timer
{
public:
    AsyncCallback<TimerData> Expired;

    void Start(UINT32 timeout);
    void Stop();
    //...
};

Users create an instance of the timer and register for the expiration. In this case, MyCallback() is called in 1000ms.

C++
m_timer.Expired.Register(&MyClass::MyCallback, &myThread, this);
m_timer.Start(1000);

A Timer implementation isn't offered in the examples. However, the article "C++ State Machine with Threads" contains a Timer class that shows a complete multithreaded example of AsyncCallback<> integrated with a C++ state machine.
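To make the idea concrete, here is a minimal sketch of one way a timer could decide when to fire. It is not the Timer class from the referenced article: PolledTimer, TickMs, Poll() and the plain function-pointer callback are hypothetical, and the current tick is passed in by the caller so the logic can be exercised without a real clock. In the framework, the expiry branch would instead invoke an AsyncCallback<> so the subscriber runs on its chosen thread.

```cpp
#include <cassert>
#include <cstddef>

typedef unsigned long TickMs;   // hypothetical millisecond tick type

class PolledTimer
{
public:
    typedef void (*ExpiredFunc)(void* userData);

    PolledTimer()
        : m_func(NULL), m_userData(NULL), m_running(false), m_start(0), m_timeout(0) {}

    void Register(ExpiredFunc func, void* userData)
    {
        assert(func != NULL);
        m_func = func;
        m_userData = userData;
    }

    void Start(TickMs now, TickMs timeout)
    {
        m_start = now;
        m_timeout = timeout;
        m_running = true;
    }

    void Stop() { m_running = false; }

    // Called periodically by whatever thread services the timers
    void Poll(TickMs now)
    {
        // Unsigned subtraction stays correct when the tick counter wraps
        if (m_running && (now - m_start) >= m_timeout)
        {
            m_start = now;   // periodic timer: re-arm from this expiry
            if (m_func != NULL)
                (*m_func)(m_userData);
        }
    }

private:
    ExpiredFunc m_func;
    void* m_userData;
    bool m_running;
    TickMs m_start;
    TickMs m_timeout;
};

static void CountExpiry(void* userData) { ++*static_cast<int*>(userData); }

// Returns the number of expirations seen
int RunTimerDemo()
{
    int count = 0;
    PolledTimer timer;
    timer.Register(&CountExpiry, &count);
    timer.Start(0, 1000);
    timer.Poll(999);    // too early
    timer.Poll(1000);   // expires, re-arms at tick 1000
    timer.Poll(1500);   // too early
    timer.Poll(2000);   // expires again
    timer.Stop();
    timer.Poll(3000);   // ignored: timer stopped
    return count;
}
```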

Callback Signature Limitations

This design has the following limitations imposed on all callback functions:

  1. Each callback handles a single user defined argument type (TData).
  2. The two callback function arguments are always: const TData& and void*.
  3. Each callback has a void return type.

For instance, if an AsyncCallback<> is declared as:

C++
AsyncCallback<MyData> myCallback;

The callback function signature is:

C++
void MyCallback(const MyData& data, void* userData);

The design can be extended to support more than one argument if necessary. However, the design somewhat mimics what embedded programmers do all the time, which is something like:

  1. Dynamically create an instance to a struct or class and populate data.
  2. Post a pointer to the data through an OS message as a void*.
  3. Get the data from the OS message queue and typecast the void* back to the original type.
  4. Delete the dynamically created data.

In this design, the entire infrastructure happens automatically without any additional effort on the programmer's part. If multiple data parameters are required, they must be packaged into a single class/struct and used as the callback data argument.
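For comparison, the four manual steps listed above might look like the sketch below, which also shows two parameters packaged into one struct. MotorCommand, g_queue and both functions are hypothetical, and the std::queue is a single-threaded stand-in for an OS message queue that carries only void* values.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>

// Hypothetical payload: two parameters packaged into a single struct
struct MotorCommand
{
    int motorId;
    int speedRpm;
};

// Stand-in for an OS message queue that only carries void* values
static std::queue<void*> g_queue;

void SendMotorCommand(int motorId, int speedRpm)
{
    // 1. Dynamically create the struct and populate it
    MotorCommand* cmd = new MotorCommand;
    cmd->motorId = motorId;
    cmd->speedRpm = speedRpm;

    // 2. Post the pointer through the queue as a void*
    g_queue.push(cmd);
}

// Returns the received speed, or -1 if no message is waiting
int ReceiveMotorCommand()
{
    if (g_queue.empty())
        return -1;

    // 3. Get the data from the queue and typecast the void* back
    void* raw = g_queue.front();
    g_queue.pop();
    assert(raw != NULL);
    MotorCommand* cmd = static_cast<MotorCommand*>(raw);
    int speed = cmd->speedRpm;

    // 4. Delete the dynamically created data
    delete cmd;
    return speed;
}
```

Nothing in the type system stops the receiver from casting to the wrong struct here, which is the gap the templated callback signature closes.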

Implementation

The number of lines of code for the callback framework is surprisingly low. Strip out the comments, and what remains is maybe a couple hundred lines of code that are (hopefully) easy to understand and maintain.

AsyncCallback<> and AsyncCallbackBase form the basis for publishing a callback interface. The classes are thread-safe. The base version is non-templatized to reduce code space. AsyncCallbackBase provides the invocation list and thread safety mechanisms.
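The base/template split can be illustrated with a reduced sketch. It is an assumption about the general shape only: CallbackBase and TypedCallback are hypothetical names, the sketch holds a single subscriber and calls it synchronously, and it leaves out the invocation list, locking and thread dispatch that the real classes provide.

```cpp
#include <cassert>
#include <cstddef>

// Non-template base: compiled once and shared by every instantiation
class CallbackBase
{
public:
    typedef void (*GenericFunc)();

    CallbackBase() : m_func(NULL), m_userData(NULL) {}
    bool Empty() const { return m_func == NULL; }

protected:
    void Set(GenericFunc func, void* userData)
    {
        m_func = func;
        m_userData = userData;
    }
    GenericFunc Func() const { return m_func; }
    void* UserData() const { return m_userData; }

private:
    GenericFunc m_func;
    void* m_userData;
};

// Thin template layer: only the casts depend on TData
template <typename TData>
class TypedCallback : public CallbackBase
{
public:
    typedef void (*CallbackFunc)(const TData& data, void* userData);

    void Register(CallbackFunc func, void* userData)
    {
        assert(func != NULL);
        Set(reinterpret_cast<GenericFunc>(func), userData);
    }

    void Invoke(const TData& data) const
    {
        if (Empty())
            return;
        // Cast back to the exact type the pointer was registered with
        CallbackFunc func = reinterpret_cast<CallbackFunc>(Func());
        (*func)(data, UserData());
    }
};

static void StoreDouble(const double& value, void* userData)
{
    *static_cast<double*>(userData) = value;
}

double RunTypedCallbackDemo()
{
    double received = 0.0;
    TypedCallback<double> callback;
    callback.Register(&StoreDouble, &received);
    callback.Invoke(2.5);
    return received;
}
```

Because the storage and bookkeeping live in the non-template base, each new TData adds only the two small inline functions of the template layer.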

AsyncCallback::Invoke() iterates over the list and dispatches callback messages to each target thread. The data is dynamically created to travel through an OS message queue.

C++
void Invoke(const TData& data) 
{
    LockGuard lockGuard(GetLock());

    // For each registered callback 
    InvocationNode* node = GetInvocationHead();
    while (node != NULL)
    {
        // Create a new instance of callback and copy
        const Callback* callback = new Callback(*node->CallbackElement);

        // Create a new instance of the callback data and copy
        const TData* callbackData = new TData(data);

        // Create a new message  instance 
        CallbackMsg* msg = new CallbackMsg(this, callback, callbackData);

        // Dispatch message onto the callback destination thread. TargetInvoke()
        // will be called by the target thread. 
        callback->GetCallbackThread()->DispatchCallback(msg);

        // Get the next registered callback subscriber 
        node = node->Next;
    }
}

AsyncCallback::TargetInvoke() is called by the target thread to actually execute the callback. Dynamic data is deleted after the callback is invoked.

C++
virtual void TargetInvoke(CallbackMsg** msg) const
{
    const Callback* callback = (*msg)->GetCallback();

    // Typecast the void* back to a TData type
    const TData* callbackData = static_cast<const TData*>((*msg)->GetCallbackData());

    // Typecast a generic callback function pointer to the CallbackFunc type
    CallbackFunc func = reinterpret_cast<CallbackFunc>(callback->GetCallbackFunction());

    // Execute the registered callback function
    (*func)(*callbackData, callback->GetUserData());

    // Delete dynamic data sent through the message queue
    delete callbackData;
    delete callback;
    delete *msg;
    *msg = NULL;
}

Asynchronous callbacks impose certain limitations because everything the callback destination thread needs must be created on the heap, packaged into a class, and placed into an OS message queue.

The insertion into an OS queue is platform specific. The CallbackThread class provides the interface to be implemented on each target platform. See the Porting section below for a more complete discussion.

C++
class CallbackThread
{
public:
    virtual void DispatchCallback(CallbackMsg* msg) = 0;
};

Once the message is placed into the message queue, platform specific code unpacks the message and calls the AsyncCallbackBase::TargetInvoke() function and destroys dynamically allocated data.

C++
unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;
    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_CALLBACK:
            {
                ASSERT_TRUE(msg.wParam != NULL);

                // Get the ThreadMsg from the wParam value
                ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);

                // Convert the ThreadMsg void* data back to a CallbackMsg* 
                CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData()); 

                // Invoke the callback on the target thread
                callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);

                // Delete dynamic data passed through message queue
                delete threadMsg;
                break;
            }

            case WM_EXIT_THREAD:
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}

Notice the thread loop is unlike most systems that have a huge switch statement handling various incoming data messages, type casting void* data, then calling a specific function. The framework supports all callbacks with a single WM_DISPATCH_CALLBACK message. Once set up, the same small thread loop handles every callback. New publishers and subscribers come and go as the system is designed, but the code in-between doesn't change.

This is a huge benefit, as on many systems getting data between threads takes a lot of manual steps. You constantly have to mess with each thread loop, create data when sending, destroy data when receiving, and call various OS services and typecasts. Here, you do none of that. All the stuff in-between is neatly handled for users.

Heap

The heap is used to create dynamic data. It stems from using an invocation list and needing to send data objects through the message queue. Remember, your callback data is copied and destroyed during a callback. Most times, the callback data is POD (Plain Old Data). If you have something fancier that can't be bitwise copied, be sure to implement a copy constructor for the callback data.
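As an illustration of the copy constructor requirement, consider callback data that owns a heap buffer. LogMessage and the helper functions are hypothetical; the demo mimics what happens to callback data in transit, where a copy is created with new, used after the original has gone out of scope, and then deleted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical callback data type that owns a heap buffer
class LogMessage
{
public:
    explicit LogMessage(const char* text) : m_text(Duplicate(text)) {}

    // Deep copy: the copy gets its own buffer instead of sharing the pointer
    LogMessage(const LogMessage& other) : m_text(Duplicate(other.m_text)) {}

    ~LogMessage() { delete[] m_text; }

    const char* Text() const { return m_text; }

private:
    // Assignment is not needed for this sketch, so it is disabled
    LogMessage& operator=(const LogMessage&);

    static char* Duplicate(const char* text)
    {
        assert(text != NULL);
        std::size_t length = std::strlen(text) + 1;
        char* copy = new char[length];
        std::memcpy(copy, text, length);
        return copy;
    }

    char* m_text;
};

// Returns true if the heap copy is still valid after the original is destroyed
bool CopySurvivesOriginal()
{
    const LogMessage* copy = NULL;
    {
        LogMessage original("mode changed");
        copy = new LogMessage(original);   // as done for data placed in a queue
    }   // original destroyed here

    bool ok = std::strcmp(copy->Text(), "mode changed") == 0;
    delete copy;                           // as done after the callback returns
    return ok;
}
```

Without the user-defined copy constructor, both objects would hold the same pointer and the second delete[] would free memory that was already released.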

On some systems, it is undesirable to use the heap. For those situations, I use a fixed block memory allocator. The xallocator implementation solves the dynamic storage issues and is much faster than the global heap. To use, just include xallocator.h and add the macro XALLOCATOR to the class declaration. An entire class hierarchy can use the fixed block allocator by placing XALLOCATOR in the base class.

C++
#include "xallocator.h"

class Callback 
{
    XALLOCATOR
    // ...
};

With xallocator in place, calling operator new or delete allows the fixed block allocator to take over the storage duties. How objects are created and destroyed is exactly the same, only the source of the memory is different. For more information on xallocator, and to get the source code, see the article "Replace malloc/free with a Fast Fixed Block Memory Allocator". The only files needed are Allocator.h/cpp and xallocator.h/cpp.
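The underlying C++ mechanism is a class-scoped operator new and operator delete. The sketch below shows that mechanism with a deliberately tiny pool; it is not the xallocator source and makes no claim about what the XALLOCATOR macro actually expands to. FixedPool and PooledMsg are hypothetical, and the pool is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical fixed block pool: equal-size blocks kept on a free list
template <std::size_t BlockSize, std::size_t BlockCount>
class FixedPool
{
public:
    FixedPool() : m_freeList(NULL), m_inUse(0)
    {
        for (std::size_t i = 0; i < BlockCount; ++i)
        {
            m_blocks[i].next = m_freeList;
            m_freeList = &m_blocks[i];
        }
    }

    void* Allocate(std::size_t size)
    {
        if (size > BlockSize || m_freeList == NULL)
            throw std::bad_alloc();
        Block* block = m_freeList;
        m_freeList = block->next;
        ++m_inUse;
        return block;
    }

    void Deallocate(void* ptr)
    {
        if (ptr == NULL)
            return;
        assert(m_inUse > 0);
        Block* block = static_cast<Block*>(ptr);
        block->next = m_freeList;
        m_freeList = block;
        --m_inUse;
    }

    std::size_t InUse() const { return m_inUse; }

private:
    // A block is either a free-list link or storage for one object
    union Block
    {
        Block* next;
        double aligner;            // keeps blocks aligned for common scalar types
        char storage[BlockSize];
    };

    Block m_blocks[BlockCount];
    Block* m_freeList;
    std::size_t m_inUse;
};

class PooledMsg
{
public:
    explicit PooledMsg(int id) : m_id(id) {}
    int Id() const { return m_id; }

    // Class-scoped overloads: new/delete on PooledMsg now use the pool
    static void* operator new(std::size_t size) { return Pool().Allocate(size); }
    static void operator delete(void* ptr) { Pool().Deallocate(ptr); }

    static std::size_t BlocksInUse() { return Pool().InUse(); }

private:
    typedef FixedPool<16, 8> PoolType;
    static PoolType& Pool()
    {
        static PoolType pool;
        return pool;
    }

    int m_id;
};

// Returns true if new takes a block from the pool and delete returns it
bool RunPoolDemo()
{
    PooledMsg* msg = new PooledMsg(7);
    bool ok = (msg->Id() == 7) && (PooledMsg::BlocksInUse() == 1);
    delete msg;
    return ok && (PooledMsg::BlocksInUse() == 0);
}
```

The calling code still writes new and delete as usual, which is the point made above: only the source of the memory changes.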

To use xallocator in the callback framework, place XALLOCATOR macros in the following class definitions:

  • Callback
  • CallbackMsg
  • InvocationNode

For the platform specific files, you also include XALLOCATOR. In this example, these are:

  • ThreadMsg
  • SystemModeChanged

Porting

The code is an easy port to any platform. There are only two OS services required: threads and a software lock. The code is separated into five directories.

  1. AsyncCallback - core framework implementation files
  2. PortWin – Windows-specific files (thread/lock)
  3. Examples – sample code showing usage
  4. VS2008 – Visual Studio 2008 project files
  5. VS2015 – Visual Studio 2015 project files

The library has a single abstract class CallbackThread with a single pure virtual function:

C++
virtual void DispatchCallback(CallbackMsg* msg) = 0;

On most projects, I wrap the underlying raw OS calls into a thread class to encapsulate and enforce the correct behavior. Here, I provide ThreadWin as a wrapper over the CreateThread() Windows API.

Once you have a thread class, just inherit the CallbackThread interface and implement the DispatchCallback() function. On Windows, a simple post to a message queue is all that is required:

C++
void ThreadWin::DispatchCallback(CallbackMsg* msg)
{
    // Create a new ThreadMsg
    ThreadMsg* threadMsg = new ThreadMsg(WM_DISPATCH_CALLBACK, msg);

    // Post the message to this thread's message queue
    PostThreadMessage(WM_DISPATCH_CALLBACK, threadMsg);
}

The Windows thread loop gets the message and calls the TargetInvoke() function for the incoming instance. The data sent through the queue is deleted once complete.

C++
switch (msg.message)
{
    case WM_DISPATCH_CALLBACK:
    {
        ASSERT_TRUE(msg.wParam != NULL);

        // Get the ThreadMsg from the wParam value
        ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);

        // Convert the ThreadMsg void* data back to a CallbackMsg* 
        CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData()); 

        // Invoke the callback on the target thread
        callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);

        // Delete dynamic data passed through message queue
        delete threadMsg;
        break;
    }

    case WM_EXIT_THREAD:
        return 0;

    default:
        ASSERT();
}
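On a platform without a Windows message queue, the same two pieces (DispatchCallback() and the thread loop) might be sketched with the C++11 standard library as shown next. This is an assumption about how a port could look, not code from the attached source: ThreadStd is hypothetical, and CallbackMsg is replaced by a small stand-in struct so the sketch compiles on its own. A NULL message plays the role of WM_EXIT_THREAD.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Stand-in for the framework's message type so the sketch is self-contained
struct CallbackMsg
{
    void (*run)(void* context);
    void* context;
};

// Same shape as the article's interface
class CallbackThread
{
public:
    virtual ~CallbackThread() {}
    virtual void DispatchCallback(CallbackMsg* msg) = 0;
};

// Hypothetical port built on std::thread
class ThreadStd : public CallbackThread
{
public:
    ThreadStd() : m_thread(&ThreadStd::Process, this) {}

    ~ThreadStd()
    {
        DispatchCallback(NULL);   // NULL is the exit message
        m_thread.join();
    }

    virtual void DispatchCallback(CallbackMsg* msg)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(msg);
        }
        m_cv.notify_one();
    }

private:
    // Thread loop: one code path handles every message
    void Process()
    {
        for (;;)
        {
            CallbackMsg* msg = NULL;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                while (m_queue.empty())
                    m_cv.wait(lock);
                msg = m_queue.front();
                m_queue.pop();
            }
            if (msg == NULL)
                return;
            (*msg->run)(msg->context);   // stands in for TargetInvoke()
            delete msg;
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<CallbackMsg*> m_queue;
    std::thread m_thread;   // declared last so the queue exists before the thread starts
};

static void Increment(void* context)
{
    assert(context != NULL);
    ++*static_cast<int*>(context);
}

// Returns the number of callbacks executed on the worker thread
int RunPortDemo()
{
    int count = 0;
    {
        ThreadStd worker;
        for (int i = 0; i < 3; ++i)
        {
            CallbackMsg* msg = new CallbackMsg;
            msg->run = &Increment;
            msg->context = &count;
            worker.DispatchCallback(msg);
        }
    }   // destructor queues the exit message behind the three callbacks, then joins
    return count;
}
```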

Software locks are handled by the LockGuard class. This class can be updated with locks of your choice, or you can use a different mechanism. Locks are only used in a few places.
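A scope-based guard of this kind is only a few lines. The version below is a sketch assuming the port maps LOCK to std::mutex; the attached PortWin files wrap a Windows lock instead, and SharedCounter is a hypothetical class added to show the guard in use.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

typedef std::mutex LOCK;   // assumption: this port maps LOCK to std::mutex

// Acquires the lock on construction and releases it when the scope ends
class LockGuard
{
public:
    explicit LockGuard(LOCK* lock) : m_lock(lock)
    {
        assert(m_lock != NULL);
        m_lock->lock();
    }

    ~LockGuard() { m_lock->unlock(); }

private:
    // Copying a guard would unlock twice, so it is disabled
    LockGuard(const LockGuard&);
    LockGuard& operator=(const LockGuard&);

    LOCK* m_lock;
};

// Hypothetical class protected the same way SysData protects its mode
class SharedCounter
{
public:
    SharedCounter() : m_value(0) {}

    int Increment()
    {
        LockGuard lockGuard(&m_lock);
        return ++m_value;   // lock is released after the return value is computed
    }

private:
    LOCK m_lock;
    int m_value;
};

int RunLockGuardDemo()
{
    SharedCounter counter;
    counter.Increment();
    return counter.Increment();   // would deadlock if the first call kept the lock
}
```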

Code Size

To gauge the cost of using this technique, the code was built for an ARM CPU using Keil. If deployed on a project, many AsyncCallback<> instances will be created so it needs to be space efficient.

The incremental code size for each additional AsyncCallback<>, one subscriber, one registration call, and one callback invocation is around 120 bytes using full optimization. You’d certainly use at least this much code moving data from one thread to another manually.

Which Callback Implementation?

I’ve documented three different asynchronous multicast callback implementations here on CodeProject. Each version has its own unique features and advantages. The sections below highlight the main differences between each solution. See the References section below for links to each article.

Asynchronous Multicast Callbacks in C

  • Implemented in C
  • Callback function is a free or static member only
  • One callback argument supported
  • Callback argument must be a pointer type
  • Callback argument data copied with memcpy
  • Type-safety provided by macros
  • Static array holds registered subscriber callbacks
  • Number of registered subscribers fixed at compile time
  • Fixed block memory allocator in C
  • Compact implementation

Asynchronous Multicast Callbacks with Inter-Thread Messaging

  • Implemented in C++
  • Callback function is a free or static member only
  • One callback argument supported
  • Callback argument must be a pointer type
  • Callback argument data copied with copy constructor
  • Type-safety provided by templates
  • Minimal use of templates
  • Dynamic list of registered subscriber callbacks
  • Number of registered subscribers expands at runtime
  • Fixed block memory allocator in C++
  • Compact implementation

Asynchronous Multicast Delegates in C++

  • Implemented in C++
  • C++ delegate paradigm
  • Any callback function type (member, static, free)
  • Multiple callback arguments supported (up to 5)
  • Callback argument any type (value, reference, pointer, pointer to pointer)
  • Callback argument data copied with copy constructor
  • Type-safety provided by templates
  • Heavy use of templates
  • Dynamic list of registered subscriber callbacks
  • Number of registered subscribers expands at runtime
  • Fixed block memory allocator in C++
  • Larger implementation

References

Conclusion

There are many ways to design a publisher/subscriber callback system. This version incorporates unique features I've never seen before, especially the ease with which asynchronous callbacks are generated onto a client-specified thread of control. The implementation was kept to a minimum to facilitate porting to any system, embedded or otherwise.

I've used this technique on projects with great success. Each class or subsystem may expose one or more outgoing interfaces with AsyncCallback<> instances. Any code within the system is able to connect and receive asynchronous callbacks without worrying about cross-threading or the machinery to make it all work. A feature like this eases application design and architecturally standardizes inter-thread communication with a well-understood callback paradigm.

History

  • 15th April, 2016
    • Initial release
  • 17th April, 2016
    • Updated article to correct mistakes
  • 18th April, 2016
    • Updated source code to include VS2008 and VS2015 project files to speed evaluation testing
    • Minor article corrections
  • 22nd April, 2016
    • New References section
    • Minor code simplifications with updated source code
    • Update article text
  • 29th April, 2016
    • Eliminated ThreadWin heap usage
    • Updated attached source code
  • 19th November, 2016
    • Minor bug fix with source code
    • Updated Timer and References sections
  • 2nd December, 2016
    • Fix bug in Callback::operator==()
    • Updated attached source code
  • 25th January, 2019
    • Added "Which Callback Implementation?" section
    • Updated References section

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)