Introduction
This article focuses on listing out few Transport mechanisms available for messaging, how frameworks like Masstransit
and nServicebus
help by abstracting out the lower level (transport level) details and showing how these frameworks can be used as infrastructure components to provide concrete implementation of IServicebus
interface.
We will be working with the code samples available in the documentation of MassTransit
and nServicebus
and converting those samples to follow Dependency Inversion Principle.
Note that this post is part of series of articles, links for all the articles in the series are provided below.
- Introduction
- Need for Enterprise Servicebus Frameworks and Decoupled messaging with their samples(this article)
- Inventory Manager - CQRS application (with decoupled messaging) - Commands
- Inventory Manager - CQRS application (with decoupled messaging) - Aggregate and Event Sourcing
- Inventory Manager - CQRS application (with decoupled messaging) - ReadSide
Transport Mechanism
For transport of the messages, there are several components in the market.
- Msmq
- Rabbitmq
- Azure service bus
- And many others
There are libraries/classes in .NET that allow working with these transports directly. However, the kind of code that one would end up with when using these transports directly is as follows.
Consider this sample from Azure service bus.
Publishing/Sending the Message
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
QueueClient Client =
QueueClient.CreateFromConnectionString(connectionString, "TestQueue");
Client.Send(new BrokeredMessage());
Receiving the Message
Client.OnMessage((message) =>
{
try
{
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Test Property: " +
message.Properties["TestProperty"]);
message.Complete();
}
catch (Exception)
{
message.Abandon();
}
}, options);
There are several downsides to using these queues directly in code. One can only send specific message Types (BrokeredMessage
) as seen in the example above, which would couple us to the transport mechanism and make it difficult to switch to another one later if the need arises.
In a real world application, we will need to specify handlers for application specific messages namely, events and commands that get published from our domain and application layer correspondingly. Thus, there is a need to use application specific message types as shown below in the CQRS Journey code snippet.
namespace Registration.Handlers
{
public class SeatAssignmentsHandler :
IEventHandler<OrderConfirmed>,
IEventHandler<OrderPaymentConfirmed>,
ICommandHandler<UnassignSeat>,
ICommandHandler<AssignSeat>
{
public void Handle(OrderPaymentConfirmed @event)
{
}
public void Handle(OrderConfirmed @event)
{
}
public void Handle(AssignSeat command)
{
}
public void Handle(UnassignSeat command)
{
}
}
}
The class specifies which messages it handles by implementing the interface IxxxHandler<MessageType>
. It is the desired implementation following Single Responsibility Principle, where one is not bothered about where the messages are coming from and converting them from BrokeredMessage
into application specific message types and such.
However, such an implementation is difficult to achieve when you are directly working with transport mechanism. The complexity increases as one has to handle sending and receiving of messages, registration of the handlers, converting them to application specific message types from BrokeredMessage
among many other things. CQRS Journey project attempted to do this and thus it ended up with complex hierarchy of classes as shown below.
CommandDispatcher
populates a dictionary of commandType => Handlers
, when Register
method is called upon it. MessageProcessor.onMessageReceived
is registered as a callback in IMessageProcessor.Start
. For the sake of clarity, EventProcessor
set of class hierarchy has been omitted from the above diagram.
The code becomes difficult to follow and maintain. Adding new handlers becomes very difficult since there are several components that get modified and registrations are required at many different places.
Please note that CQRS Journey is a great sample application for learning CQRS, process manager, event sourcing and other related concepts, I have found myself struggling to understand their infrastructural components in order to apply these concepts in a real world or even a sample application.
ESB Frameworks
ESB frameworks like MassTransit
and nServicebus
come in handy for solving the problem described above. These frameworks abstract out the transport mechanism and provide us with clean APIs where we can easily publish and subscribe to application specific messages.
There have already been several samples created for documenting their use.
Masstransit
Samples - Loosely coupled labs. Website hosts several step by step code walkthroughs on how to use Masstransit
service bus framework for publish subscribe. Click here to view the sample of using Masstransit
with Azure service bus, which we will be using to demonstrate decoupling of messaging infrastructure component. I suggest going through the code-walkthrough of the sample atleast till implementing the publish subscribe via console application. One may ignore the “cloudy way” portion of the sample for understanding this article. Also, here is the documentation website for Masstransit. nServicebus
Samples - Documentation website for nServicebus. Click here to view the sample for using nServicebus
with msmq. This article will be posting the code for converting the mentioned nServicebus
sample (using version 5 of nServicebus) into decoupled messaging.
Masstransit with Azure Servicebus Sample Decoupled
Initial starting point of this sample was the sample from loosely coupled labs. Sample posted with this article tries to decouple messaging from the sample posted on loosely coupled labs. The code walkthrough will focus on this decoupling.
Source code for decoupled sample can be found on github Repository - Located here
Code Walkthrough
public interface IServiceBus : IDisposable
{
void Publish(IEvent eventMessage);
void Send(ICommand commandMessage);
}
- Publisher and subscriber EXEs do not depend on concrete implementation
Masstransit
framework. They depend on the IServiceBus
interface. Infrastructure.Common
project contains only interfaces and no concrete implementations. Thus, it provides the abstraction that EXEs take dependency on.
namespace Infrasctructure.MasstransitServiceBus
{
public class MassTransitServiceBus : IServiceBus
{
private readonly MassTransit.IServiceBus _massTransitBus;
public MassTransitServiceBus(Action<ServiceBusConfigurator> configurator)
{
Log4NetLogger.Use();
_massTransitBus = ServiceBusFactory.New(sbc => configurator(sbc));
}
public void Publish(IEvent eventMessage)
{
_massTransitBus.Publish(eventMessage, eventMessage.GetType(),
x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
}
public void Send(ICommand commandMessage)
{
_massTransitBus.Publish(commandMessage, commandMessage.GetType(),
x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
}
public void Dispose()
{
_massTransitBus.Dispose();
}
}
}
- The interface is implemented in the infrastructure project
Infrastructure.MasstransitServiceBus
- The class
MassTransitServicebus
provides a wrapper over the Masstransit
library so that we can customize Masstransit
library to suit our needs. These customizations will be thus made only in this infrastructure component, thus following SRP (Things that change together go together).
namespace TestPublisher.Main
{
public class Bootstrapper
{
private static void RegisterServiceBus()
{
var bus = new MassTransitServiceBus(x => new MassTransitWithAzureServiceBusConfigurator
(ConfigurationManager.AppSettings.Get("azure-namespace"), "TestPublisher",
ConfigurationManager.AppSettings.Get("azure-key"), x));
IoC.RegisterInstance<IServiceBus>(bus);
}
}
}
- The
IServiceBus
interface is dependency injected in the Bootstrapper
class in Main
project. Both publisher and subscriber EXEs have their separate Main
and Bootstrapper
classes that they depend upon. - Thus following dependency injection principle, higher level policies
Publisher
and Subscribers
no longer depend on lower level details Masstransit
framework, both of them (higher level policy and lower level details) depend on abstraction IServiceBus
.
Publisher
Subscriber
- In the
Bootstrapper
class, TestSubscriber
registers the handlers for the message when building the MassTransitServiceBusConfigurator
and then registers for IServiceBus
with IoC.
namespace TestSubscriber.Main
{
public class Bootstrapper
{
private static void RegisterServiceBus()
{
var bus = new MassTransitServiceBus(
x =>
new MassTransitWithAzureServiceBusConfigurator(
ConfigurationManager.AppSettings.Get("azure-namespace"), "TestSubscriber",
ConfigurationManager.AppSettings.Get("azure-key"), x)
.WithHandler<SomethingHappened, SomethingHappenedHandler>());
;
IoC.RegisterInstance<IServiceBus>(bus);
}
}
}
SomethingHappenedHandler
class implements IHandle<SomethingHappened>
namespace TestSubscriber.AppService
{
public class SomethingHappenedHandler : IHandle<SomethingHappened>
{
public void Handle(SomethingHappened message)
{
}
}
}
Dependency on Masstransit library
- As seen above, only the following projects have dependency on Masstransit
Infrastructure.MassTransit
=> Since it implements IServiceBus
TestPublisher.Main
, TestSubscriber.Main
=> Since it registers/bootstraps IServiceBus
with concrete implementation
Running the Application
NserviceBus Decoupled Example
As an advantage of following dependency inversion principle, we get the ability to swap out one library in place of other. The next code sample demonstrates this, in which IServiceBus
interface is getting implemented by a wrapper class that wraps nServiceBus
library.
Initial starting point of this code sample was the sample (with version 5 of nServiceBus
) from nServiceBus
documentation. Decoupled code follows similar concepts as the example with MassTransitBus
and thus no code walkthrough is required.
Source code for this sample can be found on github Repository - Located here
Note on Decoupling
The goal is to have the infrastructure decoupled from the core domain logic, so that it does not get in the way when we implement our business logic.
It also gives you the added advantage of being able to upgrade and swap third party library components, with areas of change contained within a single project, as shown above by changing from Masstransit
to nServiceBus
.
Next Article in the Series
The next article in the series will take the messaging infrastructure component (one that was developed for MassTransit
and Azure Service Bus communication in this article) and build a simple CQRS application.
For a complete list of articles in this series, please go to the Introduction section of this article.
Thanks for reading the articles, hope they are proving insightful.
References
- Single Responsibility Principle by Uncle Bob
- Dependency Inversion Principle by Uncle Bob
- Azure Service bus sample
- CQRS Journey
- Masstransit
- Loosely coupled Labs - For samples on MassTransit
- nServicebus
- Samples for nServicebus