Introduction
The SW Message Bus is a message-flow abstraction mechanism for complex, multithreaded SW systems.
The term "Bus" was chosen because the main idea is very similar to a HW Data Bus: one HW unit puts data on the bus, and another HW unit that is interested in this data picks it up from the bus.
The SW Message Bus works the same way. One SW module puts a message on the bus; another module that is interested in this message picks it up from the bus. The binding between the Publisher (who puts the message) and the Subscriber (who picks up the message) is done only by Message Type. No hard-coded, run-time, or configuration-based registration between Subscriber and Publisher is required. The only information shared between any Publisher and any Subscriber is the type of the message. Type only. Nothing else.
Another big advantage is strong type checking all the way from the Publisher to the Subscriber. No casting in user code. Never.
The Idea
The main idea behind this infrastructure is to give every published message type a unique run-time identifier, and to call every subscriber that subscribed to this message type. In C++, that may be done using typeid from <typeinfo>, but I preferred a simpler and more efficient method that doesn't involve RTTI.
using TypeId = uintptr_t;
template < typename T >
static TypeId GetTypeId()
{
static uint32_t placeHolder;
return (reinterpret_cast<TypeId>(&placeHolder));
}
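To make the trick concrete, here is a minimal sketch of how such an id behaves. It assumes a `char` placeholder instead of `uint32_t` (only the address matters), and the message types `MsgA` and `MsgB` are hypothetical names used for illustration.

```cpp
#include <cstdint>

using TypeId = std::uintptr_t;

// Each instantiation of GetTypeId<T> owns its own function-local static,
// so the address of that static is unique per type T.
template <typename T>
TypeId GetTypeId()
{
    static char placeHolder;  // assumption: any object type works here
    return reinterpret_cast<TypeId>(&placeHolder);
}

struct MsgA { int i; };  // hypothetical message types
struct MsgB { int i; };

// The same type always yields the same id.
bool SameTypeSameId()
{
    return GetTypeId<MsgA>() == GetTypeId<MsgA>();
}

// Different types yield different ids, even if their layout is identical.
bool DifferentTypesDiffer()
{
    return GetTypeId<MsgA>() != GetTypeId<MsgB>();
}
```

Note that `MsgA` and `const MsgA` are different template arguments, so they would get different ids under this scheme.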
Once we know how to get a unique identifier for a Message type, the rest is simple.
The Publisher puts a message on the bus; using the unique id of the message type, we find (in the internal Message Bus repository) all subscribers that are registered for this message type and call them.
That's it.
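The lookup described above can be sketched in a few dozen lines. This is a simplified, single-threaded illustration and not the article's implementation: `MiniBus`, `HandlerList`, `Ping` and `Pong` are hypothetical names, and there is no locking and no unsubscribe.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

using TypeId = std::uintptr_t;

template <typename T>
TypeId GetTypeId()
{
    static char placeHolder;
    return reinterpret_cast<TypeId>(&placeHolder);
}

// Type-erased base so handler lists of different message types fit in one map.
struct HandlerListBase
{
    virtual ~HandlerListBase() = default;
};

template <typename MSG>
struct HandlerList : HandlerListBase
{
    std::vector<std::function<void(MSG)>> handlers;
};

// Hypothetical mini bus: single-threaded, no unsubscribe.
class MiniBus
{
public:
    template <typename MSG>
    void Subscribe(std::function<void(MSG)> handler)
    {
        std::unique_ptr<HandlerListBase>& slot = lists[GetTypeId<MSG>()];
        if (!slot)
        {
            slot.reset(new HandlerList<MSG>);
        }
        // Safe cast: the slot keyed by GetTypeId<MSG>() only ever holds HandlerList<MSG>.
        static_cast<HandlerList<MSG>*>(slot.get())->handlers.push_back(std::move(handler));
    }

    // Returns the number of subscribers that were called (0 if nobody subscribed).
    template <typename MSG>
    std::size_t Publish(const MSG& msg)
    {
        auto iter = lists.find(GetTypeId<MSG>());
        if (iter == lists.end())
        {
            return 0;
        }
        auto& handlers = static_cast<HandlerList<MSG>*>(iter->second.get())->handlers;
        for (auto& handler : handlers)
        {
            handler(msg);
        }
        return handlers.size();
    }

private:
    std::map<TypeId, std::unique_ptr<HandlerListBase>> lists;
};

struct Ping { int value; };  // hypothetical message types
struct Pong { int value; };

int DemoMiniBus()
{
    MiniBus bus;
    int sum = 0;
    bus.Subscribe<Ping>([&sum](Ping p) { sum += p.value; });
    bus.Subscribe<Ping>([&sum](Ping p) { sum += 2 * p.value; });
    bus.Publish(Ping{5});    // both Ping subscribers run
    bus.Publish(Pong{100});  // nobody subscribed to Pong: nothing happens
    return sum;              // 5 + 10 = 15
}
```

The only cast is hidden inside the bus and is keyed by the same type id that created the list, which is why user code never needs to cast.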
Subscriber
Who may subscribe for messages?
Any callable target: a function, lambda expression, functor, or bind expression. The Subscriber type is defined as:
template < typename MSG_TYPE > using Subscriber = std::function<void(MSG_TYPE)>;
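As an illustration, the sketch below stores all four kinds of callable in the same `Subscriber` type. The message type `Temperature`, the class `Logger` and the other names are hypothetical and exist only for this example.

```cpp
#include <functional>
#include <vector>

template <typename MSG_TYPE> using Subscriber = std::function<void(MSG_TYPE)>;

struct Temperature { int celsius; };  // hypothetical message type

static int g_total = 0;  // accumulates what the stateless subscribers receive

void FreeFunction(Temperature t) { g_total += t.celsius; }

struct Functor
{
    void operator()(Temperature t) const { g_total += t.celsius; }
};

class Logger  // hypothetical class with a member-function subscriber
{
public:
    void OnTemperature(Temperature t) { last = t.celsius; }
    int last = 0;
};

int DemoSubscribers()
{
    g_total = 0;
    Logger logger;

    std::vector<Subscriber<Temperature>> subscribers;
    subscribers.push_back(FreeFunction);                                // function
    subscribers.push_back(Functor());                                   // functor
    subscribers.push_back([](Temperature t) { g_total += t.celsius; }); // lambda
    subscribers.push_back(std::bind(&Logger::OnTemperature, &logger,
                                    std::placeholders::_1));            // bind expression

    for (auto& subscriber : subscribers)
    {
        subscriber(Temperature{7});
    }
    return g_total + logger.last;  // 3 * 7 + 7 = 28
}
```

Because the signature is `void(MSG_TYPE)`, each subscriber receives its own copy of the message.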
Publisher
Who may publish messages?
Everybody. No restrictions. A message of any Message type may be published. If nobody has subscribed to a type, nobody will get the message, but it may be published in any case.
Message Bus API
The API of the Message Bus is extremely simple:
template < int BUS_ID = 0 > class MsgBus
{
public:
template < typename MSG_TYPE >
static SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber );
static void UnSubscribe( SubscriberHandle& handle );
template < typename MSG_TYPE >
static void PublishBlocking( const MSG_TYPE& msg );
template < typename MSG_TYPE >
static void PublishAsync( const MSG_TYPE& msg );
static bool IsValidHandle( SubscriberHandle& handle );
private:
static MsgBusRepository msgBusRepository;
private:
MsgBus() = delete;
~MsgBus() = delete;
MsgBus( MsgBus& ) = delete;
MsgBus( MsgBus&& ) = delete;
MsgBus& operator= ( MsgBus& ) = delete;
MsgBus& operator= ( MsgBus&& ) = delete;
};
template < int BUS_ID >
template < typename MSG_TYPE >
SubscriberHandle MsgBus< BUS_ID >::Subscribe( Subscriber< MSG_TYPE > subscriber)
{
return msgBusRepository.Subscribe< MSG_TYPE >( subscriber );
}
template < int BUS_ID >
void MsgBus< BUS_ID >::UnSubscribe( SubscriberHandle& handle )
{
msgBusRepository.UnSubscribe( handle );
}
template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishBlocking( const MSG_TYPE& msg )
{
msgBusRepository.Publish( msg );
}
template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishAsync( const MSG_TYPE& msg )
{
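// Note: the std::future returned by std::async is discarded here. Its
// destructor waits for the task to finish, so this call returns only
// after all subscribers have run.
std::async( std::launch::async,
MsgBus< BUS_ID >::PublishBlocking< MSG_TYPE >,
msg
);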
}
template < int BUS_ID >
bool MsgBus< BUS_ID >::IsValidHandle( SubscriberHandle& handle )
{
return handle.IsValid();
}
template < int MSG_BUS_NUM >
MsgBusRepository MsgBus< MSG_BUS_NUM >::msgBusRepository;
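One detail of PublishAsync is worth a closer look: the `std::future` returned by `std::async` is not kept. The sketch below, with the hypothetical helper `SlowTask` standing in for subscriber work, shows what that means in standard C++: the destructor of a future obtained from `std::async` waits for the task, so a discarded future makes the call effectively synchronous. Keeping the future, as in the second function, is one possible way to let the caller decide when to wait.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <thread>

// Hypothetical stand-in for slow subscriber work.
void SlowTask(std::atomic<bool>& done)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    done = true;
}

// The future is a discarded temporary: its destructor blocks until SlowTask
// has finished, so 'done' is already true on the next line.
bool DiscardedFutureHasFinished()
{
    std::atomic<bool> done(false);
    // The (void) cast only silences "unused result" warnings on newer
    // compilers; the temporary future is still destroyed at the semicolon.
    (void)std::async(std::launch::async, SlowTask, std::ref(done));
    return done;
}

// The future is kept: the task runs in the background until the caller
// explicitly waits for it.
bool KeptFutureFinishesOnGet()
{
    std::atomic<bool> done(false);
    std::future<void> pending =
        std::async(std::launch::async, SlowTask, std::ref(done));
    // ... the caller is free to do other work here ...
    pending.get();
    return done;
}
```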
Example
Following is a simple example of Message Bus usage. Let's define a message type and some subscribers:
using namespace std;
struct MSG_TYPE_1
{
int i;
};
void RegularFunctionSubscriber( MSG_TYPE_1 msg )
{
cout<< "FunctionSubscriber " << msg.i << endl;
}
class FunctorSubscriber
{
public:
void operator()( MSG_TYPE_1 msg )
{ cout<< "FunctorSubscriber " << msg.i << endl; }
};
Now we are ready to use our Message Bus:
MSG_TYPE_1 msg1 = { 10 };
FunctorSubscriber functorSubscriber;
SubscriberHandle handle1 = MsgBus<>::Subscribe< MSG_TYPE_1 >( RegularFunctionSubscriber );
SubscriberHandle handle2 = MsgBus<>::Subscribe< MSG_TYPE_1 >( functorSubscriber );
SubscriberHandle handle3 = MsgBus<>::Subscribe< MSG_TYPE_1 >( [](MSG_TYPE_1 msg)
{ cout<< "Lambda Subscriber " << msg.i << endl; } );
MsgBus<>::PublishBlocking( msg1 );
MsgBus<>::PublishAsync( msg1 );
MsgBus<>::UnSubscribe( handle1 );
MsgBus<>::UnSubscribe( handle2 );
MsgBus<>::UnSubscribe( handle3 );
Implementation
As described above, the Message Bus keeps all registered subscribers in an internal repository and calls them, based on Message Type, when a message is published.
The non-type template parameter that MsgBus receives (zero by default) makes it possible to create multiple Message Buses.
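The reason this works is that every instantiation of a class template gets its own copy of the static members. A minimal sketch, where `TinyBus` is a hypothetical stand-in for MsgBus and the bus ids are made-up constants:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for MsgBus: one static store per BUS_ID.
template <int BUS_ID = 0>
class TinyBus
{
public:
    static void Publish(int value) { store.push_back(value); }
    static std::size_t Count() { return store.size(); }

private:
    static std::vector<int> store;
};

// One definition per instantiation: TinyBus<0>::store and TinyBus<1>::store
// are two unrelated objects.
template <int BUS_ID>
std::vector<int> TinyBus<BUS_ID>::store;

constexpr int CONTROL_BUS = 0;    // hypothetical bus ids
constexpr int TELEMETRY_BUS = 1;

std::size_t DemoTwoBuses()
{
    TinyBus<CONTROL_BUS>::Publish(1);
    TinyBus<TELEMETRY_BUS>::Publish(2);
    TinyBus<TELEMETRY_BUS>::Publish(3);
    // TinyBus<> is TinyBus<0>, i.e. the control bus.
    return TinyBus<>::Count() * 10 + TinyBus<TELEMETRY_BUS>::Count();  // 1 * 10 + 2
}
```

A message published on one bus id is never seen by subscribers of another, because the two buses share no state.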
The Message Bus repository is implemented as a map, where the key is the message type id and the value is another map that contains all subscribers for the specific message type.
#ifndef MSGBUSREPOSITORY_H_
#define MSGBUSREPOSITORY_H_
#include <map>
#include "../API/MessageBusDefs.h"
#include "Infra/TypeId.h"
#include "Infra/SharedMutex.h"
#include "Infra/SubscriberHandle.h"
#include "MsgTypeContainer.h"
class MsgBusRepository
{
public:
MsgBusRepository() : operational( true ) {}
~MsgBusRepository()
{
mutex.LockExclusive();
for (auto iter: repositoryMap )
{
delete iter.second;
}
operational = false;
mutex.UnlockExclusive();
}
template < typename MSG_TYPE >
SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber )
{
TypeId typeId = GetTypeId< MSG_TYPE >();
mutex.LockExclusive();
SubscriberHandleTyped< MSG_TYPE > handle;
if ( operational )
{
auto ret = repositoryMap.insert(
MsgBusRepositoryMapPair( typeId, nullptr ) );
if ( ret.second == true )
{
ret.first->second = new MsgTypeContainer< MSG_TYPE >;
}
MsgTypeContainer< MSG_TYPE >*
container = static_cast<MsgTypeContainer< MSG_TYPE >*>(ret.first->second);
container->Add( handle, subscriber);
}
else
{
handle.SetInvalid();
}
mutex.UnlockExclusive();
return handle;
}
void UnSubscribe( SubscriberHandle& handle )
{
mutex.LockExclusive();
if( operational && handle.IsValid() )
{
TypeId typeId = handle.GetTypeid();
auto iter = repositoryMap.find( typeId );
if ( iter != repositoryMap.end() )
{
MsgTypeContainerBase* container = iter->second;
container->Remove( handle );
if( container->Empty() )
{
repositoryMap.erase( iter );
delete container;
}
}
}
handle.SetInvalid();
mutex.UnlockExclusive();
}
template < typename MSG_TYPE > void Publish( const MSG_TYPE& msg )
{
TypeId typeId = GetTypeId< MSG_TYPE >();
mutex.LockShared();
if( operational )
{
auto iter = repositoryMap.find( typeId );
if ( iter != repositoryMap.end() )
{
MsgTypeContainer< MSG_TYPE >*
container = static_cast< MsgTypeContainer< MSG_TYPE >* >(iter->second);
container->Publish( msg );
}
}
mutex.UnlockShared();
}
MsgBusRepository( MsgBusRepository& ) = delete;
MsgBusRepository( MsgBusRepository&& ) = delete;
MsgBusRepository& operator= ( MsgBusRepository& ) = delete;
MsgBusRepository& operator= ( MsgBusRepository&& ) = delete;
private:
using MsgBusRepositoryMap = std::map< TypeId, MsgTypeContainerBase* >;
using MsgBusRepositoryMapPair = std::pair< TypeId, MsgTypeContainerBase* >;
bool operational;
MsgBusRepositoryMap repositoryMap;
SharedMutex mutex;
};
#endif /* MSGBUSREPOSITORY_H_ */
The last snippet is the implementation of the subscribers' container for a specific message type.
#ifndef MSGTYPECONTAINER_H_
#define MSGTYPECONTAINER_H_
#include <map>
#include "../API/MessageBusDefs.h"
#include "Infra/SubscriberHandle.h"
class MsgTypeContainerBase
{
public:
MsgTypeContainerBase() = default;
virtual ~MsgTypeContainerBase() = default;
MsgTypeContainerBase( MsgTypeContainerBase& ) = delete;
MsgTypeContainerBase( MsgTypeContainerBase&& ) = delete;
MsgTypeContainerBase& operator= ( MsgTypeContainerBase& ) = delete;
MsgTypeContainerBase& operator= ( MsgTypeContainerBase&& ) = delete;
virtual void Remove( SubscriberHandle handle ) = 0;
virtual bool Empty() = 0;
};
template < typename MSG_TYPE >
class MsgTypeContainer : public MsgTypeContainerBase
{
public:
void Add( SubscriberHandle handle, Subscriber< MSG_TYPE > subscriber )
{
containerMap.insert( MsgBusContainerMapPair( handle, subscriber ) );
}
void Remove( SubscriberHandle handle )
{
containerMap.erase( handle );
}
bool Empty()
{
return containerMap.empty();
}
void Publish( const MSG_TYPE& msg )
{
for (auto& iter: containerMap )
{
iter.second( msg );
}
}
MsgTypeContainer() = default;
virtual ~MsgTypeContainer() noexcept = default;
MsgTypeContainer( MsgTypeContainer& ) = delete;
MsgTypeContainer( MsgTypeContainer&& ) = delete;
MsgTypeContainer& operator= ( MsgTypeContainer& ) = delete;
MsgTypeContainer& operator= ( MsgTypeContainer&& ) = delete;
private:
using MsgBusContainerMap = std::map< SubscriberHandle,
Subscriber< MSG_TYPE >
>;
using MsgBusContainerMapPair = std::pair< SubscriberHandle,
Subscriber< MSG_TYPE >
>;
MsgBusContainerMap containerMap;
};
#endif /* MSGTYPECONTAINER_H_ */
Hidden Details
This article does not cover two generic infrastructure components:
- A Counting Semaphore, which for some reason is not part of C++11
- A Multiple Readers - Single Writer lock, which C++14 provides as std::shared_timed_mutex, locked for reading via std::shared_lock
For details, please look at the source code.
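For readers who want a feel for the first component without opening the source, here is a minimal sketch of a counting semaphore built only from C++11 primitives. It is not the implementation shipped with the Message Bus; the class name and method names are assumptions. (C++20 later added std::counting_semaphore to the standard library.)

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical C++11 counting semaphore built on a mutex and a condition variable.
class CountingSemaphore
{
public:
    explicit CountingSemaphore(unsigned initial = 0) : count(initial) {}

    // Adds one permit and wakes one waiting thread, if any.
    void Release()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            ++count;
        }
        available.notify_one();
    }

    // Blocks until a permit is available, then takes it.
    void Acquire()
    {
        std::unique_lock<std::mutex> lock(mutex);
        available.wait(lock, [this] { return count > 0; });
        --count;
    }

    // Takes a permit only if one is available right now.
    bool TryAcquire()
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (count == 0)
        {
            return false;
        }
        --count;
        return true;
    }

private:
    std::mutex mutex;
    std::condition_variable available;
    unsigned count;
};

int DemoSemaphore()
{
    CountingSemaphore semaphore(2);
    int acquired = 0;
    while (semaphore.TryAcquire())
    {
        ++acquired;          // succeeds twice, then the count is 0
    }
    semaphore.Release();     // count back to 1
    semaphore.Acquire();     // does not block: a permit is available
    ++acquired;
    return acquired;         // 3
}
```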
OS Dependency
Message Bus is absolutely OS independent.
Documentation
Message Bus has full Doxygen-generated documentation.
Please look at Documentation.html.
Compiler Support
Message Bus was tested with the gcc 4.8.1 and VS2013 compilers.
For gcc 4.8.1, the compiler flag -std=c++11 and the linker flag -pthread have to be added.