There is a small demo app
which is available right here
This article is all about using an inter-process service bus to perform
potentially long running workflows. I have chosen to use
NServiceBus (NSB hereafter) to
demonstrate this, as it offers the following attributes which I personally
consider to be very important for any messaging layer:
- Transactional
- Durable
- Reliable
- Distributed
- Scalable
In this article we will not be touching on all of those points, but we will
cover a few of them. I will also do a walk through of creating a potentially
long running workflow (saga in NSB speak), and talk about how the workflow state
is persisted and managed between states.
I should also mention that NSB is not the only inter-process bus that could
do this. You could probably use others, such as
- Mass Transit
- Rabbit MQ
- Azure service bus (though I have no experience of that so may be
speaking out of turn)
NServiceBus supports various
mechanisms for its persistence of data, such as
- MSMQ
- Raven DB
- SQL Server, via the use of NHibernate
I have chosen to use SQL Server for this article as it is something that I wanted
to try. I have however also included a RavenDB version of the process that
requires storage, should you wish to download the RavenDB server and try
it out.
Since I am using SQL Server you will need to have access to that (which I
guess if you are reading this article you might just have). You will then need
to do the following 2 things
- Create a new database called "NServiceBusPersistence"
- Change the connection string in the App.config of the "SQLSubscriber"
project to point to your own database
NServiceBus is really a
distributed messaging layer. It allows for all sorts of communications between
processes, you can choose from
- Peer to peer
- Pub/sub
- Send to self
- Broadcast to all
However at the end of the day what makes this all possible is the central
backbone of NServiceBus. These
days NServiceBus also offers a
cloud based offering. In this article I will however be focusing more on the
regular NServiceBus installation
(well I think it is more common at any rate), which is by using NServiceBus
within an organization using MSMQ.
As previously stated NServiceBus
provides the following attributes
Transactional
This uses the standard Distributed Transaction Coordinator to allow inter-process
transactions to occur. The NServiceBus
framework will only commit a transaction when a message (or
complete Saga) is completed. If an Exception is seen within a message handler, NServiceBus
will possibly carry out n-many retries (you must configure this), after which it
will roll back the transaction.
Durable
As NServiceBus uses MSMQ as a
transport (or Azure, but as I stated we are not discussing that in this
article), all messages are safe and will be maintained even after the loss of
power. The usual MSMQ benefits are available.
Reliable
This is partly achieved thanks to the use of MSMQ and also thanks to the fact
that NServiceBus persists things
at various stages in its messaging pipeline. As previously stated this could
use MSMQ/Raven or SQL storage. This article will talk about the use of SQL
storage for persistence.
Distributed
The fact that we can distribute handlers/publishers across an entire
network allows us to completely distribute our messaging layer.
Scalable
NServiceBus provides a load
balancer called "The Distributor". I will not be covering that in this article,
but if you think you would like to load balance your messaging components, "The
Distributor" may be for you.
As previously stated NServiceBus
provides a common messaging layer interface. For the demo app we will be looking
at using NServiceBus to accept a
Command and then use a Message. This diagram may help to illustrate this
further.
It can be seen that NServiceBus
provides the common messaging layer backbone (for want of a better word) and
that each of the processes that deals with the messages (known as an endpoint in
NSB speak) has its own MSMQ queue.
It is important to note that this is only one possible configuration of how
to use NServiceBus. You really
can configure it to suit your needs. The above diagram is just how I chose to do
it for the demo app.
It is fair to say that NServiceBus
is all about the messages. If you want to send a command or an event to a
NServiceBus endpoint, it will
take the form of a message.
So what exactly is a message? Put simply it is a shared contract that all
parties know how to deal with. The messages are serialized by
NServiceBus into the
serialization format you choose (there are many choices here: Xml, Json, Bson,
Binary etc), and deserialized by the endpoint(s) that deal with the messages
(via the use of configuration and the expected handlers).
If you come from a WCF world, you can think of messages as the
[DataContract]
objects that are shared between client and server.
There is a distinction between "Command" messages and "Message" messages, but
we will discuss this later. For now let's just see an example of the code for
both
Command
namespace Messages.Commands
{
public class CreatePurchaseOrderCommand
{
public int PurchaseOrderId { get; set; }
public string Description { get; set; }
}
}
Namespace Commands
Public Class CreatePurchaseOrderCommand
Public Property PurchaseOrderId() As Integer
Public Property Description() As String
End Class
End Namespace
Message
namespace Messages.Mess
{
public class CheckPurchaseOrderStatusMessage
{
public int PurchaseOrderId { get; set; }
public string ConfirmationDescription { get; set; }
}
}
Namespace Mess
Public Class CheckPurchaseOrderStatusMessage
Public Property PurchaseOrderId() As Integer
Public Property ConfirmationDescription() As String
End Class
End Namespace
As you can see the Command and Message classes are just standard .NET classes,
nothing special about them at all. We will get to the bit where we discuss the
important difference between Commands and Messages, but for now just note that these
messages are just regular .NET classes that allow you to add whatever data you
want to them. You can be reassured that NServiceBus
will make sure that the values for these messages will be stored when needed.
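To make the "shared contract" idea a little more concrete, here is a minimal sketch of a message being turned into XML and back again. It deliberately uses the plain .NET XmlSerializer rather than NServiceBus's own serializer (whose wire format will differ), and the MessageRoundTrip helper class is purely my own invention for illustration.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

namespace Messages.Commands
{
    // Same shape as the demo command: a plain class with public properties.
    public class CreatePurchaseOrderCommand
    {
        public int PurchaseOrderId { get; set; }
        public string Description { get; set; }
    }
}

// Hypothetical helper, NOT part of NServiceBus or the demo app.
public static class MessageRoundTrip
{
    // Serialize a message to an XML string using the framework's XmlSerializer.
    public static string ToXml<T>(T message)
    {
        var serializer = new XmlSerializer(typeof(T));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, message);
            return writer.ToString();
        }
    }

    // Rebuild a message from the XML produced by ToXml.
    public static T FromXml<T>(string xml)
    {
        var serializer = new XmlSerializer(typeof(T));
        using (var reader = new StringReader(xml))
        {
            return (T)serializer.Deserialize(reader);
        }
    }

    public static void Main()
    {
        var sent = new Messages.Commands.CreatePurchaseOrderCommand
        {
            PurchaseOrderId = 42,
            Description = "Demo order"
        };

        // What the sender puts on the wire, and what the receiver gets back.
        string xml = ToXml(sent);
        var received = FromXml<Messages.Commands.CreatePurchaseOrderCommand>(xml);

        Console.WriteLine(xml);
        Console.WriteLine("{0}: {1}", received.PurchaseOrderId, received.Description);
    }
}
```

The point is simply that both sides only need to agree on the class; everything else is plumbing that the bus takes care of.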
Ok so what exactly is the difference between a command and a message? Well to understand
that a bit further, let's consider the following scenario.
We wish to design an ordering system something like Amazon, where we
allow the user to place an order. When an order is placed by the customer it
should be dispatched and an email is sent to the client notifying them of the
purchase.
Now that is a pretty simple workflow, but how do "Commands" and "Messages" fit
into that?
Well if we break it down we can imagine something like the following:
Action | Command Or Message? |
Place order | Command to start the work flow |
Dispatch order | Message that could advance the workflow |
Email client order confirmation | Message that could advance the workflow |
Still not clear?
Ok so let's try some words. I find it helps to think of things as follows:
Command : Something that is about to happen. Typically
this would be a 1..1 type of message.
Message : Something that may continue the current long
running process. This would typically be sent to the current endpoint, and would
more than likely be a 1..1 type of message.
NOTE : The more eagle eyed amongst you may think that rather than having one
big workflow we could possibly have multiple endpoints, one for DispatchOrder,
and another for EmailConfirmationToClient. The difference is that if you
chose to go down this route, you would need to send a message to a new endpoint
rather than locally.
NServiceBus
also supports the idea of publishing events. This would typically be used when
you want more than one endpoint to act on some event. In fact it should come as
no surprise that NServiceBus
distinguishes this type of message as an "Event" that is broadcast (via
Bus.Publish(...)) to all interested endpoints. This is not done in the demo app,
but rest assured it is completely possible, and is easily achieved by using NServiceBus.
At the end of the day this is an architectural decision that only you
can make.
More information on general messaging type distinctions can be found here and
here. Although these links do not cover the NServiceBus message concept exactly, they are still a worthy read.
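Since the demo app does not publish anything, here is a rough sketch of what publishing an event might look like. The PurchaseOrderDispatchedEvent and DispatchNotifier types are hypothetical (they are not in the download), and I am assuming the endpoint has been configured with a matching convention for events and is not a "send only" endpoint.

```csharp
using NServiceBus;

namespace Messages.Events
{
    // Hypothetical event, NOT part of the demo app.
    public class PurchaseOrderDispatchedEvent
    {
        public int PurchaseOrderId { get; set; }
    }
}

// Hypothetical class that gets the bus handed to it (for example by the IOC container).
public class DispatchNotifier
{
    private readonly IBus bus;

    public DispatchNotifier(IBus bus)
    {
        this.bus = bus;
    }

    public void NotifyDispatched(int purchaseOrderId)
    {
        // Publish goes to every endpoint that has subscribed to this event type,
        // unlike Send/SendLocal which target exactly one endpoint.
        bus.Publish(new Messages.Events.PurchaseOrderDispatchedEvent
        {
            PurchaseOrderId = purchaseOrderId
        });
    }
}
```

Any number of endpoints could then handle PurchaseOrderDispatchedEvent with an ordinary IHandleMessages<T> handler, which is what makes events a natural fit for the "more than one endpoint needs to know" case.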
NServiceBus offers a few
hosting solutions, you may choose from:
- Running the standalone NServiceBus.Host.exe executable
- Hosting NServiceBus
inside a windows service, which just runs the standalone
NServiceBus.Host.exe executable
- Or go completely self hosted.
The code in this article uses the last option where we self host the bus.
This is done as follows:
The hosting code for the demo article is slightly different depending on
which endpoint you are talking about.
private static IBus CreateBusFactory()
{
Configure.Transactions.Enable();
Configure.Serialization.Xml();
Configure.Features.Disable<Sagas>();
var bus = Configure.With()
.DefaultBuilder()
.Log4Net()
.MsmqSubscriptionStorage()
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
.UseTransport<Msmq>()
.UnicastBus()
.DisableTimeoutManager()
.SendOnly();
return bus;
}
Private Shared Function CreateBusFactory() As IBus
Configure.Transactions.Enable()
Configure.Serialization.Xml()
Configure.Features.Disable(Of Sagas)()
Dim bus = Configure.[With]() _
.DefaultBuilder() _
.Log4Net() _
.MsmqSubscriptionStorage() _
.DefiningCommandsAs(Function(t) t.[Namespace] IsNot Nothing _
AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
.UseTransport(Of NServiceBus.Msmq)() _
.UnicastBus() _
.DisableTimeoutManager() _
.SendOnly()
Return bus
End Function
Let's break that down a bit, shall we? (Note this section will only show code in
C#, sorry)
Fluent API Part | Description |
Configure.Transactions.Enable(); | Enable transactions |
Configure.Serialization.Xml(); | Use Xml serialization |
Configure.Features.Disable<Sagas>(); | Disable the Saga feature (the sender doesn't need to run long running workflows) |
Configure.With() | Start the standard configurator fluent API |
.DefaultBuilder() | Use the default IOC container (AutoFac at time of writing this article) |
.Log4Net() | Use Log4Net logging |
.MsmqSubscriptionStorage() | Use MSMQ subscription storage |
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands")) | Define commands as those in the namespace shown (you could also use the NSB marker interfaces such as ICommand, but I feel this fluent API is nicer) |
.UseTransport<Msmq>() | Use MSMQ transport |
.UnicastBus() | Create the unicast bus |
.DisableTimeoutManager() | Disable the timeout manager. NSB uses RavenDB as the default storage for the timeout manager, which we do not want to set up for the command sender. So by disabling the timeout manager, we are effectively also removing the need for RavenDB |
.SendOnly(); | Set this endpoint up as a "Send Only" endpoint. This is fine as this endpoint is only "Sending" a command |
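The DefiningCommandsAs(...) convention is nothing more than a predicate over a Type, so it is easy to check in isolation which classes it will pick up. The sketch below does not need NServiceBus at all; the ConventionCheck class is just a throwaway name of mine, and the two message classes are empty stand-ins for the real ones.

```csharp
using System;

namespace Messages.Commands { public class CreatePurchaseOrderCommand { } }
namespace Messages.Mess { public class CheckPurchaseOrderStatusMessage { } }

// Hypothetical helper, NOT part of the demo app.
public static class ConventionCheck
{
    // The same predicate that is passed to DefiningCommandsAs(...) above.
    public static readonly Func<Type, bool> IsCommand =
        t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands");

    public static void Main()
    {
        // Lives in Messages.Commands, so it is treated as a command.
        Console.WriteLine(IsCommand(typeof(Messages.Commands.CreatePurchaseOrderCommand))); // True

        // Lives in Messages.Mess, so this convention ignores it.
        Console.WriteLine(IsCommand(typeof(Messages.Mess.CheckPurchaseOrderStatusMessage))); // False

        // Framework types are in the System namespace, so they are ignored too.
        Console.WriteLine(IsCommand(typeof(string))); // False
    }
}
```

The nice thing about conventions like this is that the message assembly does not need a reference to NServiceBus at all.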
namespace SQLSubscriber.IOC
{
public class NServiceBusInstaller : IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
Configure.Transactions.Enable();
Configure.Serialization.Xml();
Configure.Features.Enable<Sagas>();
Configure.With()
.Log4Net()
.CastleWindsorBuilder(container)
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
.DefiningMessagesAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Mess"))
.UseTransport<Msmq>()
.PurgeOnStartup(false)
.UnicastBus()
.LoadMessageHandlers()
.UseNHibernateSubscriptionPersister()
.UseNHibernateTimeoutPersister()
.UseNHibernateSagaPersister()
.UseNHibernateGatewayPersister()
.CreateBus()
.Start(() => Configure.Instance.ForInstallationOn<Windows>().Install());
}
}
}
Namespace IOC
Public Class NServiceBusInstaller
Implements IWindsorInstaller
Public Sub IWindsorInstaller_Install(ByVal container As IWindsorContainer, ByVal store As IConfigurationStore) Implements IWindsorInstaller.Install
Configure.Transactions.Enable()
Configure.Serialization.Xml()
Configure.Features.Enable(Of Sagas)()
Configure.[With]() _
.Log4Net() _
.CastleWindsorBuilder(container) _
.DefiningCommandsAs(Function(t) t.[Namespace] _
IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
.DefiningMessagesAs(Function(t) t.[Namespace] _
IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Mess")) _
.UseTransport(Of NServiceBus.Msmq)() _
.PurgeOnStartup(False) _
.UnicastBus() _
.LoadMessageHandlers() _
.UseNHibernateSubscriptionPersister() _
.UseNHibernateTimeoutPersister() _
.UseNHibernateSagaPersister() _
.UseNHibernateGatewayPersister() _
.CreateBus() _
.Start(Function() InstallForWindows())
End Sub
Private Function InstallForWindows()
Configure.Instance.ForInstallationOn(Of Windows)().Install()
Return True
End Function
End Class
End Namespace
Let's break that down a bit, shall we? (Note this section will only show code in
C#, sorry)
Fluent API Part | Description |
Configure.Transactions.Enable(); | Enable transactions |
Configure.Serialization.Xml(); | Use Xml serialization |
Configure.Features.Enable<Sagas>(); | Enable the saga feature, as this endpoint wants to be able to run long running workflows |
Configure.With() | Start the standard configurator fluent API |
.Log4Net() | Use Log4Net logging |
.CastleWindsorBuilder(container) | Use a specific castle container |
.DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands")) | Define commands as those in the namespace shown (you could also use the NSB marker interfaces such as ICommand, but I feel this fluent API is nicer) |
.DefiningMessagesAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Mess")) | Define messages as those in the namespace shown (you could also use the NSB marker interfaces such as IMessage, but I feel this fluent API is nicer) |
.UseTransport<Msmq>() | Use MSMQ transport |
.PurgeOnStartup(false) | Do not purge the input queue on startup |
.UnicastBus() | Create the unicast bus |
.LoadMessageHandlers() | Load ALL the message handlers |
.UseNHibernateSubscriptionPersister() | Use SQL subscription storage |
.UseNHibernateTimeoutPersister() | Use SQL timeout storage |
.UseNHibernateSagaPersister() | Use SQL saga storage |
.UseNHibernateGatewayPersister() | Use SQL gateway storage |
.CreateBus() | Create the bus |
.Start(() => Configure.Instance.ForInstallationOn<Windows>().Install()); | Install it for Windows |
As you can see the setup of NServiceBus
is done using a fluent API, which is quite similar in both these cases.
This section will discuss how to configure NServiceBus
for the demo app requirements outlined above. You will most definitely need to
put in some work of your own if your requirements differ from those of the demo
app (which they 100% will, so get ready for some self learning).
Here is the config (App.Config) for the command sender
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="MessageForwardingInCaseOfFaultConfig"
type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
<section name="UnicastBusConfig"
type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
<section name="Logging"
type="NServiceBus.Config.Logging, NServiceBus.Core" />
<section name="log4net"
type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
</configSections>
<MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
<UnicastBusConfig ForwardReceivedMessagesTo="audit">
<MessageEndpointMappings>
<add Assembly="Messages"
Type="Messages.Commands.CreatePurchaseOrderCommand"
Endpoint="SQLSubscriber" />
</MessageEndpointMappings>
</UnicastBusConfig>
<Logging Threshold="ERROR" />
</configuration>
One thing that should be quite obvious based on everything we have gone
through thus far is that the command sent by the command sender is a 1..1 type
of message. As such, when we configure this NSB endpoint, we need to tell it
where the messages will end up. This is done using the <MessageEndpointMappings>
element that you can see above, where we specify the Endpoint of "SQLSubscriber".
This tells NSB that the current Endpoint (Commander) will be
able to send commands to the configured endpoints, which in this case is only
SQLSubscriber.
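To show where that mapping actually kicks in, here is a small sketch of sending the command from the Commander side. PurchaseOrderSender is a made up class name (the demo app sends from its UI code instead), and I am assuming the IBus instance is the one returned from CreateBusFactory() shown earlier.

```csharp
using Messages.Commands;
using NServiceBus;

// Hypothetical wrapper, NOT part of the demo app.
public class PurchaseOrderSender
{
    private readonly IBus bus;

    public PurchaseOrderSender(IBus bus)
    {
        this.bus = bus;
    }

    public void SendOrder(int purchaseOrderId, string description)
    {
        var command = new CreatePurchaseOrderCommand
        {
            PurchaseOrderId = purchaseOrderId,
            Description = description
        };

        // No destination is given here, so NSB looks the message type up in
        // <MessageEndpointMappings> and delivers it to the "SQLSubscriber" queue.
        bus.Send(command);
    }

    public void SendOrderTo(string destinationQueue, CreatePurchaseOrderCommand command)
    {
        // Alternative: name the destination explicitly and bypass the mapping.
        bus.Send(destinationQueue, command);
    }
}
```

I prefer the first form, as it keeps queue names out of the code and in the config, where they can be changed per environment.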
There are some other configuration sections at play here, which are described
below
MessageForwardingInCaseOfFaultConfig
This section has one value, which is what the error queue should be called.
In my case this is a very generic "error" value (you like that? So do I,
cool isn't it). You can call this what you like.
Logging
This section sets the logging threshold for NSB. Threshold is
the only value (AFAIK).
Here is the config (App.Config) for the SQLSubscriber
<?xml version="1.0"?>
<configuration>
<configSections>
<section name="MessageForwardingInCaseOfFaultConfig"
type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core"/>
<section name="UnicastBusConfig"
type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
<section name="Logging"
type="NServiceBus.Config.Logging, NServiceBus.Core"/>
<section name="log4net"
type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
</configSections>
<connectionStrings>
<add name="NServiceBus/Persistence" connectionString="......." providerName="System.Data.SqlClient"/>
</connectionStrings>
<appSettings>
<add key="NServiceBus/Persistence/NHibernate/dialect"
value="NHibernate.Dialect.MsSql2008Dialect" />
<add key="NServiceBus/Persistence/NHibernate/connection.provider"
value="NHibernate.Connection.DriverConnectionProvider" />
<add key="NServiceBus/Persistence/NHibernate/connection.driver_class"
value="NHibernate.Driver.Sql2008ClientDriver" />
</appSettings>
<MessageForwardingInCaseOfFaultConfig ErrorQueue="error"/>
<UnicastBusConfig ForwardReceivedMessagesTo="audit">
<MessageEndpointMappings>
</MessageEndpointMappings>
</UnicastBusConfig>
<Logging Threshold="ERROR"/>
</configuration>
We can see that the SQLSubscriber's App.Config is quite a bit
different from the Commander's. Why is this?
Well for a start this article is mainly geared around how to create long
running workflows (Sagas) in NSB, and I have chosen to have the workflow execute
(as previously stated this is a design decision) in the same Endpoint. As such I
do not need to send any NSB message(s)/Command(s)/Event(s) outside of the current
Endpoint, which explains why you do not see any values for the <MessageEndpointMappings>
element.
The other interesting thing to note here is that, since this endpoint is the
one that will deal with the long running workflow (NSB saga), we need to
configure a couple of MANDATORY (when using NHibernate SQL
storage at any rate) app settings, to tell NSB about the SQL Server instance it
can use for storing the long running workflow (NSB saga) in.
However once you get past these 2 differences, there are no other differences
between the Commander and SQLSubscriber projects.
In order to run the demo app attached you will need to do the following:
- Make sure you have done the prerequisites stuff
- Launch the SQLSubscriber project in debug mode (or the VB.SQLSubscriber)
- Launch the Commander project in debug mode (or the VB.Commander)
- Enter some text in the Commander TextBox
- Click the button, which should start the Saga. You can confirm this by
putting a break point in the Saga (SQLSubscriber) Handle(..) methods.
If you have gotten this far, and have started to think about everything we
have talked about so far, you may be one of those people that goes:
"Mmmm, great but what about all the queues that need to be setup per
environment (where I work we have 6 environments). You know someone has to
create all those queues don't they. I don't want to do it!"
Well yes you are spot on, someone needs to do it for sure. Thankfully NSB
does all this on your behalf, and when you run the demo app you should see
something like this:
I certainly did not set this up. What actually happens is that NSB reads your
configuration and does the heavy lifting for you, and is nice enough to create
your queues for you. Neato. Thanks NSB
Ok, so now we get to the point (finally) of this article. I wanted to talk
you through creating potentially long running workflows using NServiceBus.
So what is a "Saga"? Well in NServiceBus,
a "Saga" is a long running workflow that has these basic characteristics:
- It is potentially (though not always) long running
- Is made up of a number of different steps (these are encoded as message
handlers in NServiceBus)
- May be persisted to some backing store (RavenDB / MSMQ / SQL Server).
This ensures that Sagas are durable
- Sagas are reliable thanks to the MSMQ underlying transport usage
- May be restarted after being stopped
- May be completed
So that is essentially what a saga is.
For the demo app I have fabricated a simple (potentially) long running workflow.
I say potentially, as the demo app obviously doesn't do much as I wanted to keep
it simple, but it does show you the ideas/concepts, which are always the most
important things to learn anyway.
Here is a diagram of the Saga that the demo app creates:
And here is the complete code for the Saga:
using System.Threading;
using Messages.Commands;
using Messages.Mess;
using NServiceBus;
using NServiceBus.Saga;
using Services;
namespace SQLSubscriber.Handlers
{
public class MySaga : Saga<CreatePurchaseOrderSagaData>,
IAmStartedByMessages<CreatePurchaseOrderCommand>,
IHandleMessages<CheckPurchaseOrderStatusMessage>
{
public IPurchaseService PurchaseService { get; set; }
public override void ConfigureHowToFindSaga()
{
ConfigureMapping<CreatePurchaseOrderCommand>
(message => message.PurchaseOrderId)
.ToSaga(saga => saga.PurchaseOrderId);
ConfigureMapping<CheckPurchaseOrderStatusMessage>
(message => message.PurchaseOrderId)
.ToSaga(saga => saga.PurchaseOrderId);
}
public void Handle(CreatePurchaseOrderCommand message)
{
this.Data.TransactionId = PurchaseService.Initialise();
this.Data.PurchaseOrderId = message.PurchaseOrderId;
PurchaseService.Purchase(this.Data.TransactionId,
string.Format("1 * {0}", message.PurchaseOrderId));
Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
{
PurchaseOrderId = this.Data.PurchaseOrderId
});
}
public void Handle(CheckPurchaseOrderStatusMessage message)
{
if (this.Data.Retries < 3)
{
bool isCompleted = PurchaseService.IsCompleted(this.Data.TransactionId);
if (isCompleted)
{
base.MarkAsComplete();
}
else
{
Thread.Sleep(2000);
this.Data.Retries++;
Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
{
PurchaseOrderId = this.Data.PurchaseOrderId
});
}
}
else
{
base.MarkAsComplete();
}
}
}
}
Imports System.Threading
Imports VB.Services.VB.Services
Imports VB.Messages.Mess
Imports VB.Messages.Commands
Imports NServiceBus
Imports NServiceBus.Saga
Namespace Handlers
Public Class MySaga
Inherits Saga(Of CreatePurchaseOrderSagaData)
Implements IAmStartedByMessages(Of CreatePurchaseOrderCommand)
Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage)
Public Overrides Sub ConfigureHowToFindSaga()
ConfigureMapping(Of CreatePurchaseOrderCommand) _
(Function(message) message.PurchaseOrderId) _
.ToSaga(Function(saga) saga.PurchaseOrderId)
ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
(Function(message) message.PurchaseOrderId) _
.ToSaga(Function(saga) saga.PurchaseOrderId)
End Sub
Public Property PurchaseService() As IPurchaseService
Public Sub Handle(ByVal message As CreatePurchaseOrderCommand) _
Implements IHandleMessages(Of CreatePurchaseOrderCommand).Handle
Me.Data.TransactionId = PurchaseService.Initialise()
Me.Data.PurchaseOrderId = message.PurchaseOrderId
PurchaseService.Purchase(Me.Data.TransactionId, String.Format("1 * {0}", message.PurchaseOrderId))
Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
Bus.SendLocal(checkPurchaseOrderStatusMessage)
End Sub
Public Sub Handle(ByVal message As CheckPurchaseOrderStatusMessage) _
Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage).Handle
If Me.Data.Retries < 3 Then
Dim isCompleted As Boolean = PurchaseService.IsCompleted(Me.Data.TransactionId)
If isCompleted Then
MyBase.MarkAsComplete()
Else
Thread.Sleep(2000)
Me.Data.Retries += 1
Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
Bus.SendLocal(checkPurchaseOrderStatusMessage)
End If
Else
MyBase.MarkAsComplete()
End If
End Sub
End Class
End Namespace
Surprisingly there are only a few concepts to learn here.
Concept 1 : How Do We Create A Saga
That is dead easy, we just need to inherit from the Saga<T> class
(or Saga(Of CreatePurchaseOrderSagaData) for VB.NET users) within NServiceBus,
where the generic type is the state data type associated with the Saga (more on
this later).
Concept 2 : How Are Sagas Started
For a Saga to be started we simply need to implement the NServiceBus
interface IAmStartedByMessages<T> (or
IAmStartedByMessages(Of CreatePurchaseOrderCommand) for VB.NET users).
This means whenever NServiceBus sees a message of type T on the bus, it will
start the Saga that the type T is associated with.
When you implement the IAmStartedByMessages<T> interface (or
IAmStartedByMessages(Of CreatePurchaseOrderCommand)) you will also
get a Handle(..) method that you must implement. This is the
message handler code for the message type that started the Saga.
Concept 3 : How Are Sagas Continued
For a Saga to be continued we simply need to implement the NServiceBus
interface IHandleMessages<T> (or
IHandleMessages(Of CheckPurchaseOrderStatusMessage) for VB.NET users). This
means whenever NServiceBus sees a message of type T on the bus, it will find
the already running Saga that the message is associated with and continue it.
When you implement the IHandleMessages<T> interface (or
IHandleMessages(Of CheckPurchaseOrderStatusMessage)) you will also get a
Handle(..) method that you must implement. This
is the message handler code for the message type that continued the Saga.
Concept 4 : How Do We Store State For A Saga
We will see much more on this later. But as I have already stated, Sagas and
NServiceBus offer durable messaging. So it is not unreasonable that we would
want our Saga state stored too. This has been thought of, and is as simple as
writing to the state object associated with the current Saga
this.Data.PurchaseOrderId = message.PurchaseOrderId;
Me.Data.PurchaseOrderId = message.PurchaseOrderId
Concept 5 : How Are Sagas Completed
Eventually you will want to complete the Saga, so how do we do that?
Luckily this too is an easy operation, we just do this:
base.MarkAsComplete();
MyBase.MarkAsComplete()
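One thing you may have spotted in the demo Saga is the Thread.Sleep(2000) between status checks, which ties up the handler while it waits. A possible alternative is to let NSB's timeout manager wake the Saga up later. The sketch below is NOT what the demo app does: MyTimeoutSaga and CheckPurchaseOrderStatusTimeout are hypothetical names, and I am assuming the NSB 4.x saga API exposes RequestTimeout<T>(TimeSpan) and IHandleTimeouts<T> in the form used here, so do check it against the version you are running.

```csharp
using System;
using Messages.Commands;
using Messages.Mess;
using NServiceBus;
using NServiceBus.Saga;
using Services;

namespace Messages.Mess
{
    // Hypothetical timeout state class, NOT part of the demo app. It lives in
    // Messages.Mess so the DefiningMessagesAs convention shown earlier applies.
    public class CheckPurchaseOrderStatusTimeout
    {
    }
}

namespace SQLSubscriber.Handlers
{
    public class MyTimeoutSaga : Saga<CreatePurchaseOrderSagaData>,
        IAmStartedByMessages<CreatePurchaseOrderCommand>,
        IHandleTimeouts<CheckPurchaseOrderStatusTimeout>
    {
        public IPurchaseService PurchaseService { get; set; }

        public override void ConfigureHowToFindSaga()
        {
            ConfigureMapping<CreatePurchaseOrderCommand>
                (message => message.PurchaseOrderId)
                .ToSaga(saga => saga.PurchaseOrderId);
        }

        public void Handle(CreatePurchaseOrderCommand message)
        {
            this.Data.TransactionId = PurchaseService.Initialise();
            this.Data.PurchaseOrderId = message.PurchaseOrderId;
            PurchaseService.Purchase(this.Data.TransactionId,
                string.Format("1 * {0}", message.PurchaseOrderId));

            // Ask to be woken up in 2 seconds instead of sleeping in the handler.
            RequestTimeout<CheckPurchaseOrderStatusTimeout>(TimeSpan.FromSeconds(2));
        }

        public void Timeout(CheckPurchaseOrderStatusTimeout state)
        {
            // Give up after 3 retries, or finish as soon as the purchase completes.
            if (this.Data.Retries >= 3 ||
                PurchaseService.IsCompleted(this.Data.TransactionId))
            {
                MarkAsComplete();
                return;
            }

            this.Data.Retries++;
            RequestTimeout<CheckPurchaseOrderStatusTimeout>(TimeSpan.FromSeconds(2));
        }
    }
}
```

The SQLSubscriber configuration already calls UseNHibernateTimeoutPersister(), so under the assumptions above the pending timeouts would be stored in SQL Server alongside the Saga data and should survive a restart.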
One of the things you may be asking yourself is how does NServiceBus
store state associated with a Saga. Well the secret to that lies in the use of a
simple property bag type class. For the demo app this is something like this:
using System;
using NServiceBus.Saga;
namespace SQLSubscriber.Handlers
{
public class CreatePurchaseOrderSagaData : IContainSagaData
{
public virtual Guid Id { get; set; }
public virtual string Originator { get; set; }
public virtual string OriginalMessageId { get; set; }
public virtual int PurchaseOrderId { get; set; }
public virtual string Description { get; set; }
public virtual string ConfirmationDescription { get; set; }
public virtual int Retries { get; set; }
public virtual Guid TransactionId { get; set; }
}
}
Imports NServiceBus.Saga
Namespace Handlers
Public Class CreatePurchaseOrderSagaData
Implements IContainSagaData
Public Overridable Property Id() As Guid Implements IContainSagaData.Id
Public Overridable Property Originator() As String Implements IContainSagaData.Originator
Public Overridable Property OriginalMessageId() As String Implements IContainSagaData.OriginalMessageId
Public Overridable Property PurchaseOrderId() As Int32
Public Overridable Property Description() As String
Public Overridable Property ConfirmationDescription() As String
Public Overridable Property Retries() As Int32
Public Overridable Property TransactionId() As Guid
End Class
End Namespace
It can be seen that there are a number of mandatory properties that you must
supply.
- public virtual Guid Id { get; set; }
- public virtual string Originator { get; set; }
- public virtual string OriginalMessageId { get; set; }
These are integral to the way NServiceBus
works with Saga data. The other properties are whatever you need to satisfy your
Saga requirements. For me those were these properties:
- public virtual int PurchaseOrderId { get; set; }
- public virtual string Description { get; set; }
- public virtual string ConfirmationDescription { get; set; }
- public virtual int Retries { get; set; }
- public virtual Guid TransactionId { get; set; }
So once you have this state object you may write to it within your saga code
something like this:
this.Data.TransactionId = PurchaseService.Initialise();
Me.Data.TransactionId = PurchaseService.Initialise()
So that is cool, nice and easy. But what exactly is going on here behind the
scenes? How does this state get used by NServiceBus?
Quite simply, NServiceBus
will use this state information and will automatically persist it to your
backing persistence store of choice (be that RavenDB, SQL Server etc) when
the saga needs to persist its data. I repeat, this is done automatically for you
by NServiceBus, you do not have
to manage this yourself.
That said if you choose to go down the SQL Server route you do need to have
done 2 things before you get into the nitty gritty of your Saga code.
Step 1 : Create A Database For NServiceBus Persistence
Create a new SQL Server database. For the demo app this is called
"NServiceBusPersistence". You will need to create your own one, but if you
decide to call it something different remember to change the App.Config stuff
in step 2, shown below
Step 2 : App.Config
Make sure you have specified the SQL Server stuff in the App.Config. This is
done as follows:
<?xml version="1.0"?>
<configuration>
<connectionStrings>
<add name="NServiceBus/Persistence"
connectionString="Data Source=YOUR_DATABASE_NAME;
Initial Catalog=NServiceBusPersistence;Integrated Security=True;
Timeout=180;MultipleActiveResultSets=true;"
providerName="System.Data.SqlClient"/>
</connectionStrings>
<appSettings>
<add key="NServiceBus/Persistence/NHibernate/dialect"
value="NHibernate.Dialect.MsSql2008Dialect" />
<add key="NServiceBus/Persistence/NHibernate/connection.provider"
value="NHibernate.Connection.DriverConnectionProvider" />
<add key="NServiceBus/Persistence/NHibernate/connection.driver_class"
value="NHibernate.Driver.Sql2008ClientDriver" />
</appSettings>
......
</configuration>
Once you have those 2 things in place, NServiceBus
will automatically create the database tables it needs to manage the Saga state
storage, and it will also populate the table as it sees fit with the relevant
data.
For example this is what the demo app looks like when it is running. See how
there is a "CreatePurchaseOrderSagaData" table. NServiceBus
created that for us.
We can also see that this table is filled with data while the Saga is
running. Again this is all handled by NServiceBus.
Ok great, we have now managed to save some saga data. But just how does NServiceBus
know which of all the running Sagas this data belongs to? Well the table that
is automatically created by NServiceBus
lets it know the type of the Saga, so that is cool. But we could have many many
instances of the same Saga all running at the same time, so how do we know
which one the stored data is for? Mmmm, sounds like a problem, but if you recall
we had to store some mandatory and some custom data within our saga state
object. We can use any of that to tell NServiceBus
how to find the correct Saga. This is done in code as follows:
public override void ConfigureHowToFindSaga()
{
ConfigureMapping<CreatePurchaseOrderCommand>
(message => message.PurchaseOrderId)
.ToSaga(saga => saga.PurchaseOrderId);
ConfigureMapping<CheckPurchaseOrderStatusMessage>
(message => message.PurchaseOrderId)
.ToSaga(saga => saga.PurchaseOrderId);
}
Public Overrides Sub ConfigureHowToFindSaga()
ConfigureMapping(Of CreatePurchaseOrderCommand) _
(Function(message) message.PurchaseOrderId) _
.ToSaga(Function(saga) saga.PurchaseOrderId)
ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
(Function(message) message.PurchaseOrderId) _
.ToSaga(Function(saga) saga.PurchaseOrderId)
End Sub
It can be seen that I am using one of the custom state values I stored, namely "PurchaseOrderId". This is a unique value for my demo app. So that will easily find the correct Saga.
Jobs a goodun!!!!
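If it helps to picture what that mapping buys you, the idea can be boiled down to a lookup of "the one stored row whose mapped property equals the value on the incoming message". The sketch below is a toy in-memory version of that idea only; it is not how NServiceBus or NHibernate implement it, and SagaState, InMemorySagaStore and CorrelationDemo are all names I made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Cut down stand-in for CreatePurchaseOrderSagaData.
public class SagaState
{
    public Guid Id { get; set; }
    public int PurchaseOrderId { get; set; }
    public int Retries { get; set; }
}

// Toy stand-in for the saga table, NOT the real persister.
public class InMemorySagaStore
{
    private readonly List<SagaState> rows = new List<SagaState>();

    public void Save(SagaState state)
    {
        rows.Add(state);
    }

    // Find the single saga whose mapped property matches the value taken from
    // the message. Returns null when no saga matches.
    public SagaState FindBy(Func<SagaState, int> sagaProperty, int messageValue)
    {
        return rows.SingleOrDefault(row => sagaProperty(row) == messageValue);
    }
}

public static class CorrelationDemo
{
    public static void Main()
    {
        var store = new InMemorySagaStore();
        store.Save(new SagaState { Id = Guid.NewGuid(), PurchaseOrderId = 1001 });
        store.Save(new SagaState { Id = Guid.NewGuid(), PurchaseOrderId = 1002, Retries = 2 });

        // An incoming CheckPurchaseOrderStatusMessage with PurchaseOrderId = 1002
        // is matched to the second saga instance.
        var match = store.FindBy(saga => saga.PurchaseOrderId, 1002);
        Console.WriteLine(match.Retries); // 2

        // No saga has this purchase order id, so nothing is found.
        var none = store.FindBy(saga => saga.PurchaseOrderId, 9999);
        Console.WriteLine(none == null); // True
    }
}
```

This is also why the mapped value needs to be unique per running Saga: if two rows shared a PurchaseOrderId, there would be no way to pick the right one.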
If you want to store more complex saga data, where you may have many
interlinked tables that you really can't be bothered to flesh out, you may want
to switch to RavenDB saga storage, as it will allow the storage of arbitrary
objects. That said, even if you use a relational database such as SQL Server,
you do have choices. XML would be a good choice. In fact someone has done just
that and written a pretty good write up on this subject which you can find here:
http://www.make-awesome.com/2010/09/implementing-an-nservicebus-saga-persister/
I quite liked that linked article and felt it was well written, and very
useful if you need to use a relational database.
Anyway that is all I wanted to say for now. I do plan to write a small
article on using Durandal (just so I can compare that experience to using
Knockout.js and Angular.js), but other than that I shall be mainly trying to
learn F# and blog about that from a beginners stand point. So if you enjoyed
this article, and feel like you would like to give it a vote/comment that would
be most welcome. Thanks for reading it. Cheerio