Introduction
When the project starts growing, one of the needs that always becomes more and more important, is a way to communicate between components. An Event Aggregator (or Message Bus/Broker) is used for this purpose.
I have been using .NET Event and Prism's Event Aggregator as shown in my article a few years ago. For delegate EventHandler<TEventArgs>
, it's most suitable for small projects (due to tight coupling that the publisher keeps a reference to the subscriber's handler). For Prism's Event Aggregator, while it is great, I would like to implement my own lightweight one that is suitable for my own need.
While there are many different implementations based on different approaches/concerns/complexities, I would like to present a few minimalist implementations, that ignore the threading for now.
Implementations
Suppose we have 3 components: the Publisher
, Subscriber1
, and Subscriber2
. These three are working with the Person
objects. The Publisher would publish a message whenever a new person created/modified/deleted, and the two Subscribers would want to subscribe to receive those messages to react accordingly.
We would introduce a 'messaging' service that is injected into the publishers and subscribers for them to use. Things would work like this:
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
});
messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
private void OnPersonCreated(PersonCreatedMessage message)
{
}
Implementation #1: Messaging via Action<TMessage> Delegate
Not all kinds of messages the subscribers need to receive, and it's better to explicitly specify what kind of messages we want to subscribe. Those messages are IMessage
:
public interface IMessage
{
string Description { get; }
}
For each subscriber, we can identify a 'receipt' in various forms, such as GUID, unique increasing integer numbers like IDs in the database, or a different way as below. This is to enable the subscribers to un-subscribe receiving the messages.
public interface ISubscription<T> where T : IMessage
{
Action<T> ActionHandler { get; }
}
public class Subscription<T> : ISubscription<T> where T : IMessage
{
public Action<T> ActionHandler { get; private set; }
public Subscription(Action<T> action)
{
ActionHandler = action;
}
}
Our Message Bus is to provide the following functionalities:
public interface IMessageBus
{
void Publish<T>(T message) where T : IMessage;
ISubscription<T> Subscribe<T>(Action<T> actionCallback) where T : IMessage;
bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage;
void ClearAllSubscriptions();
}
And its implementation is explained as below:
public class MessageBus : IMessageBus
{
private readonly Dictionary<Type, List<object>> _observers
= new Dictionary<Type, List<object>>();
}
_observers
here has the key of type IMessage
, and associate value is the list of subscribers' action handler methods that respond upon receiving the messages.
Subscribe
method is implemented as below:
public ISubscription<T> Subscribe<T>(Action<T> callback) where T : IMessage
{
ISubscription<T> subscription = null;
Type messageType = typeof(T);
var subscriptions = _observers.ContainsKey(messageType) ?
_observers[messageType] : new List<object<();
if (!subscriptions
.Select(s => s as ISubscription<T>)
.Any(s => s.ActionHandler == callback))
{
subscription = new Subscription<T>(callback);
subscriptions.Add(subscription);
}
_observers[messageType] = subscriptions;
return subscription;
}
Then the Publish
message:
public void Publish<T>(T message) where T : IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
var subscriptions = _observers[messageType];
if (subscriptions == null || subscriptions.Count == 0) return;
foreach (var handler in subscriptions
.Select(s => s as ISubscription<T>)
.Select(s => s.ActionHandler))
{
handler?.Invoke(message);
}
}
}
The UnSubscribe
is implemented as:
public bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage
{
bool removed = false;
if (subscription == null) return false;
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
removed = _observers[messageType].Remove(subscription);
if (_observers[messageType].Count == 0)
_observers.Remove(messageType);
}
return removed;
}
Finally, we can remove all subscribers by clear the _observers: _observers.Clear();
.
How to Use
First define the message types, for example:
public class PersonCreatedMessage : IMessage
{
public string Description { get; set; }
public Person Person { get; set; }
}
public class PersonDeletedMessage : IMessage
{
public string Description { get; set; }
}
Where our model (Person
) has a property GivenName
.
The subscriber is basically setup as follows:
public class Subscriber1
{
private MessageBus messageBus;
private ISubscription<PersonCreatedMessage> personCreatedSubscription;
public Subscriber1(MessageBus messageBus)
{
this.messageBus = messageBus;
personCreatedSubscription =
this.messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
}
private void OnPersonCreated(PersonCreatedMessage message)
{
}
private void Unsubscribe()
{
this.messageBus.UnSubscribe(personCreatedSubscription);
}
}
And the publisher:
public class Publisher
{
private MessageBus messageBus;
public Publisher(MessageBus messageBus)
{
this.messageBus = messageBus;
}
public void CreatePerson()
{
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
Person = new Person { GivenName = "John" },
Description = "[Demo 1] A new person has been created."
});
}
}
We can get started by:
MessageBus messageBus = new MessageBus();
Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);
publisher.CreatePerson();
Implementation #2: Messaging via Weak Reference
When the system becomes so big and the messaging system becomes so complex with many publishers and subscribers, it's inconvenient having to remember to un-subscribe every subscription. Personally, I would want to explicitly do it. However, this is my implementation to use weak reference version of the above action delegate version. The purpose is to have the Subscriber be eligible to be Garbage collected when there is no strong reference to it.
In this implementation, we will wrap the action handler in a WeakSubscription
:
public class WeakSubscription<T> where T : IMessage
{
private WeakReference _weakAction;
public WeakSubscription(Action<T> action)
{
_weakAction = new WeakReference(action);
}
public bool IsAlive
{
get { return _weakAction.IsAlive; }
}
public object Target
{
get { return _weakAction.Target; }
}
public void OnMessageReceived(T message)
{
var action = _weakAction.Target as Action<T>;
action?.Invoke(message);
}
}
Since weak reference is used, we won't implement the UnSubscribe
method here. So, our MessageBus
has Publish
and Subscribe
as its two main functionalities:
public class MessageBus
{
private readonly Dictionary<Type, List<object>> _observers
= new Dictionary<Type, List<object>>();
}
public void Subscribe<T>(Action<T> callback) where T : IMessage
{
Type messageType = typeof(T);
var subscriptions = _observers.ContainsKey(messageType) ?
_observers[messageType] : new List<object>();
if (!subscriptions
.Select(s => s as WeakSubscription<T>)
.Any(s => s.Target == new WeakReference(callback).Target))
subscriptions.Add(new WeakSubscription<T>(callback));
_observers[messageType] = subscriptions;
var deadSubscriptionsRemoved = CleanupSubscriptions<T>();
}
public void Publish<T>(T message) where T : IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
var subscriptions = _observers[messageType];
List<WeakSubscription<T>> deadSubscriptions = new List<WeakSubscription<T>>();
foreach (var subscription in subscriptions
.Select(s => s as WeakSubscription<T>))
{
if (subscription.IsAlive)
subscription?.OnMessageReceived(message);
else
deadSubscriptions.Add(subscription);
subscriptions.RemoveAll(s => deadSubscriptions.Contains(s));
if (subscriptions.Count == 0)
_observers.Remove(messageType);
}
}
}
And finally, we can clear all subscriptions like this:
private int CleanupSubscriptions<T>() where T : IMessage
{
return _observers[typeof(T)].RemoveAll((s) =>
{
WeakSubscription<T> subRef = s as WeakSubscription<T>;
if (subRef != null)
return !subRef.IsAlive;
return true;
});
}
How to Use
The same with the Implementation #1, except that it's simpler! No need to keep track of subscribers to un-subscribe.
Implementation #3: Messaging via Interface
This is a modification of Glenn Block's implementation of Event Aggregator pattern on PluralSight, that instead of using delegate, the system uses interface as a way to respond when receiving the message. Each subscriber can implement as many ISubscribe
interfaces of different kinds of messages as a way to subscribe to those messages. The interface only declares an action method for the interested subscribers to respond once the message is received:
public interface ISubscribe<T> where T: IMessage
{
void OnMessageReceived(T message);
}
The MessageBus
has been designed to have the following capabilities:
public interface IMessageBus
{
void Publish<T>(T message) where T : IMessage;
int Subscribe(object subscriber, bool reSubscribable = false);
int UnSubscribe(object subscriber);
bool UnSubscribeTo<T>() where T:IMessage;
}
And its implementation is as follows:
public class MessageBus : IMessageBus
{
private readonly Dictionary<Type,
List<WeakReference>> _observers = new Dictionary<Type, List<WeakReference>>();
...
private IEnumerable<Type> GetSubscriberTypes(object subscriber)
{
return subscriber
.GetType()
.GetInterfaces()
.Where(i => i.IsGenericType &&
i.GetGenericTypeDefinition() == typeof(ISubscribe<>));
}
}
public int Subscribe(object subscriber, bool reSubscribable = false)
{
if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
WeakReference subscriberRef = new WeakReference(subscriber);
var subscriberTypes = GetSubscriberTypes(subscriber);
foreach (var subscriberType in subscriberTypes)
{
if (_observers.ContainsKey(subscriberType))
{
_observers[subscriberType].RemoveAll(s => !s.IsAlive);
if (!_observers[subscriberType].Any
(s => s.Target == subscriberRef.Target) || reSubscribable)
_observers[subscriberType].Add(subscriberRef);
}
else
_observers.Add(subscriberType, new List<WeakReference> { subscriberRef });
}
return subscriberTypes.ToList().Count;
}
Then the Publish
method:
public void Publish<T>(T message) where T: IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T));
if (_observers.ContainsKey(subscriberType))
{
List<WeakReference> subscriberRefs = _observers[subscriberType];
List<WeakReference> deadSubscriberRefs=new List<WeakReference>();
foreach (var subscriberRef in subscriberRefs)
{
if (subscriberRef.IsAlive)
{
var subscriber = subscriberRef.Target as ISubscribe<T>;
subscriber?.OnMessageReceived(message);
}
else
deadSubscriberRefs.Add(subscriberRef);
}
subscriberRefs.RemoveAll(s => deadSubscriberRefs.Contains(s));
if (subscriberRefs.Count == 0)
_observers.Remove(subscriberType);
}
}
And we can un-subscribe
either a kind of message, or an object subscriber:
public int UnSubscribe(object subscriber)
{
if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
var subscriberRef = new WeakReference(subscriber);
var subscriberTypes = GetSubscriberTypes(subscriber);
var emptyKeys = new List<Type>();
int unSubscribedTypeCount = 0;
foreach (var subscriberType in subscriberTypes)
{
if (_observers.ContainsKey(subscriberType))
{
List<WeakReference> subscriberRefs = _observers[subscriberType];
unSubscribedTypeCount += subscriberRefs.RemoveAll(s => s.Target == subscriber);
if (subscriberRefs.Count == 0)
emptyKeys.Add(subscriberType);
}
}
foreach (var key in emptyKeys)
_observers.Remove(key);
return unSubscribedTypeCount;
}
public bool UnSubscribeTo<T>() where T: IMessage
{
var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T));
if (_observers.ContainsKey(subscriberType))
return _observers.Remove(subscriberType);
else
throw new KeyNotFoundException(subscriberType.ToString());
}
How to Use
The subscriber needs to subscribe to the kinds of messages they want to receive via the interfaces, that promise the actions to be executed when the messages are received:
public class Subscriber1 : ISubscribe<PersonCreatedMessage>, ISubscribe<PersonDeletedMessage>
{
private MessageBus messageBus;
public Subscriber1(MessageBus messageBus)
{
this.messageBus = messageBus;
var subscription = this.messageBus.Subscribe(this);
}
public void OnMessageReceived(PersonCreatedMessage message)
{
}
public void OnMessageReceived(PersonDeletedMessage message)
{
}
public int UnSubscribe()
{
return this.messageBus.UnSubscribe(this);
}
...
messageBus.UnSubscribeTo<PersonCreatedMessage<Person>>();
}
The publisher is very similar to the Implementation #1 and #2:
public class Publisher
{
private MessageBus messageBus;
public Publisher(MessageBus messageBus)
{
this.messageBus = messageBus;
}
public void CreatePerson()
{
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
Person = new Person { GivenName = "Mike" },
Description = "[Demo 3] A new person has been created."
});
}
public void DeletePerson()
{
this.messageBus.Publish<PersonDeletedMessage>(new PersonDeletedMessage
{
Description = "Person has been deleted."
});
}
}
This is to start:
MessageBus messageBus = new MessageBus();
Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);
publisher.CreatePerson();
publisher.DeletePerson();
History
- 16th September, 2019: Applied IMessage for all applicable message types
- 4th September, 2019: Initial version