CQRS with Decoupled Messaging - Part II

4 Mar 2016 | CPOL | 7 min read
Second post in a series of articles showing a practical application of CQRS architecture, with emphasis on decoupling messaging as an infrastructure component.

Introduction

This article lists a few of the transport mechanisms available for messaging, explains how frameworks such as MassTransit and NServiceBus help by abstracting away the lower-level (transport-level) details, and shows how these frameworks can be used as infrastructure components that provide a concrete implementation of an IServiceBus interface.

We will work with the code samples from the MassTransit and NServiceBus documentation and convert them to follow the Dependency Inversion Principle.

Note that this post is part of a series of articles. The articles in the series are listed below.

  1. Introduction
  2. Need for Enterprise Servicebus Frameworks and Decoupled messaging with their samples (this article)
  3. Inventory Manager - CQRS application (with decoupled messaging) - Commands
  4. Inventory Manager - CQRS application (with decoupled messaging) - Aggregate and Event Sourcing
  5. Inventory Manager - CQRS application (with decoupled messaging) - ReadSide

Transport Mechanism

Several components are available for transporting messages:

  • MSMQ
  • RabbitMQ
  • Azure Service Bus
  • And many others

The .NET libraries for these transports can be used directly. However, code written directly against a transport tends to look like the following.

Consider this sample from Azure Service Bus.

Publishing/Sending the Message

C#
string connectionString =
    CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

QueueClient Client =
    QueueClient.CreateFromConnectionString(connectionString, "TestQueue");

Client.Send(new BrokeredMessage());

Receiving the Message

C#
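// Message pump options (illustrative values). AutoComplete is turned off
// because the callback below completes or abandons each message itself.
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

// Callback to handle received messages.
Client.OnMessage((message) =>
{
    try
    {
        // Process message from queue.
        Console.WriteLine("Body: " + message.GetBody<string>());
        Console.WriteLine("MessageID: " + message.MessageId);
        Console.WriteLine("Test Property: " +
            message.Properties["TestProperty"]);

        // Remove message from queue.
        message.Complete();
    }
    catch (Exception)
    {
        // Indicates a problem, unlock message in queue.
        message.Abandon();
    }
}, options);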

There are several downsides to using these queues directly in code. Only a transport-specific message type (BrokeredMessage) can be sent, as seen in the example above. This couples the application to the transport mechanism and makes it difficult to switch to another one later if the need arises.

In a real-world application, we need to specify handlers for application-specific messages, namely the events and commands published from the domain and application layers respectively. Thus, there is a need for application-specific message types, as shown below in a code snippet from CQRS Journey.

C#
namespace Registration.Handlers
{
    public class SeatAssignmentsHandler :
        IEventHandler<OrderConfirmed>,
        IEventHandler<OrderPaymentConfirmed>,
        ICommandHandler<UnassignSeat>,
        ICommandHandler<AssignSeat>
    {
    
        // .. Code
     
        public void Handle(OrderPaymentConfirmed @event)
        {
            // .. Code
        }

        public void Handle(OrderConfirmed @event)
        {
            // .. Code
        }

        public void Handle(AssignSeat command)
        {
            // .. Code
        }

        public void Handle(UnassignSeat command)
        {
            // .. Code
        }
    }
}

The class declares which messages it handles by implementing the interface IxxxHandler<MessageType>. This is the desired implementation following the Single Responsibility Principle: the handler is not concerned with where the messages come from, or with converting them from BrokeredMessage into application-specific message types.

However, such an implementation is difficult to achieve when working directly with the transport mechanism. The complexity grows because one has to handle sending and receiving messages, registering handlers, and converting BrokeredMessage instances into application-specific message types, among many other things. The CQRS Journey project attempted to do this and ended up with a complex hierarchy of classes, as shown below.
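As a rough illustration of the handler style described above, here is a minimal sketch of generic handler interfaces. The interface shapes, the marker interfaces IEvent and ICommand, and the message properties are assumptions made for this example and are not copied from CQRS Journey.

```csharp
using System;

// Marker interfaces for application-specific messages (assumed shape).
public interface IEvent { }
public interface ICommand { }

// A handler declares what it handles purely through the type parameter.
public interface IEventHandler<in TEvent> where TEvent : IEvent
{
    void Handle(TEvent @event);
}

public interface ICommandHandler<in TCommand> where TCommand : ICommand
{
    void Handle(TCommand command);
}

// Hypothetical messages; the properties are invented for illustration.
public class OrderConfirmed : IEvent
{
    public Guid OrderId { get; set; }
}

public class AssignSeat : ICommand
{
    public int SeatNumber { get; set; }
}

// The handler knows nothing about queues or BrokeredMessage.
public class SeatHandlerSketch :
    IEventHandler<OrderConfirmed>,
    ICommandHandler<AssignSeat>
{
    public int HandledCount { get; private set; }

    public void Handle(OrderConfirmed @event)
    {
        HandledCount++;
    }

    public void Handle(AssignSeat command)
    {
        HandledCount++;
    }
}
```

Because the handler only sees typed messages, it can be unit tested without any transport in place.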

Image 1

CommandDispatcher populates a dictionary of commandType => Handlers when its Register method is called. MessageProcessor.onMessageReceived is registered as a callback in IMessageProcessor.Start. For the sake of clarity, the EventProcessor class hierarchy has been omitted from the above diagram.

The code becomes difficult to follow and maintain. Adding a new handler is hard because several components have to be modified and registrations are required in many different places.

Please note that CQRS Journey is a great sample application for learning CQRS, process managers, event sourcing and other related concepts. However, I found myself struggling to understand its infrastructure components when trying to apply these concepts in a real-world or even a sample application.
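To make the dictionary-based dispatch idea concrete, here is a minimal sketch of a dispatcher that maps a command type to its handler. It is not the CQRS Journey implementation. The class name CommandDispatcherSketch, the one-handler-per-command rule and the sample AssignSeat command are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;

public interface ICommand { }

public interface ICommandHandler<in TCommand> where TCommand : ICommand
{
    void Handle(TCommand command);
}

// Hypothetical dispatcher: commandType => delegate that invokes the handler.
public class CommandDispatcherSketch
{
    private readonly Dictionary<Type, Action<ICommand>> _handlers =
        new Dictionary<Type, Action<ICommand>>();

    public void Register<TCommand>(ICommandHandler<TCommand> handler)
        where TCommand : ICommand
    {
        if (_handlers.ContainsKey(typeof(TCommand)))
        {
            throw new ArgumentException(
                "A handler is already registered for " + typeof(TCommand).Name);
        }

        _handlers[typeof(TCommand)] = command => handler.Handle((TCommand)command);
    }

    // Would be called from the transport's receive callback once the
    // transport message has been converted into an application command.
    // Returns false when no handler is registered for the command type.
    public bool Dispatch(ICommand command)
    {
        Action<ICommand> invoke;
        if (!_handlers.TryGetValue(command.GetType(), out invoke))
        {
            return false;
        }

        invoke(command);
        return true;
    }
}

// Sample command and handler, invented for this sketch.
public class AssignSeat : ICommand
{
    public int SeatNumber { get; set; }
}

public class RecordingSeatHandler : ICommandHandler<AssignSeat>
{
    public int LastSeat { get; private set; }

    public void Handle(AssignSeat command)
    {
        LastSeat = command.SeatNumber;
    }
}
```

Even this small version shows the extra plumbing (registration, type lookup, casting) that has to live somewhere when the application talks to the transport directly.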

ESB Frameworks

ESB frameworks like MassTransit and NServiceBus come in handy for solving the problem described above. These frameworks abstract away the transport mechanism and provide clean APIs through which we can easily publish and subscribe to application-specific messages.

Several samples already document their use.

  1. MassTransit samples - Loosely Coupled Labs. The website hosts several step-by-step code walkthroughs on using the MassTransit service bus framework for publish/subscribe. We will use its sample of MassTransit with Azure Service Bus to demonstrate decoupling of the messaging infrastructure component. I suggest going through the code walkthrough of that sample at least up to the point where publish/subscribe is implemented via a console application. The “cloudy way” portion of the sample can be ignored for the purposes of this article. MassTransit also has its own documentation website.
  2. NServiceBus samples - the documentation website for NServiceBus, which includes a sample for using NServiceBus with MSMQ. This article provides the code for converting that sample (using version 5 of NServiceBus) to decoupled messaging.

MassTransit with Azure Service Bus Sample Decoupled

The starting point for this sample was the sample from Loosely Coupled Labs. The sample posted with this article decouples messaging from that original sample, and the code walkthrough focuses on this decoupling.

Source code for the decoupled sample is available in its GitHub repository.

Code Walkthrough

C#
public interface IServiceBus : IDisposable
{
    void Publish(IEvent eventMessage);
    void Send(ICommand commandMessage);
}
  • Publisher and subscriber EXEs do not depend on the concrete MassTransit framework. They depend on the IServiceBus interface.
  • The Infrastructure.Common project contains only interfaces and no concrete implementations. Thus, it provides the abstraction that the EXEs take a dependency on.
    C#
    namespace Infrasctructure.MasstransitServiceBus
    {
        public class MassTransitServiceBus : IServiceBus
        {
            private readonly MassTransit.IServiceBus _massTransitBus;
    
            public MassTransitServiceBus(Action<ServiceBusConfigurator> configurator)
            {
                Log4NetLogger.Use();
                _massTransitBus = ServiceBusFactory.New(sbc => configurator(sbc));
            }
    
            public void Publish(IEvent eventMessage)
            {
                _massTransitBus.Publish(eventMessage, eventMessage.GetType(), 
                x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
            }
    
            public void Send(ICommand commandMessage)
            {
                _massTransitBus.Publish(commandMessage, commandMessage.GetType(), 
                x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
            }
    
            public void Dispose()
            {
                _massTransitBus.Dispose();
            }
        }
    }
  • The interface is implemented in the infrastructure project Infrastructure.MasstransitServiceBus.
  • The class MassTransitServiceBus wraps the MassTransit library so that we can customize it to suit our needs. These customizations are made only in this infrastructure component, thus following SRP (things that change together go together).
    C#
    namespace TestPublisher.Main
    {
        public class Bootstrapper
        {
            // .. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(
                    x => new MassTransitWithAzureServiceBusConfigurator(
                        ConfigurationManager.AppSettings.Get("azure-namespace"), "TestPublisher",
                        ConfigurationManager.AppSettings.Get("azure-key"), x));
                IoC.RegisterInstance<IServiceBus>(bus);
            }
    
            // .. Code
        }
    }
  • The concrete IServiceBus implementation is registered with the IoC container in the Bootstrapper class of the Main project. The publisher and subscriber EXEs each have their own Main project and Bootstrapper class, which they depend upon.
  • Thus, following the Dependency Inversion Principle, the higher-level policies (publisher and subscribers) no longer depend on the lower-level details (the MassTransit framework). Both depend on the abstraction IServiceBus.
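One way to see the benefit of depending on the abstraction is to substitute a different implementation. The sketch below repeats the IServiceBus interface from the walkthrough and adds a hypothetical in-memory implementation. InMemoryServiceBus, its Subscribe method, the Publisher class, the What property and the marker interfaces IEvent and ICommand are assumptions for this example and are not part of the sample's source code.

```csharp
using System;
using System.Collections.Generic;

// Marker interfaces (assumed; the article does not show their definitions).
public interface IEvent { }
public interface ICommand { }

// Same shape as the IServiceBus interface in the walkthrough.
public interface IServiceBus : IDisposable
{
    void Publish(IEvent eventMessage);
    void Send(ICommand commandMessage);
}

// Hypothetical in-memory implementation, useful for tests or local runs.
public class InMemoryServiceBus : IServiceBus
{
    private readonly Dictionary<Type, List<Action<object>>> _subscribers =
        new Dictionary<Type, List<Action<object>>>();

    public bool IsDisposed { get; private set; }

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Action<object>> handlers;
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Action<object>>();
            _subscribers[typeof(TMessage)] = handlers;
        }

        handlers.Add(message => handler((TMessage)message));
    }

    public void Publish(IEvent eventMessage)
    {
        Deliver(eventMessage);
    }

    public void Send(ICommand commandMessage)
    {
        Deliver(commandMessage);
    }

    public void Dispose()
    {
        IsDisposed = true;
        _subscribers.Clear();
    }

    // Delivers synchronously to handlers registered for the exact message type.
    private void Deliver(object message)
    {
        List<Action<object>> handlers;
        if (!_subscribers.TryGetValue(message.GetType(), out handlers))
        {
            return;
        }

        foreach (var handler in handlers)
        {
            handler(message);
        }
    }
}

// Invented message shape for the sketch.
public class SomethingHappened : IEvent
{
    public string What { get; set; }
}

// Higher-level code sees only IServiceBus, never a framework type.
public class Publisher
{
    private readonly IServiceBus _bus;

    public Publisher(IServiceBus bus)
    {
        _bus = bus;
    }

    public void Announce(string what)
    {
        _bus.Publish(new SomethingHappened { What = what });
    }
}
```

The Publisher class compiles and runs unchanged whether it is given this in-memory bus or a framework-backed wrapper, which is the point of the inversion.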

Publisher

Image 2

  • TestPublisher EXE does not depend on the infrastructure components directly.
  • As we will see in the next article, if the publisher followed a layered architecture, no layer except the Infrastructure layer would take a dependency on MassTransitServiceBus.
  • Publishing of the message happens by resolving the IServiceBus interface via IoC.
    C#
    namespace TestPublisher
    {
        internal class Program
        {
            private static void Main(string[] args)
            {   
                Bootstrapper.Init();
                var serviceBus = IoC.Resolve<IServiceBus>();
    
                // .. Code
    
                serviceBus.Publish(message);
    
                // .. Code        
                serviceBus.Dispose();
            }
        }
    }
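The article does not show the IoC class behind the IoC.RegisterInstance and IoC.Resolve calls. As a rough idea of what such a facade could look like, here is a minimal sketch backed by a dictionary. This implementation is an assumption for illustration. The sample's actual class may differ, for example by wrapping a full dependency injection container.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the IoC facade used in the sample.
// It stores one singleton instance per service type and is not thread-safe.
public static class IoC
{
    private static readonly Dictionary<Type, object> Instances =
        new Dictionary<Type, object>();

    public static void RegisterInstance<TService>(TService instance)
        where TService : class
    {
        if (instance == null)
        {
            throw new ArgumentNullException("instance");
        }

        // A later registration for the same service type replaces the earlier one.
        Instances[typeof(TService)] = instance;
    }

    public static TService Resolve<TService>()
        where TService : class
    {
        object instance;
        if (!Instances.TryGetValue(typeof(TService), out instance))
        {
            throw new InvalidOperationException(
                "No instance registered for " + typeof(TService).Name);
        }

        return (TService)instance;
    }
}
```

With a facade like this, Program only ever names the IServiceBus interface, and the choice of concrete bus stays inside the Bootstrapper.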

Subscriber

Image 3

  • In its Bootstrapper class, TestSubscriber registers the message handlers while building the MassTransitWithAzureServiceBusConfigurator and then registers the resulting IServiceBus with IoC.
    C#
    namespace TestSubscriber.Main
    {
        public class Bootstrapper
        {
           // .. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(
                    x =>
                        new MassTransitWithAzureServiceBusConfigurator(
                            ConfigurationManager.AppSettings.Get("azure-namespace"), "TestSubscriber",
                            ConfigurationManager.AppSettings.Get("azure-key"), x)
                            .WithHandler<SomethingHappened, SomethingHappenedHandler>());
                IoC.RegisterInstance<IServiceBus>(bus);
            }
    
            // .. Code
        }
    }
  • SomethingHappenedHandler class implements IHandle<SomethingHappened>
    C#
    namespace TestSubscriber.AppService
    {
      public class SomethingHappenedHandler : IHandle<SomethingHappened>
      {
          public void Handle(SomethingHappened message)
          {
              // .. Code
          }
      }
    }

Dependency on Masstransit library

Image 4

  • As seen above, only the following projects depend on MassTransit:
    • Infrastructure.MassTransit => since it implements IServiceBus
    • TestPublisher.Main, TestSubscriber.Main => since they register/bootstrap IServiceBus with the concrete implementation

Running the Application

  • Provide the Azure Service Bus namespace and key in the app.config files of both TestPublisher and TestSubscriber.
    XML
    <appSettings>
      <add key="azure-namespace" value="" />
      <add key="azure-key" value="" />
    </appSettings>
    

NServiceBus Decoupled Example

An advantage of following the Dependency Inversion Principle is the ability to swap one library for another. The next code sample demonstrates this: the IServiceBus interface is implemented by a wrapper class around the NServiceBus library.

The starting point for this code sample was the sample (using version 5 of NServiceBus) from the NServiceBus documentation. The decoupled code follows the same concepts as the MassTransit example, so no separate code walkthrough is provided.

Source code for this sample is available in its GitHub repository.

Note on Decoupling

The goal is to have the infrastructure decoupled from the core domain logic, so that it does not get in the way when we implement our business logic.

It also gives the added advantage of being able to upgrade and swap third-party library components with the areas of change contained within a single project, as shown above by changing from MassTransit to NServiceBus.

Next Article in the Series

The next article in the series will take the messaging infrastructure component (one that was developed for MassTransit and Azure Service Bus communication in this article) and build a simple CQRS application.

For a complete list of articles in this series, please go to the Introduction section of this article.
Thanks for reading the articles. I hope they are proving insightful.

References

  1. Single Responsibility Principle by Uncle Bob
  2. Dependency Inversion Principle by Uncle Bob
  3. Azure Service Bus sample
  4. CQRS Journey
  5. MassTransit
  6. Loosely Coupled Labs - for samples on MassTransit
  7. NServiceBus
  8. Samples for NServiceBus

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)