Introduction
This article shows a way of using Windows Azure storage - specifically the Queue and the Table storage - to implement the command side of a Command Query Responsibility Segregation architecture. It is implemented in VB.NET, but there is nothing in it preventing a very quick conversion to C# if that is your language of choice.
It does not cover the query side of the architecture nor the related concepts of Event Sourcing which I hope to get to as the application this code comes from is built out.
Background
At its simplest, CQRS is an architecture pattern that separates commands (doing) from queries (asking). What this means in practice is that the command and query side do not need to share a common model and this vertical shearing layer is often matched with a vertical shearing layer between the definition (of a command or query) and the implementation (in a command handler or a query handler).
In my experience, CQRS is very useful when you are developing an application with a distributed team to a very short timescale as it reduces the risk of "model contention" which can occur if everyone is updating the model at the same time. It is also useful for financial scenarios where having an audit trail of all the changes that have occurred in an application is required.
(If you are not familiar with CQRS, can I recommend you start with this excellent article for an overview of the how and why.)
Defining the Command
Every type of action you can perform is considered its own command, and each command has a definition. You need a distinct class for each type of command your system uses, which defines both what the command is and what payload is needed in order to execute it.
For example, if you have a bank account based system, you might have a command OpenAccount that has parameters that cover the data needed to open a bank account, such as the account owner, holding branch, denomination currency and so on, whereas a command DepositFunds would require an account number and an amount. You could also have a command that does not require any parameters at all.
In order that a given class can be semantically identified as being a command, it implements a standard interface, ICommandDefinition. This also adds two mandatory properties - a unique identifier by which every instance of a command can be identified and a human-readable name by which the command is known.
(Many implementations do not use a human readable command name - you can use it or not.)
Public Interface ICommandDefinition
    ReadOnly Property InstanceIdentifier As Guid
    ReadOnly Property CommandName As String
End Interface
At this point, we can go ahead and define concrete classes for each command - for example, our OpenAccount command definition could look like:
Public Class OpenAccountCommandDefinition
    Implements ICommandDefinition

    ' Unique identifier assigned when the command instance is created
    Private m_instanceid As Guid = Guid.NewGuid()

    Public ReadOnly Property InstanceIdentifier As Guid _
        Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property

    ' Interface members are implemented, not overridden
    Public ReadOnly Property CommandName As String _
        Implements ICommandDefinition.CommandName
        Get
            Return "Open a new account"
        End Get
    End Property

    ' Payload needed to execute the command
    Public Property BranchIdentifier As String
    Public Property AccountOwner As String
    Public Property DenominationCurrency As String
End Class
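As a second illustration, the DepositFunds command mentioned earlier could be sketched along the same lines. The property names AccountNumber and Amount and their types are assumptions made for this example, and the interface is repeated only so that the snippet compiles on its own.

```vbnet
Imports System

' Repeated here only so the sketch is self-contained
Public Interface ICommandDefinition
    ReadOnly Property InstanceIdentifier As Guid
    ReadOnly Property CommandName As String
End Interface

Public Class DepositFundsCommandDefinition
    Implements ICommandDefinition

    Private ReadOnly m_instanceid As Guid = Guid.NewGuid()

    Public ReadOnly Property InstanceIdentifier As Guid _
        Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property

    Public ReadOnly Property CommandName As String _
        Implements ICommandDefinition.CommandName
        Get
            Return "Deposit funds"
        End Get
    End Property

    ' Payload (hypothetical names and types, chosen for illustration)
    Public Property AccountNumber As String
    Public Property Amount As Decimal
End Class
```

Because the identifier is generated in the field initialiser, every instance gets its own GUID without the caller having to supply one.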
In order to pass the command definition across to the command handler, I am using Windows Azure storage - specifically a queue and a matching table. The queue message holds the command type and unique identifier, and the matching table is used to hold any additional payload that the command requires.
The maximum size of a message on an Azure queue is 64KB but it is possible that a command would require more payload than that so this dual approach is used.
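To make the queue side concrete, here is a minimal sketch of composing and parsing a small queue message that carries only the command type name and the instance identifier, separated by "::". The module name CommandQueueMessageFormat and its functions are hypothetical helpers written for this illustration.

```vbnet
Imports System

Public Module CommandQueueMessageFormat

    Private Const Separator As String = "::"

    ' Builds the message text: <command type name>::<instance identifier>
    Public Function Compose(commandTypeName As String, instanceId As Guid) As String
        Return commandTypeName & Separator & instanceId.ToString()
    End Function

    ' Splits a message back into its two parts; returns False if it is malformed
    Public Function TryParse(messageText As String,
                             ByRef commandTypeName As String,
                             ByRef instanceId As Guid) As Boolean
        commandTypeName = Nothing
        instanceId = Guid.Empty
        If String.IsNullOrEmpty(messageText) Then Return False

        Dim parts As String() = messageText.Split({Separator}, StringSplitOptions.None)
        If parts.Length <> 2 OrElse parts(0).Length = 0 Then Return False

        Dim parsedId As Guid
        If Not Guid.TryParse(parts(1), parsedId) Then Return False

        commandTypeName = parts(0)
        instanceId = parsedId
        Return True
    End Function

End Module
```

A message in this form stays tiny whatever the size of the payload, which is the point of keeping the parameters in the table instead.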
Dispatching the Command
The first step in putting the payload in an Azure table is to identify the payload properties of the command to save them off. This can either be done using reflection or by having a class that represents a command parameter and using a dictionary of them to back the payload properties.
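The reflection route could look something like the sketch below. It assumes that payload properties are the public read/write properties of the command class, so that read-only members such as InstanceIdentifier and CommandName are skipped. The names CommandPayloadReader and SampleTransferCommand are hypothetical and exist only for this example.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Reflection

Public Module CommandPayloadReader

    ' Returns the name and current value of every public read/write property
    Public Function GetPayload(commandInstance As Object) As Dictionary(Of String, Object)
        Dim payload As New Dictionary(Of String, Object)
        If commandInstance Is Nothing Then Return payload

        Dim flags As BindingFlags = BindingFlags.Public Or BindingFlags.Instance
        For Each prop As PropertyInfo In commandInstance.GetType().GetProperties(flags)
            ' Skip read-only and indexed properties: they are not treated as payload
            If prop.CanRead AndAlso prop.CanWrite AndAlso
               prop.GetIndexParameters().Length = 0 Then
                payload(prop.Name) = prop.GetValue(commandInstance, Nothing)
            End If
        Next
        Return payload
    End Function

End Module

' Hypothetical command used only to exercise the reader
Public Class SampleTransferCommand
    Private ReadOnly m_instanceid As Guid = Guid.NewGuid()

    Public ReadOnly Property InstanceIdentifier As Guid
        Get
            Return m_instanceid
        End Get
    End Property

    Public Property SourceAccount As String
    Public Property Amount As Decimal
End Class
```

For SampleTransferCommand this yields two entries, SourceAccount and Amount. The dictionary-backed alternative avoids the cost of reflection but means each command class has to store its parameters explicitly.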
When a command definition has been created, it then needs to be dispatched. The command dispatcher is responsible for the transfer to the command handlers. In the case of the Azure dispatcher, this is a two step process - first save the payload to a commands table, then add a message to the queue. The (partial) code snippet to do this is:
' Requires the Windows Azure Storage client library
Imports Microsoft.WindowsAzure.Storage
Imports Microsoft.WindowsAzure.Storage.Queue
Imports Microsoft.WindowsAzure.Storage.Table

' Members of the Azure command dispatcher class (partial)
Dim applicationStorageAccount As CloudStorageAccount
Dim applicationQueueClient As CloudQueueClient
Dim applicationTableClient As CloudTableClient

Public Sub Send(command As ICommandDefinition) Implements ICommandDispatcher.Send

    ' Step 1: save the command payload to the commands table
    If (applicationTableClient IsNot Nothing) Then
        Dim commandTable As CloudTable = _
            applicationTableClient.GetTableReference("commandsparameterstable")
        commandTable.CreateIfNotExists()

        Dim cmdRecord As New CommandTableEntity(command)
        Dim insertOperation As TableOperation = _
            TableOperation.InsertOrReplace(cmdRecord)
        Dim insertResult = commandTable.Execute(insertOperation)
    End If

    ' Step 2: add a message holding the command type and identifier to the queue
    If (applicationQueueClient IsNot Nothing) Then
        Dim queue As CloudQueue = _
            applicationQueueClient.GetQueueReference("commandsqueue")
        queue.CreateIfNotExists()

        Dim msg As New CloudQueueMessage( _
            command.GetType.Name & "::" & command.InstanceIdentifier.ToString())
        queue.AddMessage(msg)
    End If
End Sub
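It is important to follow the naming rules for queues and tables. Queue names must be lower case and may contain only letters, digits and hyphens, while table names must be alphanumeric and start with a letter. Sticking to lower-case letters with no punctuation satisfies both.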
In order to save a command record to an Azure table, it must either be put in a class that inherits from TableEntity or, if you are doing manipulations behind the scenes, in a class that implements ITableEntity.
I wasn't able to find many examples of implementing ITableEntity so mine is as follows:
Public Class CommandTableEntity
    Implements ITableEntity

    ' Command parameters (CommandParameter is a helper class not shown here)
    Private m_parameters As New Dictionary(Of String, CommandParameter)

    Public Property ETag As String Implements ITableEntity.ETag
    Public Property PartitionKey As String Implements ITableEntity.PartitionKey
    Public Property RowKey As String Implements ITableEntity.RowKey
    Public Property Timestamp As DateTimeOffset Implements ITableEntity.Timestamp

    Public Property Processed As Boolean
    Public Property InError As Boolean
    Public Property ProcessedBy As String

    Public Sub ReadEntity(properties As IDictionary(Of String, EntityProperty), _
        operationContext As OperationContext) Implements ITableEntity.ReadEntity

        For Each propertyPair In properties
            If (propertyPair.Key.Contains("__")) Then
                ' Parameter column: turn "name__index" back into "name[index]"
                Dim fixedPropertyName As String = _
                    propertyPair.Key.Replace("__", "[").Trim() & "]"
                Dim parameterName As String = _
                    CommandParameter.GetParameterName(fixedPropertyName)
                Dim parameterIndex As Integer = _
                    CommandParameter.GetParameterIndex(fixedPropertyName)
                Dim parameterPayload As String = propertyPair.Value.StringValue
                If (Not String.IsNullOrWhiteSpace(parameterPayload)) Then
                    m_parameters.Add(propertyPair.Key, _
                        CommandParameter.Create(parameterPayload))
                Else
                    m_parameters.Add(propertyPair.Key, _
                        CommandParameter.Create(parameterName, parameterIndex))
                End If
            Else
                ' Fixed properties (never "Timestamp", "RowKey" or "PartitionKey")
                If propertyPair.Key.Equals("Processed", _
                    StringComparison.OrdinalIgnoreCase) Then
                    If (propertyPair.Value.BooleanValue.HasValue) Then
                        Me.Processed = propertyPair.Value.BooleanValue.Value
                    End If
                End If
                If propertyPair.Key.Equals("InError", _
                    StringComparison.OrdinalIgnoreCase) Then
                    If (propertyPair.Value.BooleanValue.HasValue) Then
                        Me.InError = propertyPair.Value.BooleanValue.Value
                    End If
                End If
                If propertyPair.Key.Equals("ProcessedBy", _
                    StringComparison.OrdinalIgnoreCase) Then
                    Me.ProcessedBy = propertyPair.Value.StringValue
                End If
            End If
        Next
    End Sub

    Public Function WriteEntity(operationContext As OperationContext) _
        As IDictionary(Of String, EntityProperty) Implements ITableEntity.WriteEntity

        Dim properties As New Dictionary(Of String, EntityProperty)
        ' Do not add "Timestamp", "RowKey" or "PartitionKey" here
        For Each param As CommandParameter In m_parameters.Values
            ' Square brackets are replaced so "name[index]" becomes "name__index"
            Dim fixedPropertyName As String = _
                CommandParameter.GetParameterKey(param).Replace("[", "__").Replace("]", "")
            properties.Add(fixedPropertyName, New EntityProperty(param.ToString))
        Next
        properties.Add("Processed", New EntityProperty(Me.Processed))
        properties.Add("InError", New EntityProperty(Me.InError))
        properties.Add("ProcessedBy", New EntityProperty(Me.ProcessedBy))
        Return properties
    End Function
End Class
Next step - getting the command off the queue and processing it - for the next article...
Querying the Command
As we have given each command instance its own unique identifier in InstanceIdentifier, we can create a user interface component to query the state of the command queue and table so as to perform a "How is my command getting on" type of query.
This solves one of the biggest areas of confusion in CQRS - how does the user get feedback that their command has executed if it is totally asynchronous? The solution is for them to ask - either by explicit user interaction or by some polling mechanism.
This also means that any systems that don't require status feedback (such as automated sensor feeds or data pipelines) can use exactly the same command handling functions as are used by an interactive front end.
To do this, you need a unique identifier for each command - I use a GUID as that seems to make most sense - and a series of defined events that can occur to the command. For example Created, Step Processed, Completed, Transient Failure, Fatal Error and so on.
Then as your command handler is processing the command, it simply needs to append the relevant events to that event stream as it goes. The status of any given command can therefore be derived by running a very simple projection over that event stream and this status query can be done totally asynchronously to the actual processing of the command.
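A minimal sketch of such a projection is shown below. It assumes the events arrive in the order they were appended and that the most recent event decides the status. The type names (CommandEvent, CommandEventType, CommandStatus, CommandStatusProjection) and the mapping from events to statuses are assumptions made for illustration, loosely based on the event names listed above.

```vbnet
Imports System
Imports System.Collections.Generic

Public Enum CommandEventType
    Created
    StepProcessed
    Completed
    TransientFailure
    FatalError
End Enum

' One entry in a command's event stream (hypothetical shape)
Public Class CommandEvent
    Public Property CommandIdentifier As Guid
    Public Property EventType As CommandEventType
End Class

Public Enum CommandStatus
    Unknown
    Queued
    InProgress
    Completed
    Retrying
    Failed
End Enum

Public Module CommandStatusProjection

    ' Folds the event stream, in order, into a single status value
    Public Function Run(eventStream As IEnumerable(Of CommandEvent)) As CommandStatus
        Dim status As CommandStatus = CommandStatus.Unknown
        For Each evt As CommandEvent In eventStream
            Select Case evt.EventType
                Case CommandEventType.Created
                    status = CommandStatus.Queued
                Case CommandEventType.StepProcessed
                    status = CommandStatus.InProgress
                Case CommandEventType.Completed
                    status = CommandStatus.Completed
                Case CommandEventType.TransientFailure
                    status = CommandStatus.Retrying
                Case CommandEventType.FatalError
                    status = CommandStatus.Failed
            End Select
        Next
        Return status
    End Function

End Module
```

An empty stream yields Unknown, which a user interface could present as "no such command". Because the projection only reads the stream, it can run at any time without interfering with the handler that is appending to it.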
That same status projection can be used by the command handler side to decide what command(s) need to be processed next. Again, by having the command handler post a message to indicate that processing has started to the command's event stream (and by not caching the projection) you can ensure a reliable command dequeuing process regardless of how many instances of a command handler are spun up.
You can choose whether to have a persistent backing store (for example Azure Blob storage) or use an in-memory event stream implementation, depending on your business needs in terms of logging commands and in terms of recovery from failure. You can also copy the command event stream into a development or test environment and replay the events should you want to perform some debugging or analysis on it.
Points of Interest
If you are doing any work with Windows Azure Storage, I recommend downloading the Azure Storage Explorer project from CodePlex.
Tips I have learnt so far:
- When storing data in Windows Azure table storage, put extra care into deciding what the partition key property is - I started out with too high level an object (client) which meant my tables only had a small number of partitions which makes data access slower.
- It costs a (very small) amount of money to poll a Windows Azure storage queue even if it is empty, therefore a way of throttling the polling rate up or down depending on demand is a very good idea.
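One simple way to throttle polling is to double the wait after every empty poll, up to a ceiling, and to drop back to the shortest wait as soon as a message is found. The class below is a sketch of that idea only. QueuePollingThrottle and its members are hypothetical names, and the doubling strategy is my own choice for the example rather than anything the storage library provides.

```vbnet
Imports System

Public Class QueuePollingThrottle

    Private ReadOnly m_minimumDelay As TimeSpan
    Private ReadOnly m_maximumDelay As TimeSpan
    Private m_currentDelay As TimeSpan

    Public Sub New(minimumDelay As TimeSpan, maximumDelay As TimeSpan)
        If minimumDelay <= TimeSpan.Zero Then
            Throw New ArgumentOutOfRangeException("minimumDelay")
        End If
        If maximumDelay < minimumDelay Then
            Throw New ArgumentOutOfRangeException("maximumDelay")
        End If
        m_minimumDelay = minimumDelay
        m_maximumDelay = maximumDelay
        m_currentDelay = minimumDelay
    End Sub

    ' Call after each poll; returns how long to wait before polling again
    Public Function NextDelay(messageFound As Boolean) As TimeSpan
        If messageFound Then
            ' Demand is present: poll at the fastest rate again
            m_currentDelay = m_minimumDelay
        ElseIf m_currentDelay.Ticks > m_maximumDelay.Ticks \ 2 Then
            ' Doubling would pass the ceiling (or overflow), so clamp to it
            m_currentDelay = m_maximumDelay
        Else
            m_currentDelay = TimeSpan.FromTicks(m_currentDelay.Ticks * 2)
        End If
        Return m_currentDelay
    End Function

End Class
```

With a minimum of one second and a maximum of eight, a run of empty polls waits two, four, eight and then eight seconds, and the first message found resets the wait to one second.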
History
- v1 - 19th January, 2014 Outline of the classes involved
- v2 - 22nd January, 2014 Rewritten to emphasise why over how - removed the code dump aspect
- v3 - 5th March, 2016 Added thoughts on querying the command