SW Message Bus

10 Jun 2014, CPOL, 3 min read

Introduction

SW Message Bus is a mechanism that abstracts message flow in a complex, multithreaded software system.

The term "Bus" was chosen because the main idea is very similar to a hardware data bus: one hardware unit puts data on the bus, and another unit that is interested in this data picks it up from the bus.

The SW Message Bus works the same way. One software module puts a message on the bus, and another module that is interested in this message picks it up. The binding between the Publisher (who puts the message) and the Subscriber (who picks it up) is done only by message type. No hard-coded, run-time, or configuration registration between Subscriber and Publisher is required. The only information shared between any Publisher and any Subscriber is the type of the message. Type only. Nothing else.

Another big advantage is strong type checking all the way from the Publisher to the Subscriber. No casting in client code. Never.

The Idea

The main idea behind this infrastructure is to obtain a unique run-time identifier for the type of every published message, and to call every subscriber that subscribed for this message type. In C++ this could be done using typeid from <typeinfo>, but I preferred a simpler and more efficient method that does not involve RTTI.

C++
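#include <cstdint>

using TypeId = std::uintptr_t;

// Every instantiation GetTypeId< T > owns its own static placeHolder,
// so the address of that variable serves as a unique identifier of T.
template < typename T >
static TypeId GetTypeId()
{
  static std::uint32_t placeHolder;
  return (reinterpret_cast<TypeId>(&placeHolder));
}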
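
To make the address trick concrete, here is a small check, written as a sketch rather than taken from the article's sources. The message types MsgA and MsgB and the function CheckTypeIds are hypothetical names used only for illustration. It assumes all code lives in a single executable; identifiers obtained on different sides of a shared library boundary may not match.

```cpp
#include <cassert>
#include <cstdint>

using TypeId = std::uintptr_t;

// One placeHolder exists per instantiated T; its address identifies T.
template < typename T >
TypeId GetTypeId()
{
  static char placeHolder;
  return reinterpret_cast< TypeId >( &placeHolder );
}

struct MsgA { int i; };   // hypothetical message type
struct MsgB { int i; };   // same layout as MsgA, but a different type

// Returns true when the identifiers behave as the article describes.
bool CheckTypeIds()
{
  // The same type always yields the same identifier.
  const bool stable = GetTypeId< MsgA >() == GetTypeId< MsgA >();

  // Different types yield different identifiers, even with identical layout.
  const bool distinct = GetTypeId< MsgA >() != GetTypeId< MsgB >();

  // cv-qualification is part of the type, so const MsgA gets its own id.
  const bool cvAware = GetTypeId< MsgA >() != GetTypeId< const MsgA >();

  return stable && distinct && cvAware;
}
```

The last check is worth keeping in mind: a subscriber registered for MsgA would not be found under the identifier of const MsgA, so the message type should be spelled the same way on both sides.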

Once we know how to get a unique identifier for a message type, the rest is simple.

The Publisher puts a message on the bus. Using the unique id of the message type, the bus finds (in its internal repository) all subscribers registered for this type and calls them.

That's it.
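
The whole idea fits in a few dozen lines. The following is a minimal, single-threaded sketch and not the article's implementation: TinyBus, Ping, Pong and DemoTinyBus are hypothetical names, and locking, handles and unsubscription are deliberately left out.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

using TypeId = std::uintptr_t;

template < typename T >
TypeId GetTypeId()
{
  static char placeHolder;
  return reinterpret_cast< TypeId >( &placeHolder );
}

// Hypothetical, single-threaded stand-in for the real bus.
class TinyBus
{
  // Non-template base lets lists of different message types share one map.
  struct ListBase { virtual ~ListBase() = default; };

  template < typename MSG_TYPE >
  struct List : ListBase
  {
    std::vector< std::function< void( MSG_TYPE ) > > targets;
  };

  std::map< TypeId, std::unique_ptr< ListBase > > subscribers;

public:
  template < typename MSG_TYPE >
  void Subscribe( std::function< void( MSG_TYPE ) > subscriber )
  {
    auto& slot = subscribers[ GetTypeId< MSG_TYPE >() ];
    if ( !slot ) slot.reset( new List< MSG_TYPE > );   // first subscriber for this type
    static_cast< List< MSG_TYPE >* >( slot.get() )->targets.push_back( std::move( subscriber ) );
  }

  template < typename MSG_TYPE >
  void Publish( const MSG_TYPE& msg )
  {
    auto iter = subscribers.find( GetTypeId< MSG_TYPE >() );
    if ( iter == subscribers.end() ) return;           // nobody is interested
    for ( auto& target : static_cast< List< MSG_TYPE >* >( iter->second.get() )->targets )
      target( msg );
  }
};

struct Ping { int value; };   // hypothetical message types
struct Pong { int value; };

int DemoTinyBus()
{
  TinyBus bus;
  int sum = 0;
  bus.Subscribe< Ping >( [ &sum ]( Ping p ) { sum += p.value; } );
  bus.Subscribe< Ping >( [ &sum ]( Ping p ) { sum += 10 * p.value; } );
  bus.Publish( Ping{ 1 } );   // reaches both Ping subscribers
  bus.Publish( Pong{ 5 } );   // no Pong subscribers: silently ignored
  return sum;                 // 1 + 10
}
```

The static_cast is safe because the map key and the list type are both derived from the same MSG_TYPE, which is why client code never needs a cast of its own.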

Subscriber

Who may subscribe for messages?

Any callable target: a function, a lambda expression, a functor, or a bind expression. The Subscriber type is defined as:

C++
template < typename MSG_TYPE > using Subscriber = std::function<void(MSG_TYPE)>;  
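
As an illustration of what "any callable target" means in practice, the sketch below stores each of the four kinds in a Subscriber and calls them in turn. Tick, FreeFunction, Functor, Listener, the global vector seen and CallAllSubscribers are hypothetical names introduced for this example only.

```cpp
#include <cassert>
#include <functional>
#include <vector>

template < typename MSG_TYPE > using Subscriber = std::function< void( MSG_TYPE ) >;

struct Tick { int n; };          // hypothetical message type

std::vector< int > seen;         // records which subscriber ran (demo only)

void FreeFunction( Tick t ) { seen.push_back( t.n + 1 ); }

struct Functor
{
  void operator()( Tick t ) const { seen.push_back( t.n + 2 ); }
};

struct Listener
{
  void OnTick( Tick t ) { seen.push_back( t.n + 4 ); }
};

std::vector< int > CallAllSubscribers()
{
  seen.clear();

  Listener listener;
  std::vector< Subscriber< Tick > > subscribers;

  subscribers.push_back( FreeFunction );                                 // function
  subscribers.push_back( Functor() );                                    // functor
  subscribers.push_back( []( Tick t ) { seen.push_back( t.n + 3 ); } );  // lambda
  subscribers.push_back( std::bind( &Listener::OnTick, &listener,
                                    std::placeholders::_1 ) );           // bind expression

  for ( auto& subscriber : subscribers ) subscriber( Tick{ 10 } );

  return seen;   // { 11, 12, 13, 14 }
}
```

The bind expression is the usual way to subscribe a member function; the bound object must outlive the subscription.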

Publisher

Who may publish messages?

Anybody. There are no restrictions: a message of any type may be published. If nobody has subscribed for a type, nobody will receive the message, but it may be published anyway.

Message Bus API

The API of Message Bus is extremely simple:

C++
template < int BUS_ID = 0 > class MsgBus
{
public:

  /*!***************************************************************************
  * @brief   Subscribe for receiving messages of the specific Message Type.
  *
  * @tparam  MSG_TYPE     Type for which new subscriber will be added.
  *
  * @param   subscriber   Callable target.
  *
  * @return  Handle associated with a registered subscriber. Use IsValidHandle()
  *          for operation success checking.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber );

  /*!***************************************************************************
  * @brief   UnSubscribe from receiving messages of the specific Message Type.
  *
  * @param   handle      Subscriber handle.
  *
  *****************************************************************************/

  static void UnSubscribe( SubscriberHandle& handle );

  /*!***************************************************************************
  * @brief   Publish message by blocking call. The method will return only
  *          when all subscribers have received the published message.
  *
  * @tparam  MSG_TYPE    Message type - optional, will be deduced by compiler.
  *
  * @param   msg         Message to be published.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static void PublishBlocking( const MSG_TYPE& msg );

  /*!***************************************************************************
  * @brief   Publish message by asynchronous call. The method will return
  *          immediately, the message will be delivered asynchronously.
  *
  * @tparam  MSG_TYPE    Message type - optional, will be deduced by compiler.
  *
  * @param   msg         Message to be published.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static void PublishAsync( const MSG_TYPE& msg );

  /*!***************************************************************************
  * @brief   Check Subscriber handle validity.
  *
  * @param   handle      Subscriber handle.
  *
  * @return  true - valid handle, false else.
  *
  *****************************************************************************/

  static bool IsValidHandle( SubscriberHandle& handle );

private:

 static MsgBusRepository msgBusRepository;

private:

  /// Instantiation, copying, moving and deleting of MsgBus class is prohibited.

  MsgBus() = delete;
  ~MsgBus() = delete;
  MsgBus( const MsgBus& ) = delete;
  MsgBus( MsgBus&& ) = delete;
  MsgBus& operator= ( const MsgBus& ) = delete;
  MsgBus& operator= ( MsgBus&& ) = delete;
};

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
SubscriberHandle MsgBus< BUS_ID >::Subscribe( Subscriber< MSG_TYPE > subscriber)
{
  return msgBusRepository.Subscribe< MSG_TYPE >( subscriber );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
void MsgBus< BUS_ID >::UnSubscribe( SubscriberHandle& handle )
{
  msgBusRepository.UnSubscribe( handle );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishBlocking( const MSG_TYPE& msg )
{
  msgBusRepository.Publish( msg );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
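void MsgBus< BUS_ID >::PublishAsync( const MSG_TYPE& msg )
{
  // Note: the std::future returned by std::async is discarded here. The
  // destructor of a future obtained from std::async( std::launch::async, ... )
  // blocks until the task has finished, so this call waits for delivery.
  std::async( std::launch::async,
              MsgBus< BUS_ID >::PublishBlocking< MSG_TYPE >,
              msg
             );
}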

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
bool MsgBus< BUS_ID >::IsValidHandle( SubscriberHandle& handle )
{
  return handle.IsValid();
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
MsgBusRepository MsgBus< BUS_ID >::msgBusRepository;
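
One point about asynchronous publishing deserves a closer look. A std::future obtained from std::async with std::launch::async blocks in its destructor until the task completes, so a publish call that discards the future ends up waiting for delivery. One possible alternative, sketched below under the assumption that changing the return type is acceptable, hands the future back to the caller. ReadingBus, Reading and DemoAsyncPublish are hypothetical names, and the class is a single-type stand-in rather than the real MsgBus.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <utility>
#include <vector>

struct Reading { int value; };   // hypothetical message type

// Hypothetical stand-in for the bus: subscribers for one message type only.
class ReadingBus
{
public:
  void Subscribe( std::function< void( Reading ) > subscriber )
  {
    subscribers.push_back( std::move( subscriber ) );
  }

  void PublishBlocking( const Reading& msg ) const
  {
    for ( const auto& subscriber : subscribers ) subscriber( msg );
  }

  // The message is copied into the task. Returning the future lets the
  // caller decide when, or whether, to wait for delivery.
  std::future< void > PublishAsync( const Reading& msg ) const
  {
    return std::async( std::launch::async,
                       [ this, msg ] { PublishBlocking( msg ); } );
  }

private:
  std::vector< std::function< void( Reading ) > > subscribers;
};

int DemoAsyncPublish()
{
  ReadingBus bus;
  int total = 0;
  bus.Subscribe( [ &total ]( Reading r ) { total += r.value; } );

  std::future< void > done = bus.PublishAsync( Reading{ 7 } );
  done.get();       // synchronize with the delivery before reading total
  return total;
}
```

In this sketch the bus must stay alive, and its subscriber list must not change, until the returned future is ready; the real repository addresses the second point with its readers-writer lock.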

Example

The following is a simple example of Message Bus usage. Let's define a message type and some subscribers:

C++
#include <iostream>

using namespace std;

struct MSG_TYPE_1
{
    int i;
};

void RegularFunctionSubscriber( MSG_TYPE_1 msg )
{
    cout<< "FunctionSubscriber " << msg.i << endl;
}

class FunctorSubscriber
{
public:
    void operator()( MSG_TYPE_1 msg ) 
    { cout<< "FunctorSubscriber " << msg.i << endl; }
}; 

Now we are ready to use our Message Bus:

C++
MSG_TYPE_1 msg1 = { 10 };

FunctorSubscriber functorSubscriber;

// Regular Function Subscriber
SubscriberHandle handle1 = MsgBus<>::Subscribe< MSG_TYPE_1 >( RegularFunctionSubscriber );

// Functor Subscriber
SubscriberHandle handle2 = MsgBus<>::Subscribe< MSG_TYPE_1 >( functorSubscriber );

// Lambda Function Subscriber
SubscriberHandle handle3 = MsgBus<>::Subscribe< MSG_TYPE_1 >( [](MSG_TYPE_1 msg) 
{ cout<< "Lambda Subscriber " << msg.i << endl; } );

MsgBus<>::PublishBlocking( msg1 );

MsgBus<>::PublishAsync( msg1 );

MsgBus<>::UnSubscribe( handle1 );

MsgBus<>::UnSubscribe( handle2 );

MsgBus<>::UnSubscribe( handle3 );

Implementation

As described above, Message Bus keeps all registered subscribers in an internal repository and calls them, based on message type, when a message is published.

The non-type template parameter of MsgBus (zero by default) makes it possible to create multiple independent Message Buses.
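
The reason this works is that every instantiation of a class template has its own static members. The sketch below shows the effect with a hypothetical MiniBus that carries plain int messages; MiniBus, Reset and DemoTwoBuses are names invented for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical bus for int messages; each BUS_ID gets its own subscriber list.
template < int BUS_ID = 0 >
class MiniBus
{
public:
  static void Subscribe( std::function< void( int ) > subscriber )
  {
    subscribers.push_back( std::move( subscriber ) );
  }

  static void Publish( int msg )
  {
    for ( auto& subscriber : subscribers ) subscriber( msg );
  }

  static void Reset() { subscribers.clear(); }

private:
  static std::vector< std::function< void( int ) > > subscribers;
};

// One definition per instantiation: MiniBus< 0 > and MiniBus< 1 > do not share it.
template < int BUS_ID >
std::vector< std::function< void( int ) > > MiniBus< BUS_ID >::subscribers;

std::pair< int, int > DemoTwoBuses()
{
  int onBus0 = 0;
  int onBus1 = 0;

  MiniBus<>::Subscribe( [ &onBus0 ]( int v ) { onBus0 += v; } );
  MiniBus< 1 >::Subscribe( [ &onBus1 ]( int v ) { onBus1 += v; } );

  MiniBus<>::Publish( 5 );      // seen only on bus 0
  MiniBus< 1 >::Publish( 7 );   // seen only on bus 1

  // Drop the lambdas before the captured locals go out of scope.
  MiniBus<>::Reset();
  MiniBus< 1 >::Reset();

  return std::make_pair( onBus0, onBus1 );
}
```

Separate bus ids are a cheap way to keep unrelated subsystems from seeing each other's messages even when they use the same message types.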

The Message Bus repository is implemented as a map whose key is the message type id and whose value is a container (itself a map) holding all subscribers for that message type.

C++
/*!*****************************************************************************
* @file   MsgBusRepository.h
*
* @brief  Repository of all callable targets for specific bus.
*
* @author Evgeny Zavalkovsky
*
* @date   February 2014
*******************************************************************************/

#ifndef MSGBUSREPOSITORY_H_
#define MSGBUSREPOSITORY_H_

#include <map>

#include "../API/MessageBusDefs.h"
#include "Infra/TypeId.h"
#include "Infra/SharedMutex.h"
#include "Infra/SubscriberHandle.h"
#include "MsgTypeContainer.h"

/*!*****************************************************************************
* @class MsgBusRepository
*
* @brief Repository of all callable targets for specific bus.
*
*******************************************************************************/

class MsgBusRepository
{
public:

  /*!***************************************************************************
  *
  * @brief Constructor.
  *
  *****************************************************************************/

  MsgBusRepository()  : operational( true ) {}

  /*!***************************************************************************
  *
  * @brief Destructor.
  *
  *****************************************************************************/
  ~MsgBusRepository()
  {
    mutex.LockExclusive();

    for (auto iter: repositoryMap )
    {
      delete iter.second;
    }

    operational = false;

    mutex.UnlockExclusive();
  }

  /*!***************************************************************************
  *
  * @brief Subscribe.
  *        Add new Subscriber to the repository.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber )
  {
    TypeId typeId = GetTypeId< MSG_TYPE >();

    mutex.LockExclusive();

    SubscriberHandleTyped< MSG_TYPE > handle;

    if ( operational )
    {
      auto ret = repositoryMap.insert(
                              MsgBusRepositoryMapPair( typeId, nullptr ) );

      /// Check if this is the first subscriber for the MSG_TYPE.
      if ( ret.second == true )
      {
        ret.first->second = new MsgTypeContainer< MSG_TYPE >;
      }

      MsgTypeContainer< MSG_TYPE >*
      container = static_cast<MsgTypeContainer< MSG_TYPE >*>(ret.first->second);

      /// Add subscriber to the container.
      container->Add( handle, subscriber);
    }

    else
    {
      handle.SetInvalid();
    }

    mutex.UnlockExclusive();

    return handle;
  }

  /*!***************************************************************************
  *
  * @brief UnSubscribe.
  *        Remove subscriber from repository.
  *
  *****************************************************************************/

  void UnSubscribe( SubscriberHandle& handle )
  {
    mutex.LockExclusive();

    if( operational && handle.IsValid() )
    {
      TypeId typeId = handle.GetTypeid();

      auto iter = repositoryMap.find( typeId );

      if ( iter != repositoryMap.end() )
      {
        MsgTypeContainerBase* container =iter->second;

        container->Remove( handle );

        /// Check if this is the last subscriber in the container
        if( container->Empty() )
        {
          repositoryMap.erase( iter );

          delete container;
        }
      }
    }

    handle.SetInvalid();

    mutex.UnlockExclusive();
  }

  /*!***************************************************************************
  *
  * @brief Publish.
  *        Publish message for all subscribers for MSG_TYPE.
  *
  *****************************************************************************/

  template < typename MSG_TYPE > void Publish( const MSG_TYPE& msg )
  {
    TypeId typeId = GetTypeId< MSG_TYPE >();

    mutex.LockShared();

    if( operational )
    {
      auto iter = repositoryMap.find( typeId );

      if ( iter != repositoryMap.end() )
      {
        MsgTypeContainer< MSG_TYPE >*
        container = static_cast< MsgTypeContainer< MSG_TYPE >* >(iter->second);

        container->Publish( msg );
      }
    }

    mutex.UnlockShared();
  }

  /// Disable copying and moving.

  MsgBusRepository( const MsgBusRepository& )  = delete;
  MsgBusRepository( MsgBusRepository&& ) = delete;
  MsgBusRepository& operator= ( const MsgBusRepository& ) = delete;
  MsgBusRepository& operator= ( MsgBusRepository&& ) = delete;

private:

  using MsgBusRepositoryMap     = std::map< TypeId, MsgTypeContainerBase* >;
  using MsgBusRepositoryMapPair = std::pair< TypeId, MsgTypeContainerBase* >;

  bool operational;
  MsgBusRepositoryMap repositoryMap;

  /// Multiple Readers - Single Writer Lock.
  SharedMutex         mutex;
};

#endif /* MSGBUSREPOSITORY_H_ */

The last snippet is the implementation of the subscribers' container for a specific message type.

C++
/*!*****************************************************************************
* @file   MsgTypeContainer.h
*
* @brief  Holds all callable targets of the specific MSG_TYPE.
*
* @author Evgeny Zavalkovsky
*
* @date   February 2014
*******************************************************************************/

#ifndef MSGTYPECONTAINER_H_
#define MSGTYPECONTAINER_H_

#include <map>

#include "../API/MessageBusDefs.h"
#include "Infra/SubscriberHandle.h"

/*!*****************************************************************************
* @class MsgTypeContainerBase
*
* @brief Non template base of MsgTypeContainer class
*        Required for omitting template parameter dependency
*        in MsgTypeContainer class
*
*******************************************************************************/

class MsgTypeContainerBase
{
public:

  MsgTypeContainerBase() = default;
  virtual ~MsgTypeContainerBase() = default;
  MsgTypeContainerBase( const MsgTypeContainerBase& ) = delete;
  MsgTypeContainerBase( MsgTypeContainerBase&& ) = delete;
  MsgTypeContainerBase& operator= ( const MsgTypeContainerBase& ) = delete;
  MsgTypeContainerBase& operator= ( MsgTypeContainerBase&& ) = delete;

  virtual void Remove( SubscriberHandle handle ) = 0;
  virtual bool Empty() = 0;
};

/*!*****************************************************************************
* @class MsgTypeContainer
*
* @brief Holds all callable targets of the specific MSG_TYPE
*
*******************************************************************************/

template < typename MSG_TYPE >
class MsgTypeContainer : public MsgTypeContainerBase
{
public:

  /*!***************************************************************************
  *
  * @brief Add.
  *        Add new callable target.
  *
  *****************************************************************************/

  void Add( SubscriberHandle handle, Subscriber< MSG_TYPE > subscriber  )
  {
    containerMap.insert( MsgBusContainerMapPair( handle, subscriber ) );
  }

  /*!***************************************************************************
  *
  * @brief Remove.
  *        Remove callable target.
  *
  *****************************************************************************/

  void Remove( SubscriberHandle handle ) override
  {
    containerMap.erase( handle );
  }

  /*!***************************************************************************
  *
  * @brief Empty.
  *        Check if container is empty.
  *
  *****************************************************************************/

  bool Empty() override
  {
    return containerMap.empty();
  }

  /*!***************************************************************************
  *
  * @brief Publish.
  *        Publish message to all targets in container.
  *
  *****************************************************************************/
  void Publish( const MSG_TYPE& msg )
  {
    for (auto& iter: containerMap )
    {
      iter.second( msg );
    }
  }

  /// Default Constructor and Destructor
  //  Deleted Move and Copy Constructors and Assign Operators

  MsgTypeContainer() = default;
  virtual ~MsgTypeContainer() noexcept = default;
  MsgTypeContainer( const MsgTypeContainer& ) = delete;
  MsgTypeContainer( MsgTypeContainer&& ) = delete;
  MsgTypeContainer& operator= ( const MsgTypeContainer& ) = delete;
  MsgTypeContainer& operator= ( MsgTypeContainer&& ) = delete;

private:

  using MsgBusContainerMap = std::map< SubscriberHandle,
                                       Subscriber< MSG_TYPE >
                                     >;

  using MsgBusContainerMapPair = std::pair< SubscriberHandle,
                                            Subscriber< MSG_TYPE >
                                          >;

  MsgBusContainerMap containerMap;
};

#endif /* MSGTYPECONTAINER_H_ */

Hidden Details

This article does not cover two generic infrastructure components:

  • A counting semaphore, which is not part of C++11 (the standard library gained std::counting_semaphore only in C++20)
  • A multiple-readers/single-writer lock, which C++14 provides as std::shared_timed_mutex together with std::shared_lock

For details, please look at the source code.
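
For readers who want a rough picture without opening the sources, both helpers can be sketched with C++11 primitives alone. The code below is an assumption about how such classes might look, not the article's implementation. The four lock method names follow the calls seen in MsgBusRepository; the semaphore interface (Acquire, Release) and the demo function are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Sketch of a multiple-readers / single-writer lock.
class SharedMutexSketch
{
public:
  void LockShared()
  {
    std::unique_lock< std::mutex > lock( guard );
    changed.wait( lock, [ this ] { return !writerActive; } );
    ++readers;
  }

  void UnlockShared()
  {
    std::lock_guard< std::mutex > lock( guard );
    if ( --readers == 0 ) changed.notify_all();   // a waiting writer may proceed
  }

  void LockExclusive()
  {
    std::unique_lock< std::mutex > lock( guard );
    changed.wait( lock, [ this ] { return !writerActive && readers == 0; } );
    writerActive = true;
  }

  void UnlockExclusive()
  {
    std::lock_guard< std::mutex > lock( guard );
    writerActive = false;
    changed.notify_all();
  }

private:
  std::mutex              guard;
  std::condition_variable changed;
  int                     readers = 0;
  bool                    writerActive = false;
};

// Sketch of a counting semaphore (hypothetical interface).
class CountingSemaphoreSketch
{
public:
  void Release()
  {
    { std::lock_guard< std::mutex > lock( guard ); ++count; }
    available.notify_one();
  }

  void Acquire()
  {
    std::unique_lock< std::mutex > lock( guard );
    available.wait( lock, [ this ] { return count > 0; } );
    --count;
  }

private:
  std::mutex              guard;
  std::condition_variable available;
  int                     count = 0;
};

// Four writers increment a counter under the exclusive lock; the semaphore
// tells the caller when each of them has finished.
int DemoLocks()
{
  SharedMutexSketch          rwLock;
  CountingSemaphoreSketch    finished;
  int                        counter = 0;
  std::vector< std::thread > workers;

  for ( int i = 0; i < 4; ++i )
  {
    workers.emplace_back( [ &rwLock, &finished, &counter ]
    {
      for ( int n = 0; n < 1000; ++n )
      {
        rwLock.LockExclusive();
        ++counter;
        rwLock.UnlockExclusive();
      }
      finished.Release();
    } );
  }

  for ( int i = 0; i < 4; ++i ) finished.Acquire();
  for ( auto& worker : workers ) worker.join();

  rwLock.LockShared();
  const int result = counter;
  rwLock.UnlockShared();
  return result;
}
```

This lock favors readers: a steady stream of LockShared calls can keep a writer waiting indefinitely. A production version would usually add a waiting-writers count to avoid that.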

OS Dependency

Message Bus is OS independent.

Documentation

Message Bus comes with full Doxygen-generated documentation.

Please look at Documentation.html.

Compiler Support

Message Bus was tested with the gcc 4.8.1 and VS2013 compilers.

For gcc 4.8.1, the compiler flag -std=c++11 and the linker flag -pthread have to be added.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)