Introduction
This article introduces global service bus architecture meant to unify various transport and service bus providers into an easy to consume interface that abstracts transport implementation from message publishing and subscribing.
The architecture is meant to handle common enterprise development challenges such as:
- Different messaging requirements from different teams.
- Legacy code using various technologies.
- Reduction of time constraints by allowing incremental refactoring.
The architecture is geared towards broadcasted messages but can be changed to accommodate other pub/sub strategies.
Background
Before reading this article, you should become familiar with:
- IOC development
- NuGet
- Pub/sub
- Reactive Extensions (Rx)
The Provider Pattern
The code uses a type of aggregate pattern I refer to as a provider pattern. You create an interface that provides a particular functionality, then there's a class which controls how the providers are aggregated and any interactions between them. It's very useful in modular development because individual modules often provide functionality and access to objects the application needs to handle as a whole.
The provider interface is below:
[Flags]
public enum MessageScope
{
Unknown = 0,
Local = 1,
Global = 2
}
public interface IMessageBusProvider
{
MessageScope Scope { get; }
IObservable<T> Listen<T>();
void Send<T>(T message);
}
The functionality being provided is the ability to listen to or send messages. I included a scope to help determine message processing. Local and global are very important because:
- Some service buses are only capable to send within the current application (Local).
- Global messages can be very costly to receive and send, so you don't want to do it needlessly.
- Not everything is meant to be broadcast everywhere.
Scope can be expanded to cover particular topics, departments, etc. You could alternatively do additional filtering within the local providers.
In this particular case, the providers are service bus or transport technologies. Examples of global service bus technologies are:
- Windows Service Bus
MassTransit
NServiceBus
- TIBCO
Examples of local service bus technologies are:
EventAggregator
ReactiveUI.MessageBus
NOTE: Some global service bus technologies have options to send messages locally only too.
In global enterprise application development, you may have various messaging needs in on one team or in one module and completely different needs on another. For example, at my last company where this was inspired, one team had a throughput issue with Windows Service Bus; however, my team had time and resource issues which made Windows Service Bus the optimal choice. We were open to using a different technology later so we also wanted to be able to swap seamlessly if the other team was able to get everything implemented on their schedule.
On top of allowing multiple providers to be implemented concurrently, the architecture also allows you to hot swap any provider. If you implement the provider in a module, all you would have to do is suck it into your catalog, or not. The dependencies for the other consuming modules would be on the aggregator class so they'd be unaffected.
PollingProvider
If you already have a service bus implementation changing to a unified service bus architecture might not be a big deal, but on the flip side if you're currently in polling based application, you might have a lot of work on your hands. The great news is if this is the case, you can plug into the architecture incrementally with the PollingProvider
.
public abstract class PollingProvider : IMessageBusProvider
{
public PollingProvider(MessageScope scope, TimeSpan pollingInterval)
{
Scope = scope;
PollingInterval = pollingInterval;
}
public MessageScope Scope { get; protected set; }
public TimeSpan PollingInterval { get; set; }
protected abstract T Poll<T>();
public IObservable<T> Listen<T>()
{
return Observable.Interval(PollingInterval).Select(_ => Poll<T>());
}
public abstract void Send<T>(T message);
}
In this case, the Poll<T>()
may be some sort of database or service call. Say for example, I'm polling a database every minute to retrieve User updates. Poll<User>()
would call whatever I needed to in order to grab those update users client side. If the client sent an UpdateUsersMessage
, then the send would extract the updated user information and make the appropriate calls to persist the changes.
Other clients would be polling too and get those updates on the next poll. When it was time to implement the full service bus implementation service side, you can just remove the PollingProvider
and consumers won't be impacted because they're attached to the MessageBus
and not the provider.
Additionally, you might have a case not to switch a particular polling function with a service bus oriented function. You still want to be able to have a unified processing approach though and this does it.
MessageBus
So here is an example class to do the aggregation:
public class MessageBus
{
public MessageBus()
{
}
private List<IMessageBusProvider> _providers;
protected List<IMessageBusProvider> Providers
{
get
{
if (_providers == null)
{
_providers = new List<IMessageBusProvider>();
}
return _providers;
}
}
public IObservable<T> Listen<T>(MessageScope scope)
{
return Observable.Merge(
Providers.Where(p => (p.Scope & scope) == scope).Select(p => p.Listen<T>()));
}
public void Send<T>(T message, MessageScope scope)
{
Providers.Where(p => (p.Scope & scope) == scope)
.ToList()
.ForEach(p => p.Send(message));
}
}
Depending on if you're using IOC and which technology you're using, the construction will be slightly different. If you have a choice in the matter, I prefer MEF because it completely abstracts the provider from the aggregator. There aren't a whole lot of things that MEF does you can't do (or workaround) in Unity and vice versa, but MEF contains the ability to export multiple classes of the same interface.
In MEF, you would add the following field:
[ImportMany]
private IEnumerable<Lazy<IMessageBusProvider>> _importedProviders;
Then modify the Providers
property to:
private List<IMessageBusProvider> _providers;
protected List<IMessageBusProvider> Providers
{
get
{
if (_providers == null)
{
_providers = _importedProviders.Select(p => p.Value).ToList();
}
return _providers;
}
}
If you wanted to use Unity or an IOC container that doesn't allow multiple implementations of the same interface, or no container at all, you would add a registration method:
public void RegisterProvider(IMessageBusProvider provider)
{
Providers.Add(provider);
}
During the module initialization in this case, you would simply register the provider(s) the module implemented. I'm not as fond of this approach because it means that my provider has to be aware of what's consuming it. While in the case of the MessageBus
, this should just be one object; however, in other scenarios you might want to implement the provider you could have many provider consumers.
IObservable
The IObservable
interface was introduced with .NET 4.0 and the libraries that support it are in the Reactive Extensions (Rx) libraries. You'll need to install the "Reactive Extensions - Main Library" NuGet package. The Observable
class is in the System.Reactive.Linq
namespace.
If you're not familiar with the Rx libraries, I highly recommend you invest some time into it. The reason why this implementation uses it is the richness and usefulness of those libraries.
Because listening to global messages takes a lot more resources than local subscriptions, you want to try to keep global subscribers to one subscription per message type per application. I often have IObservables
in dedicated business logic to broadcast messages locally such as below:
public class WidgetService
{
public WidgetService(MessageBus messageBus)
{
messageBus.Listen<WidgetMessage>(MessageScope.Global).Subscribe(m =>
{
List<Widget> updatedWidgets = ApplyWidgetChanges(m);
_widgetChanged.OnNext(updatedWidgets);
}
}
private Subject<List<Widget>> _widgetsChanged = new Subject<List<Widget>>();
public IObservable<List<Widget>> WidgetsChanged
{
get
{
return _widgetsChanged;
}
}
}
Although I could republish a global message locally, I believe having the local broadcast as part of a business service helps consolidate the business responsibilities.
Subjects are found in System.Reactive.Subjects
namespace and allow you to observe and push data. There are several useful Subject
implementations and features, but the basic Subject
used here is simply pushing data.
I normally do change updates with a List<T>
because depending on what's hooked up to the subscriptions, it could be very inefficient to update singularly. Obviously, you can send a List<T>
of one if you still wanted to process/send items singularly.
Provider Implementations
There are several common challenges implementing providers with this interface. They mainly revolve around access to the transport resources and message objects.
NOTE: The code below is meant to be part of an abstract
, base provider class. Concrete implementations may need to be particular to message types, topics, subjects, etc. Most importantly, the concrete implementations will need to determine how to handle errors.
The first challenge is ensuring singular application access per message. In order to achieve that, you use a similar approach to the WidgetService
.
protected Subject<ProviderMessageType> _receiver = new Subject<ProviderMessageType>();
The two providers I've really worked with send messages in their own custom class, so essentially the message you're sending is wrapped. In the Windows Service Bus, it uses a BrokeredMessage
and in TIBCO, it's plainly a Message
. The ProviderMessageType
is simply whatever that message type is.
When a message consumer subscribes to a provider, it is actually going to be subscribing to the _receiver
object. Here's an example implementation for a TIBCO provider:
public IObservable<T> Listen<T>()
{
return _receiver
.ObserveOn(new NewThreadScheduler())
.Select(m => CloneMessage(m))
.Where(m => m.GetField("MessageType") == typeof(T).ToString())
.Select(m =>
{
try
{
if (m.GetField("MessageBody") != null)
{
try
{
var memoryStream = new MemoryStream((byte[])m.GetField("MessageBody"));
var formatter = new BinaryFormatter();
return (T)formatter.Deserialize(memoryStream);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
else
{
var body = (T)(typeof(T).GetConstructor(new Type[0]).Invoke(new object[0]));
foreach (var propertyInfo in typeof(T).GetProperties())
{
try
{
propertyInfo.SetValue(body, m.GetField(propertyInfo.Name).Value);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
return body;
}
}
catch (Exception ex)
{
HandleReceiveException(m, ex);
}
return default(T);
});
}
I'll talk about the details line by line.
.ObserveOn(new NewThreadScheduler())
As you might assume (or already know), this ensures that any subscriber is going to be on a new thread. The last thing you want to do is have 100 subscribers all blocking on each other because they ended up on the same thread.
.Select(m => CloneMessage(m))
TIBCO messages aren't thread safe, so I clone the message to ensure any subscription is working with their own instance of the message. Windows Service Bus has a similar issue in that the message body can only be retrieved once; however, you might have multiple subscriptions to the receiver. To deal with that particular issue, I cached the message body by MessageId
so subsequent receivers didn't attempt to extract it again.
The point is you may have some custom work to do around this issue.
.Where(m => m.GetField("MessageType") == typeof(T).ToString())
When processing a message, you don't want to throw exceptions deserializing left and right. The trick is to include the type in the metadata somewhere. Simply compare the metadata information to the generic type before continuing.
if (m.GetField("MessageBody") != null)
{
try
{
var memoryStream = new MemoryStream((byte[])m.GetField("MessageBody"));
var formatter = new BinaryFormatter();
return (T)formatter.Deserialize(memoryStream);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
In TIBCO, you can add primitive fields out of the box, but if you need custom serialization, this is an easy way to do it. Obviously, you'll have to add serialization support for the message type if you want to use this approach.
else
{
var body = (T)(typeof(T).GetConstructor(new Type[0]).Invoke(new object[0]));
foreach (var propertyInfo in typeof(T).GetProperties())
{
try
{
propertyInfo.SetValue(body, m.GetField(propertyInfo.Name).Value);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
return body;
}
If a MessageBody
field wasn't included, it tries to construct a default object, then populate the field in a one to one relationship. The benefits to this approach is that you don't need to implement serialization on the object, the message data is readable, and other platforms that can receive TIBCO messages could process this. The limitation is that all the fields have to be in formats that TIBCO can handle out of the box.
How you send your messages is obviously going to depend on who is going to be consuming them. These are two options that provide generic functionality.
catch (Exception ex)
{
HandleReceiveException(m, ex);
}
This is a crucial piece. If you don't wrap the entire subscription logic in a try catch
, you risk silently killing the subscription. Your application can become unresponsive to messages with no indication.
Hopefully, the main classes and abstract
providers are part of a class library or framework that can be generically consumed, so you wouldn't want to dictate how a module or application handled a messaging error. It could be something you shrug off, but you might need to bring down the application to avoid corrupting data.
Points of Interest
The ultimate goal here was just to lay the architecture and framework to implement your own MessageBus
. There's a lot that can change and be tweaked based on individual application and messaging requirements. If you risk processing duplicate message, you may need to add a MessageId
field and handle accordingly. A lot also depends on individual challenges of the providers that you pick.
History
- 2015-09-03: Preparing for insults