CQRS on Windows Azure - The Command Side

4 Mar 2017
An outline of how a paired Azure queue and Azure table can be used for the command side of a CQRS application

Introduction

This article shows a way of using Windows Azure storage - specifically the Queue and the Table storage - to implement the command side of a Command Query Responsibility Segregation architecture. It is implemented in VB.NET, but there is nothing in it preventing a very quick conversion to C# if that is your language of choice.
It does not cover the query side of the architecture, nor the related concept of Event Sourcing, which I hope to get to as the application this code comes from is built out.

Background

At its simplest, CQRS is an architecture pattern that separates commands (doing) from queries (asking). What this means in practice is that the command side and the query side do not need to share a common model. This vertical shearing layer is often matched with a second one between the definition (of a command or query) and the implementation (in a command handler or a query handler).

In my experience, CQRS is very useful when you are developing an application with a distributed team to a very short timescale as it reduces the risk of "model contention" which can occur if everyone is updating the model at the same time. It is also useful for financial scenarios where having an audit trail of all the changes that have occurred in an application is required.

(If you are not familiar with CQRS, can I recommend you start with this excellent article for an overview of the how and why.)

Defining the Command

Every type of action you can perform is considered its own command, and each command has a definition. You need a distinct class for each type of command your system uses, which defines both what the command is and what payload (additional parameters) is needed in order to execute it.

For example, if you have a bank account based system, you might have a command OpenAccount that has parameters that cover the data needed to open a bank account such as the account owner, holding branch, denomination currency and so on whereas a command DepositFunds would require an account number and an amount. You could also have a command that does not require any parameters at all.

In order that a given class can be semantically identified as being a command, it derives from a standard interface ICommandDefinition. This also adds two mandatory properties - a unique identifier by which every instance of a command can be identified and a human-readable name by which the command is known.
(Many implementations do not use a human-readable command name - you can use it or not.)

''' <summary>
''' Interface defining what is required of a class that defines a command to be executed
''' </summary>
''' <remarks>
''' Commands can have named parameters that have meaning to the command handler
''' </remarks>
Public Interface ICommandDefinition

    ''' <summary>
    ''' Unique identifier of this command instance
    ''' </summary>
    ''' <remarks>
    ''' This allows commands to be queued and the handler to know 
    ''' if a command has been executed or not (for example as an Event Source)
    ''' </remarks>
    ReadOnly Property InstanceIdentifier As Guid

    ''' <summary>
    ''' The unique name of the command being executed
    ''' </summary>
    ReadOnly Property CommandName As String

End Interface

At this point, we can go ahead and define concrete classes for each command - for example, our OpenAccount command definition could look like:

Public Class OpenAccountCommandDefinition
    Implements ICommandDefinition

    Private m_instanceid As Guid = Guid.NewGuid

    Public ReadOnly Property InstanceIdentifier As _
           Guid Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property

    'An interface member is implemented rather than overridden
    Public ReadOnly Property CommandName As String _
           Implements ICommandDefinition.CommandName
        Get
            Return "Open a new account"
        End Get
    End Property

    'Payload for this command
    Public Property BranchIdentifier As String

    Public Property AccountOwner As String

    Public Property DenominationCurrency As String

End Class

However, in order to pass the command definition across to the command handler, I am using Windows Azure storage - specifically a queue and a matching table. The queue has the command type and unique identifier added to it, and the matching table is used to hold any additional payload that the command requires.
The maximum size of a message on an Azure queue is 64 KB, but it is possible that a command would require more payload than that, so this dual approach is used.
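
As an illustration of how small the queue message can stay, the following is a minimal sketch that builds and parses a message of the form "type name::identifier", which is the form the dispatcher code later in this article puts on the queue. The module and function names (CommandMessageFormat, BuildMessageText, TryParseMessageText) are made up for this example and are not part of the original project.

```vbnet
Imports System

Public Module CommandMessageFormat

    ' Separator between the command type name and the instance identifier
    Private Const Separator As String = "::"

    ' Build the text placed on the queue for a command
    Public Function BuildMessageText(commandTypeName As String, instanceId As Guid) As String
        Return commandTypeName & Separator & instanceId.ToString()
    End Function

    ' Split a queue message back into its two parts; returns False if it is malformed
    Public Function TryParseMessageText(messageText As String,
                                        ByRef commandTypeName As String,
                                        ByRef instanceId As Guid) As Boolean
        commandTypeName = Nothing
        instanceId = Guid.Empty
        If String.IsNullOrEmpty(messageText) Then Return False

        ' The identifier is whatever follows the last separator
        Dim separatorAt As Integer = messageText.LastIndexOf(Separator, StringComparison.Ordinal)
        If separatorAt <= 0 Then Return False

        Dim idText As String = messageText.Substring(separatorAt + Separator.Length)
        If Not Guid.TryParse(idText, instanceId) Then Return False

        commandTypeName = messageText.Substring(0, separatorAt)
        Return True
    End Function

End Module
```

A handler could use the two parsed values to look up the matching payload record in the table.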

Dispatching the Command

The first step in putting the payload in an Azure table is to identify the payload properties of the command to save them off. This can either be done using reflection or by having a class that represents a command parameter and using a dictionary of them to back the payload properties.
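
For the reflection option, a minimal sketch might look like the following. It treats every public, readable instance property that is not declared on ICommandDefinition as payload. The module name PayloadReflection and the function GetPayload are hypothetical, and the command is passed as Object only so that the example compiles on its own.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Reflection

Public Module PayloadReflection

    ' Property names that come from ICommandDefinition and so are not payload
    Private ReadOnly NonPayloadNames As String() = {"InstanceIdentifier", "CommandName"}

    ' Returns name/value pairs for each public readable instance property
    ' of the command that is not one of the ICommandDefinition properties
    Public Function GetPayload(command As Object) As Dictionary(Of String, Object)
        Dim payload As New Dictionary(Of String, Object)
        Dim flags As BindingFlags = BindingFlags.Public Or BindingFlags.Instance

        For Each prop As PropertyInfo In command.GetType().GetProperties(flags)
            If Not prop.CanRead Then Continue For
            ' Skip indexed properties, which need arguments to read
            If prop.GetIndexParameters().Length > 0 Then Continue For
            If NonPayloadNames.Contains(prop.Name) Then Continue For
            payload.Add(prop.Name, prop.GetValue(command, Nothing))
        Next

        Return payload
    End Function

End Module
```

Passing an OpenAccountCommandDefinition to GetPayload would return its BranchIdentifier, AccountOwner and DenominationCurrency values. The trade-off is that reflection needs no extra plumbing in each command class, whereas a dictionary of parameter objects gives more control over how each value is stored.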

When a command definition has been created, it then needs to be dispatched. The command dispatcher is responsible for the transfer to the command handlers. In the case of the Azure dispatcher, this is a two-step process - first save the payload to a commands table, then add a message to the queue. Saving the payload first means a handler cannot pick up a queue message before its payload has been written. The (partial) code snippet to do this is:

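' Namespaces from the Windows Azure Storage client library used by this snippet
Imports Microsoft.WindowsAzure.Storage
Imports Microsoft.WindowsAzure.Storage.Queue
Imports Microsoft.WindowsAzure.Storage.Table

Dim applicationStorageAccount As CloudStorageAccount
Dim applicationQueueClient As CloudQueueClient
Dim applicationTableClient As CloudTableClient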

Public Sub Send(command As ICommandDefinition) Implements ICommandDispatcher.Send

    ' 1) Save the command data in a commands table for the handler to use
    If (applicationTableClient IsNot Nothing) Then
        Dim commandTable As CloudTable = _
            applicationTableClient.GetTableReference("commandsparameterstable")
        'Table may not exist yet, so create it if it doesn't
        commandTable.CreateIfNotExists()
        Dim cmdRecord As New CommandTableEntity(command)
        '\\ Insert or update the record
        Dim insertOperation As TableOperation = _
                            TableOperation.InsertOrReplace(cmdRecord)
        Dim insertResult = commandTable.Execute(insertOperation)
    End If

    ' 2) Queue the command to execute
    If (applicationQueueClient IsNot Nothing) Then
        Dim queue As CloudQueue = _
            applicationQueueClient.GetQueueReference("commandsqueue")
        ' Queue may not exist yet, so create it if it doesn't
        queue.CreateIfNotExists()
        ' The message is the command type name and its unique identifier
        Dim msg As New CloudQueueMessage( _
            command.GetType.Name & "::" & command.InstanceIdentifier.ToString())
        queue.AddMessage(msg)
    End If

End Sub

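It is important to follow the naming standards for queues and tables. Queue names may only contain lower case letters, digits and hyphens, while table names may only contain letters and digits, so sticking to all lower case and no punctuation keeps a name valid for both.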
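
A small check along these lines can catch a bad name before it is sent to the storage service. This is a sketch only: the module name StorageNaming and the function IsSafeStorageName are made up for this example, and the rule it applies is deliberately stricter than either service requires (a leading lower case letter, then lower case letters or digits, 3 to 63 characters in total) so that the same name is acceptable for a queue and for a table.

```vbnet
Imports System.Text.RegularExpressions

Public Module StorageNaming

    ' Conservative pattern: a lower case letter, followed by 2 to 62
    ' lower case letters or digits, and nothing else
    Private ReadOnly SafeName As New Regex("^[a-z][a-z0-9]{2,62}\z")

    ' True if the name satisfies the conservative rule above
    Public Function IsSafeStorageName(name As String) As Boolean
        Return name IsNot Nothing AndAlso SafeName.IsMatch(name)
    End Function

End Module
```

Both "commandsqueue" and "commandsparameterstable" pass this check, whereas a name such as "Commands_Queue" does not.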

In order to save a command record to an Azure table, it must either be put in a class that inherits from TableEntity or, if you are doing manipulations behind the scenes, in a class that implements the ITableEntity interface.
I wasn't able to find many examples of implementing ITableEntity so mine is as follows:

''' <summary>
 ''' A class for storing a command and its parameters in an azure table store
 ''' </summary>
 ''' <remarks>
 ''' The command type and the instance identifier are the partition
 ''' and row keys to allow any
 ''' command handler easily to find its command parameter data
 ''' </remarks>
 Public Class CommandTableEntity
     Implements ITableEntity

     Private m_parameters As Dictionary(Of String, CommandParameter) = _
                                  New Dictionary(Of String, CommandParameter)

     ''' <remarks>
     ''' Set this value to '*' in order to blindly overwrite an entity
     ''' as part of an update operation.
     ''' </remarks>
     Public Property ETag As String Implements ITableEntity.ETag

     ''' <summary>
     ''' The property that defines what table partition this command will be stored in
     ''' </summary>
     Public Property PartitionKey As String Implements ITableEntity.PartitionKey

     ''' <summary>
     ''' The property that defines what row this command record will be stored in
     ''' </summary>
     Public Property RowKey As String Implements ITableEntity.RowKey

     Public Property Timestamp As DateTimeOffset Implements ITableEntity.Timestamp

     ''' <summary>
     ''' Has this command been marked as processed
     ''' </summary>
     Public Property Processed As Boolean

     ''' <summary>
     ''' Has this command been marked as in error
     ''' </summary>
     Public Property InError As Boolean

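     ''' <summary>
     ''' What handler processed this command
     ''' </summary>
     Public Property ProcessedBy As String

     ''' <summary>
     ''' Parameterless constructor, for use when an entity is read back from the table
     ''' </summary>
     Public Sub New()
     End Sub

     ''' <summary>
     ''' Create the record for a command, keyed as described in the class remarks
     ''' </summary>
     Public Sub New(command As ICommandDefinition)
         Me.PartitionKey = command.GetType().Name
         Me.RowKey = command.InstanceIdentifier.ToString()
         'The payload of the command would be copied into m_parameters here
     End Sub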

     Public Sub ReadEntity(properties As IDictionary(Of String, EntityProperty), _
       operationContext As OperationContext) Implements ITableEntity.ReadEntity

         For Each propertyPair In properties
             If (propertyPair.Key.Contains("__")) Then
                 'This is a property in the form name[index]...
                 Dim fixedPropertyName As String = _
                           propertyPair.Key.Replace("__", "[").Trim() & "]"
                 Dim parameterName As String = _
                           CommandParameter.GetParameterName(fixedPropertyName)
                 Dim parameterIndex As Integer = _
                           CommandParameter.GetParameterIndex(fixedPropertyName)
                 'Get the command payload from the string
                 Dim parameterPayload As String = propertyPair.Value.StringValue
                 If (Not String.IsNullOrWhiteSpace(parameterPayload)) Then
                     m_parameters.Add(propertyPair.Key, _
                           CommandParameter.Create(parameterPayload))
                 Else
                     '\\ Add a parameter that has no payload/value
                     m_parameters.Add(propertyPair.Key, _
                       CommandParameter.Create(parameterName, parameterIndex))
                 End If
             Else
                 'Named property only...do not persist "ETag",
                 '"Timestamp", "Rowkey" or "PartitionKey"

                 ' Additional properties for a command
                 If propertyPair.Key.Equals("Processed", _
                     StringComparison.OrdinalIgnoreCase) Then
                     If (propertyPair.Value.BooleanValue.HasValue) Then
                         Me.Processed = propertyPair.Value.BooleanValue.Value
                     End If
                 End If

                 If propertyPair.Key.Equals("InError", _
                     StringComparison.OrdinalIgnoreCase) Then
                     If (propertyPair.Value.BooleanValue.HasValue) Then
                         Me.InError = propertyPair.Value.BooleanValue.Value
                     End If
                 End If

                 If propertyPair.Key.Equals("ProcessedBy", _
                     StringComparison.OrdinalIgnoreCase) Then
                     Me.ProcessedBy = propertyPair.Value.StringValue
                 End If

             End If
         Next

     End Sub

     Public Function WriteEntity(operationContext As OperationContext) _
      As IDictionary(Of String, EntityProperty) Implements ITableEntity.WriteEntity

         Dim properties As New Dictionary(Of String, EntityProperty)
         'Skip the ITableEntity properties "ETag",
         '"Timestamp", "Rowkey" or "PartitionKey"

         'Add all the command parameters...
         For Each param As CommandParameter In m_parameters.Values
             Dim fixedPropertyName As String = _
             CommandParameter.GetParameterKey(param).Replace("[", "__").Replace("]", "")
             properties.Add(fixedPropertyName, New EntityProperty(param.ToString))
         Next

         'Add the other properties
         properties.Add("Processed", New EntityProperty(Me.Processed))
         properties.Add("InError", New EntityProperty(Me.InError))
         properties.Add("ProcessedBy", New EntityProperty(Me.ProcessedBy))

         Return properties

     End Function
 End Class
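
The renaming between "name[index]" and "name__index" in ReadEntity and WriteEntity is there because table property names are expected to follow the rules for C# identifiers, which rules out square brackets. The following standalone sketch shows just that round trip, using the same string replacements as the class above. The module name ParameterKeyMapping and its two functions are made up for this example.

```vbnet
Imports System

Public Module ParameterKeyMapping

    ' "amount[0]" becomes "amount__0"
    Public Function ToTablePropertyName(parameterKey As String) As String
        Return parameterKey.Replace("[", "__").Replace("]", "")
    End Function

    ' "amount__0" becomes "amount[0]"; names without "__" are returned unchanged
    Public Function ToParameterKey(tablePropertyName As String) As String
        If Not tablePropertyName.Contains("__") Then Return tablePropertyName
        Return tablePropertyName.Replace("__", "[").Trim() & "]"
    End Function

End Module
```

One consequence of this scheme is that a parameter whose own name contains a double underscore would not survive the round trip, so such names are best avoided.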

Next step - getting the command off the queue and processing it - for the next article...

Querying the Command

As we have given each command instance its own unique identifier in InstanceIdentifier, we can create a user interface component to query the state of the command queue and table so as to perform a "How is my command getting on" type of query.

This solves one of the biggest areas of confusion in CQRS - how does the user get feedback that their command has executed if it is totally asynchronous? The solution is for them to ask - either by explicit user interaction or by some polling mechanism.

This also means that any systems that don't require status feedback (such as automated sensor feeds or data pipelines) can use exactly the same command handling functions as are used by an interactive front end.

To do this, you need a unique identifier for each command - I use a GUID as that seems to make most sense - and a series of defined events that can occur to the command, for example: Created, Step Processed, Completed, Transient Failure, Fatal Error and so on.

Then as your command handler is processing the command, it simply needs to append the relevant events to that event stream as it goes. The status of any given command can therefore be derived by running a very simple projection over that event stream and this status query can be done totally asynchronously to the actual processing of the command.
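
As a rough illustration of such a projection, the sketch below folds a command's events, oldest first, into a single status. The event names are the ones listed above; the CommandStatus values and the module name CommandStatusProjection are assumptions made for this example, and a real event would carry more than just its type.

```vbnet
Imports System
Imports System.Collections.Generic

Public Enum CommandEventType
    Created
    StepProcessed
    Completed
    TransientFailure
    FatalError
End Enum

Public Enum CommandStatus
    Unknown
    Queued
    InProgress
    Retrying
    Succeeded
    Failed
End Enum

Public Module CommandStatusProjection

    ' Fold the events of one command, oldest first, into its current status
    Public Function Project(eventStream As IEnumerable(Of CommandEventType)) As CommandStatus
        Dim status As CommandStatus = CommandStatus.Unknown
        For Each evt As CommandEventType In eventStream
            ' Once a command has completed or failed fatally, later events are ignored
            If status = CommandStatus.Succeeded OrElse status = CommandStatus.Failed Then Exit For
            Select Case evt
                Case CommandEventType.Created
                    status = CommandStatus.Queued
                Case CommandEventType.StepProcessed
                    status = CommandStatus.InProgress
                Case CommandEventType.TransientFailure
                    status = CommandStatus.Retrying
                Case CommandEventType.Completed
                    status = CommandStatus.Succeeded
                Case CommandEventType.FatalError
                    status = CommandStatus.Failed
            End Select
        Next
        Return status
    End Function

End Module
```

Because the function only reads the stream, it can be run by a user interface at any time without interfering with the handler that is appending the events.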

That same status projection can be used by the command handler side to decide what command(s) need to be processed next. Again, by having the command handler post a message to indicate that processing has started to the command's event stream (and by not caching the projection) you can ensure a reliable command dequeuing process regardless of how many instances of a command handler are spun up.

You can choose whether to have a persistent backing store (for example Azure Blob storage) or use an in-memory event stream implementation, depending on your business needs in terms of logging commands and in terms of recovery from failure. You can also copy the command event stream into a development or test environment and replay the events should you want to perform some debugging or analysis on it.

Points of Interest

If you are doing any work with Windows Azure Storage, I recommend downloading the Azure Storage Explorer project from CodePlex.

Tips I have learnt so far:

  1. When storing data in Windows Azure table storage, put extra care into deciding what the partition key property is - I started out with too high level an object (client) which meant my tables only had a small number of partitions which makes data access slower.
  2. It costs a (very small) amount of money to poll a Windows Azure storage queue even if it is empty, therefore a way of throttling the polling rate up or down depending on demand is a very good idea.
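
One simple way to throttle the polling rate is to double the wait after every empty poll and reset it as soon as a message is found. The sketch below only calculates the next delay; the module name QueuePollingBackoff, the function NextDelay and its parameters are made up for this example, and the minimum delay is assumed to be greater than zero.

```vbnet
Imports System

Public Module QueuePollingBackoff

    ' Returns how long to wait before the next poll of the queue.
    ' A poll that found a message resets the wait to the minimum;
    ' an empty poll doubles the wait, up to the maximum.
    Public Function NextDelay(currentDelay As TimeSpan,
                              messageFound As Boolean,
                              minimumDelay As TimeSpan,
                              maximumDelay As TimeSpan) As TimeSpan
        If messageFound Then Return minimumDelay

        Dim baseTicks As Long = Math.Max(currentDelay.Ticks, minimumDelay.Ticks)
        ' Compare against half the maximum so the doubling below cannot overflow
        If baseTicks > maximumDelay.Ticks \ 2 Then Return maximumDelay
        Return TimeSpan.FromTicks(baseTicks * 2)
    End Function

End Module
```

A handler loop would sleep for the returned delay between polls, so a busy queue is checked at the minimum interval while an idle one is checked no more often than the maximum.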

History

  • v1 - 19th January, 2014 Outline of the classes involved
  • v2 - 22nd January, 2014 Rewritten to emphasise why over how - removed the code dump aspect
  • v3 - 5th March, 2016 Added thoughts on querying the command

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.