Introduction
Although event sourcing is not a mandatory part of CQRS, and indeed event sourcing is used outside of CQRS, the two are often used together. In this article, I will be looking at an implementation that uses Windows Azure Storage (either Tables, Blobs or Files) as the persistence mechanism.
Event Sourcing is particularly suited to business domains where the read and write side of the data storage need to scale independently and where there is not very much across entity restriction built in to the business rules. It can also be used as a partial architecture with those parts of the system more suited to a model based (database or NoSQL) approach using that technology.
However, the biggest use case for Event Sourcing is that it allows you to view a version control like history of your data over its entire lifetime. This allows for a very powerful debugging experience with regard to recreating any production issues in that the data state can be stepped through in exactly the same way as code is stepped through in a model based system.
Getting Your Head Around Event Sourcing
If you have 45 minutes, I recommend the video of this material:
For developers schooled in the relational database model, event sourcing can seem to be a very confusing way of doing things. It reverses the existing way of storing data in that instead of storing the current state of objects and updating the state when events occur, we store the entire history of events that have occurred to an object and use this to derive the current state of the object.
Hopefully, the following hints can help clarify:
- Events only get added to the end of the event list (you can conceptualise this as a stack that doesn't have a pop option).
- Events are stored by the thing they occur to rather than the type of event that the are. For example, we don't have a separate table for "payments" and "standing orders" in a bank account type of system - these are just different events that occur in the life of the bank account.
- Events cannot be deleted nor can their content be modified - if you need to "undo" something that has happened, a reversal or undo event needs to be added to the event store.
- Events are described in the past-tense.
However, if you get your head out of the CRUD mindset, you will see some benefits:
- You automatically get a complete audit trail for everything that occurred (If you have fields like "Last Modified", "Updated By" in your database tables and a set of triggers that write to an audit table whenever a field is updated in the database, you are already doing this and doing it backwards).
- You can query the event history in ways that weren't anticipated when the system was first created.
- Your event payload schema can be very flexible - if a particular attribute doesn't apply to a particular type of event, you don't need to store a "
null
" there. - You can store your event streams separately which allows for a highly scalable architecture as your system can scale across machines and even data centres.
Figure 1: Example of an event store for vehicles, using the vehicle registration number as the aggregation identifier
Aside - a Quick Glossary of Terms
There are a couple of words that can cause confusion when dealing with event sources: Aggregation and Sequence. My (perhaps over-simplistic) definition is as follows:
The Aggregation is the thing to which events occur. In a bank example, this could be something like a bank account. In a vehicle leasing company, it could be the vehicle, and so on.
Each aggregation must be uniquely identified. Often, a business domain already has unique identifiers you can use but if that is not the case, you can create them.
The Sequence is the order in which the events occurred - this is almost always implemented as an incremental number (although in the case of a file based stream, I use a file pointer for the start of the event).
The events that have occurred to any aggregation instance are stored in a sequential storage referred to as an Event Stream (or alternatively as a Fact Log) which is very analogous to a version controlled view of the data about that instance. In exactly the same way as version control allows you to view the content of a source code file as at any given point in time so the event stream can be used to recreate the state of an aggregate instance at any given point in time too.
Architecture
The above whiteboard overview shows where event streams would fit in a CQRS architecture. Within Azure storage, there are a number of different ways in which you could store the events underlying the event stream - in my case, I have provided implementation for four of these: SQL, Table, File or AppendBlob. The implementation is different depending on which of these underlying technologies you choose to use.
1. Using Azure Storage "Tables" to Persist Events
An event can be represented by a CLR class. It is usual to have an empty IEvent
interface to indicate the intent of the developer that a given class is an event.
When storing the event, we need to add the aggregation identifier and sequence number - so these two are specified in an interface for storing events:
Public Interface IEventIdentity
Function GetAggregateIdentifier() As String
ReadOnly Property Version As UInteger
ReadOnly Property EventInstance As IEvent
End Interface
In this case, the aggregate identifier is implemented as a string
so that the business can dictate what actual unique identifier to use to for it. For clarity, I also added an interface that can be used to set these aggregate identifiers, but this is totally optional.
Public Interface IAggregateIdentity
Function GetAggregateIdentifier() As String
End Interface
Turning a Version Into a Row Identifier
Since the version is an incremental number and the Azure table takes a string
for its row key, you need to pad the version numbers with zero so as to store it in a manner that it will sort correctly.
Private Const VERSION_FORMAT As String = "0000000000000000000"
Public Shared Function VersionToRowkey(ByVal version As Long) As String
If (version <= 0) Then
Return Long.MaxValue.ToString(VERSION_FORMAT)
Else
Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
End If
End Function
Saving an Event Record
The event record itself (everything except the partition key and row key) can be any kind of .NET class that inherits IEventIdentity
. Because the fields this record can have are dynamic (depending on the event type - recall from above that different event types are stored in the same event store) we have to use a DynamicTableEntity
class and fill it with the properties of our event class passed in:
Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext)
As DynamicTableEntity
Dim ret As New DynamicTableEntity
ret.PartitionKey = eventToSave.GetAggregateIdentifier()
ret.RowKey = VersionToRowkey(eventToSave.Version)
ret.Properties.Add("EventType",
New EntityProperty(eventToSave.EventInstance.GetType().Name))
If (eventToSave.SequenceNumber <= 0) Then
ret.Properties.Add("SequenceNumber",
New EntityProperty(DateTime.UtcNow.Ticks))
Else
ret.Properties.Add("SequenceNumber",
New EntityProperty(eventToSave.SequenceNumber))
End If
If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
ret.Properties.Add("Commentary",
New EntityProperty(eventToSave.Commentary))
End If
If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
End If
If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
End If
For Each pi As System.Reflection.PropertyInfo In _
eventToSave.EventInstance.GetType().GetProperties()
If (pi.CanRead) Then
ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
End If
Next pi
End Function
Then turning the DynamicTableEntity
back into an appropriate event class is a matter of reading the event type, creating an instance of that type and then populating its properties from the DynamicTableEntity.Properties
collection by reflection.
Points of Interest
When using Windows Azure Table Storage, queries that use the partition key and row identifier are very fast. Those that don't are much slower - therefore mapping these two fields to the aggregate identifier and sequence number is a very sensible way to start.
2. Using Azure Storage "Append Blob" to Persist Events
An append blob is a new, special type of binary large object store in Windows Azure storage which is optimized such that you can only add data onto the end of it.
The append blob has a maximum size of 195Mb (or 50,000 events) so the usual setup is to create one blob per unique event stream. This also allows for a very high degree of prallelism.
The additional metadata needed for each event stream (such as the record count) can then be stored using the file metadata for the Azure blob:
Protected Const METATDATA_DOMAIN As String = "DOMAIN"
Protected Const METADATA_AGGREGATE_CLASS As String = "AGGREGATECLASS"
Protected Const METADATA_SEQUENCE As String = "SEQUENCE"
Protected Const METADATA_RECORD_COUNT As String = "RECORDCOUNT"
Protected Const METADATA_AGGREGATE_KEY As String = "AGGREGATEKEY"
The blob itself is created when the event stream reader or writer class is instantiated:
Protected Sub New(ByVal AggregateDomainName As String, ByVal AggregateKey As TAggregationKey)
MyBase.New(AggregateDomainName)
m_key = AggregateKey
If (BlobContainer IsNot Nothing) Then
m_blob = BlobContainer.GetAppendBlobReference(EventStreamBlobFilename)
If Not m_blob.Exists() Then
m_blob.CreateOrReplace()
m_blob.Metadata(METATDATA_DOMAIN) = DomainName
m_blob.Metadata(METADATA_AGGREGATE_CLASS) = GetType(TAggregate).Name
m_blob.Metadata(METADATA_AGGREGATE_KEY) = m_key.ToString()
m_blob.Metadata(METADATA_SEQUENCE) = "0"
m_blob.Metadata(METADATA_RECORD_COUNT) = "0"
m_blob.SetMetadata()
Else
m_blob.FetchAttributes()
End If
End If
End Sub
This is then used to append an event as a two-step process: first wrap the raw event in a class that indicates the event type, sequence number and other event level metadata and then write the whole to the end of the append blob.
Public Sub AppendEvent(EventInstance As IEvent) _
Implements IEventStreamWriter(Of TAggregate, TAggregationKey).AppendEvent
If (AppendBlob IsNot Nothing) Then
Dim nextSequence As Long = IncrementSequence()
Dim evtToWrite As New BlobBlockWrappedEvent(nextSequence, 0, EventInstance)
Dim recordWritten As Boolean = False
Try
Using es As System.IO.Stream = evtToWrite.ToBinaryStream()
Dim offset As Long = AppendBlob.AppendBlock(es)
End Using
recordWritten = True
Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
Throw New EventStreamWriteException_
(DomainName, AggregateClassName, Key.ToString(), _
nextSequence, "Unable to save a record to the event stream - " _
& evtToWrite.EventName, exBlob)
End Try
If (recordWritten) Then
IncrementRecordCount()
End If
End If
End Sub
To read events, we simply open a snapshot of the append blob as a stream and deserialise wrapped events from that stream:
Private Function GetAppendBlobSnapshot() As CloudAppendBlob
If (AppendBlob IsNot Nothing) Then
Return AppendBlob.CreateSnapshot()
Else
Return Nothing
End If
End Function
Private Function GetUnderlyingStream() As System.IO.Stream
If (AppendBlob IsNot Nothing) Then
Dim targetStream As New System.IO.MemoryStream()
Try
GetAppendBlobSnapshot().DownloadToStream(targetStream)
Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
Throw New EventStreamReadException(DomainName, AggregateClassName, m_key.ToString(),
0, "Unable to access underlying event stream", exBlob)
End Try
targetStream.Seek(0, IO.SeekOrigin.Begin)
Return targetStream
Else
Return Nothing
End If
End Function
Public Function GetEvents() As IEnumerable(Of IEvent)
Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
If (AppendBlob IsNot Nothing) Then
Dim ret As New List(Of IEvent)
Dim bf As New BinaryFormatter()
Using rawStream As System.IO.Stream = GetUnderlyingStream()
While Not (rawStream.Position >= rawStream.Length)
Dim record As BlobBlockWrappedEvent = _
CTypeDynamic(Of BlobBlockWrappedEvent)(bf.Deserialize(rawStream))
If (record IsNot Nothing) Then
ret.Add(record.EventInstance)
End If
End While
End Using
Return ret
Else
Throw New EventStreamReadException_
(DomainName, AggregateClassName, MyBase.m_key.ToString(),
0, "Unable to read events - Azure blob not initialized")
End If
End Function
3. Using Azure Storage "Files" to Persist Events
The implementation that uses Azure files is quite similar to that which uses append blobs except that it pre-allocates the full size of file the first time it is used and thereafter just fills this in as events are added. In addition, it co-opts the file pointer of the start of the event record to be the sequence number of that event.
The record count, aggregate type, aggregate key and current sequence number are also stored as attributes in each event stream file.
<Serializable()>
<DataContract()>
Public Class FileBlockWrappedEvent
<DataMember(Name:="EventName", Order:=0)>
Private ReadOnly m_eventName As String
Public ReadOnly Property EventName As String
Get
Return m_eventName
End Get
End Property
<DataMember(Name:="Sequence", Order:=1)>
Private ReadOnly m_sequence As Long
Public ReadOnly Property Sequence As Long
Get
Return m_sequence
End Get
End Property
<DataMember(Name:="Version", Order:=2)>
Private ReadOnly m_version As UInteger
Public ReadOnly Property Version As UInteger
Get
Return m_version
End Get
End Property
<DataMember(Name:="Timestamp", Order:=3)>
Private ReadOnly m_timestamp As DateTime
Public ReadOnly Property Timestamp As DateTime
Get
Return m_timestamp
End Get
End Property
<DataMember(Name:="Size", Order:=4)>
Private ReadOnly m_eventSize As UInteger
Public ReadOnly Property EventSize As UInteger
Get
Return m_eventSize
End Get
End Property
<DataMember(Name:="Class", Order:=4)>
Private ReadOnly m_eventClassName As String
Public ReadOnly Property ClassName As String
Get
Return m_eventClassName
End Get
End Property
<DataMember(Name:="Data", Order:=5)>
Private ReadOnly m_eventData As Byte()
Public ReadOnly Property EventInstance As IEvent
Get
If (String.IsNullOrWhiteSpace(m_eventClassName)) Then
Throw New SerializationException
("Unable to determine the event type that wrote this event instance")
End If
If (m_eventSize = 0) Then
Throw New SerializationException_
("Unable to return the event data for this event instance - size is zero")
End If
Dim evtType As Type = Type.GetType(m_eventClassName, True, True)
If (evtType IsNot Nothing) Then
Dim bf As New BinaryFormatter()
Using memStream As New System.IO.MemoryStream(m_eventData)
Return CTypeDynamic(bf.Deserialize(memStream), evtType)
End Using
End If
Return Nothing
End Get
End Property
Public Sub New(ByVal sequenceInit As String,
ByVal versionInit As UInteger,
ByVal timestampInit As DateTime,
ByVal eventInstanceInit As IEvent)
m_eventName = EventNameAttribute.GetEventName(eventInstanceInit)
m_sequence = sequenceInit
m_version = versionInit
m_timestamp = timestampInit
Dim bf As New BinaryFormatter()
Using memStream As New System.IO.MemoryStream()
bf.Serialize(memStream, eventInstanceInit)
m_eventSize = memStream.Length
m_eventData = memStream.ToArray()
End Using
m_eventClassName = eventInstanceInit.GetType().AssemblyQualifiedName
End Sub
Public Sub WriteToBinaryStream(ByVal stream As System.IO.Stream)
Dim bf As New BinaryFormatter()
bf.Serialize(stream, Me)
End Sub
End Class
4. Using Local Files to Store Events
If you want to perform unit testing on an event sourcing system or want to set up a trial project to get your head around the concepts before committing to use the technique in a full scale project, you can use files on your local machine to store the event streams.
To do this, it uses the aggregate identifier to create an unique filename and appends events to each file accordingly. There is an "information record" at the start of the file that indicates what domain and full class name the event stream pertains to:
Private Sub AppendEventInternal(EventInstance As IEvent(Of TAggregate))
If (MyBase.m_file IsNot Nothing) Then
Dim evtToWrite As New LocalFileWrappedEvent_
(m_eventStreamDetailBlock.SequenceNumber,
EventInstance.Version,
DateTime.UtcNow,
EventInstance, m_setings.UnderlyingSerialiser)
If (evtToWrite IsNot Nothing) Then
Using fs = m_file.OpenWrite()
fs.Seek(0, IO.SeekOrigin.End)
evtToWrite.WriteToBinaryStream(fs)
m_eventStreamDetailBlock.SequenceNumber = fs.Position
End Using
m_eventStreamDetailBlock.RecordCount += 1
End If
End If
End Sub
5. Using Local Memory to Store Events
For unit testing or for scenarios where the whole event stream can be stored in the local memory of a machine (for example, when an undo-redo buffer is implemented using this technology).
Choosing the Storage Type to Use
In my experience, the choice of storage technology depends on the specifics of the system you are building, but I would recommend using Azure Tables (or even SQL on Azure) if you want to be able to look into the event streams but use Append Blobs or Files when you need the maximum performance and horizontal scaling.
In particular, if your event streams are likely to have a higher write rate (for example, in any IoT instrumentation scenario, a multi player game or a trading platform) then the AppendBlob scales well and is very very fast.
In order to switch between these without major rewriting, I have allowed for configuration by specific configuration settings (I think there is still some work to do on this however) to map the aggregate class to the backing technology used to store its event streams and projection:
<CQRSAzureEventSourcingConfiguration>
<ImplementationMaps>
<Map AggregateDomainQualifiedName="HospitalWard.Nurse"
ImplementationName="InMemoryImplementationExample"
SnapshotSettingsName="InMemorySnapshotExample" />
</ImplementationMaps>
<Implementations>
<Implementation Name="InMemoryImplementationExample" ImplementationType="InMemory">
<InMemorySettings />
</Implementation>
<Implementation Name="AzureBlobImplementationExample" ImplementationType="AzureBlob">
<BlobSettings ConnectionStringName="UnitTestStorageConnectionString" />
</Implementation>
<Implementation Name="AzureBlobImplementationDomainExample"
ImplementationType="AzureBlob">
<BlobSettings ConnectionStringName=
"UnitTestStorageConnectionString" DomainName="Test" />
</Implementation>
<Implementation Name="AzureFileImplementationExample" ImplementationType="AzureFile">
<FileSettings ConnectionStringName="UnitTestStorageConnectionString"
InitialSize="20000" />
</Implementation>
<Implementation Name="AzureSQLImplementationExample" ImplementationType="AzureSQL">
<SQLSettings ConnectionStringName="UnitTestStorageConnectionString"
AggregateIdentifierField="AggregateKey" />
</Implementation>
<Implementation Name="AzureTableImplementationExample" ImplementationType="AzureTable">
<TableSettings ConnectionStringName="UnitTestStorageConnectionString"
SequenceNumberFormat="00000000" />
</Implementation>
<Implementation Name="LocalFileSettingsExample" ImplementationType="LocalFileSettings">
<LocalFileSettings EventStreamRootFolder="C:\CQRS\Data\EventStreams"
UnderlyingSerialiser="JSON"/>
</Implementation>
</Implementations>
<SnapshotSettings>
<SnapshotSetting Name="InMemorySnapshotExample" ImplementationType="InMemory">
<InMemorySettings />
</SnapshotSetting>
</SnapshotSettings>
</CQRSAzureEventSourcingConfiguration>
Consuming Events and Projections
In order to turn your event stream into something interesting (at least, interesting to a user that wants to query the data), you need to create a projection. A projection is a view of the effect of a set of events. For example, a financial projection on the above cars event example would be interested in any event that impacted the cost or profit from any given car.
To consume events, you need to create a class that "knows" what kind of events it deals with and what to do with them. These specific projections can be run over a single aggregate's event stream in order to perform some calculation or operation based on the underlying data of that event stream.
The underlying interface for any projection is:
Public Interface IProjection(Of TAggregate As IAggregationIdentifier, TAggregateKey)
Inherits IProjection
Sub HandleEvent(Of TEvent As IEvent(Of TAggregate))(ByVal eventToHandle As TEvent)
End Interface
Each specific projection that implements the class decides what action to perform with the event's data payload:
Public Overrides Sub HandleEvent(Of TEvent As IEvent(Of MockAggregate))_
(eventToHandle As TEvent) _
Implements IProjection(Of MockAggregate, String).HandleEvent
Select Case eventToHandle.GetType()
Case GetType(MockEventTypeOne)
HandleMockEventOne(CTypeDynamic(Of MockEventTypeOne)(eventToHandle))
Case GetType(MockEventTypeTwo)
HandleMockEventTwo(CTypeDynamic(Of MockEventTypeTwo)(eventToHandle))
Case Else
Throw New ArgumentException("Unexpected event type - " & _
eventToHandle.GetType().Name)
End Select
End Sub
Private Sub HandleMockEventOne(ByVal eventToHandle As MockEventTypeOne)
AddOrUpdateValue(Of Integer)(NameOf(Total), _
ProjectionSnapshotProperty.NO_ROW_NUMBER, Total + _
eventToHandle.EventOneIntegerProperty)
AddOrUpdateValue(Of String)(NameOf(LastString), _
ProjectionSnapshotProperty.NO_ROW_NUMBER, eventToHandle.EventOneStringProperty)
End Sub
Snapshots
To allow the current state of a projection to be saved - both for use by any readers and for allowing us to have a starting point if we have to rebuild a projection after a service interruption - an interface to define a snapshot of a projection is also defined. This is a way of saying "it was like this at a given known point".
Public Interface ISnaphot(Of In IAggregateIdentity)
ReadOnly Property AsOfVersion As Long
End Interface
In practice, I save these snapshots as a blob (file) in JSON format - this makes (some of) the query side of the CQRS architecture as simple as finding the snapshot and reading it.
Sequencing and Synchronizing Events
It is often useful to be able to sequence events that occurred to different aggregations together - for example, you might have one aggregation for a stock price and another for an account and need to combine the two to give an account valuation at a given point in time.
To facilitate this, a master synchronization field is needed - this could be an incremental number or you can use the date/time of the event occurrence.
To do this, an abstract
class is used as the base class of all the event types and it handles the synchronization key.
Public MustInherit Class EventBase
Public Property SynchronisationStamp As Long
Public Sub New()
SynchronisationStamp = DateTime.UtcNow.Ticks
End Sub
End Class
Running a Projection
Now we want to run a projection over the event stream in order to read the current state of the aggregate from the events that have occurred to it.
Because a projection is a business object rather than a framework object, I want to separate it from the underlying implementation of the event stream itself. (This is also very handy to allow you to perform unit tests on a projection by using an in-memory event stream created by the unit test method itself).
In order to do this, we have an interface IEventStreamReader
which provides the event stream to the projection. Each implementation of a different backing store to an event stream must support this interface:
Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier,
TAggregationKey)
Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))
Function GetEvents(ByVal StartingVersion As UInteger) _
As IEnumerable(Of IEvent(Of TAggregate))
Function GetEventsWithContext() As IEnumerable(Of IEventContext)
End Interface
The particular event stream reader to use is passed to a "Projection Processor" which in turn can take a projection class and run that over the event stream:
Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
TAggregateKey)
Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate,
TAggregateKey)
Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate,
TAggregateKey))
If (m_streamReader IsNot Nothing) Then
If (projectionToProcess IsNot Nothing) Then
Dim startingSequence As UInteger = 0
If (projectionToProcess.SupportsSnapshots) Then
End If
For Each evt In m_streamReader.GetEvents(startingSequence)
If (projectionToProcess.HandlesEventType(evt.GetType())) Then
projectionToProcess.HandleEvent(evt)
End If
Next
End If
Else
End If
End Sub
Friend Sub New(ByVal readerTouse As IEventStreamReader(Of TAggregate, TAggregateKey))
m_streamReader = readerTouse
End Sub
End Class
The specific event stream reader implementation class has a factory method to create a projection processor for any given aggregate instance thus:
#Region "Factory methods"
Public Shared Function Create_
(ByVal instance As IAggregationIdentifier(Of TAggregationKey))
As IEventStreamReader(Of TAggregate, TAggregationKey)
Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)
(DomainNameAttribute.GetDomainName(instance), instance.GetKey())
End Function
Public Shared Function CreateProjectionProces(
ByVal instance As IAggregationIdentifier(Of TAggregationKey))
As ProjectionProcessor(Of TAggregate, TAggregationKey)
Return New ProjectionProcessor(Of TAggregate, TAggregationKey )
(Create(instance))
End Function
#End Region
Having created this framework code, we can now create our aggregate, event and projection (business logic) classes totally independent of the underlying implementation of their event stream and allowing them to be portable between these different technologies.
Running a Classifier
A classifier is a special kind of projection that is used to decide whether or not a given instance of an aggregate is in or out of some defined business meaningful group. For example, if you were implementing a banking system, you might have a classifier which ran over the event stream of each account so as to decide which accounts were in the group "accounts in arrears".
Designing Your Business Code
Once you have a framework to use as the basis for an event stream based system, you have to create the classes that represent the business parts of that system: the aggregates, events and projections. You can do this with a graphical designer or if you prefer, you can just create the classes in code yourself.
Starting with the aggregate identifier (the "thing" to which events can occur and be recorded), you need to create a class that defines how that aggregate can be uniquely identified. For this, we need to identify the data type for its unique key and, because so many systems use strings for their data storage, a consistent way to turn that unique key into a string.
If we take the example of a bank account that is identified by an unique account number, we would end up with an aggregate class like:
<DomainName("CloudBank")>
Public Class Account
Implements IAggregationIdentifier(Of String)
Private m_bankAccount As String
Public Sub SetKey(key As String) _
Implements IAggregationIdentifier(Of String).SetKey
m_bankAccount = key
End Sub
Public Function GetAggregateIdentifier() As String _
Implements IAggregationIdentifier.GetAggregateIdentifier
Return m_bankAccount
End Function
Public Function GetBankAccountNumber() As String _
Implements IAggregationIdentifier(Of String).GetKey
Return m_bankAccount
End Function
Public Sub New(ByVal accountNumber As String)
m_bankAccount = accountNumber
End Sub
End Class
Then for each event that can occur against this bank account, you would need to create an event class that has all of the properties that can be known about the event. Note that there is no need to define any key event or the mandatory nature - basically, any data that can be stored about an event should go into the event definition.
So the event that corresponds to money being deposited into a bank account could look something like:
<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
<EventAsOfDate(NameOf(MoneyDeposited.DepositDate))>
Public Class MoneyDeposited
Implements IEvent(Of Account)
Public Property DepositDate As Nullable(Of DateTime)
Public Property Amount As Decimal
Public ReadOnly Property Version As UInteger Implements IEvent(Of Account).Version
Get
Return 1
End Get
End Property
End Class
Notice that the event definition also includes a version number, which you should increment if you ever change or add properties. There is also an optional attribute EventAsOfDate
which allows you to specify which property of the event contains the actual real world date and time that the event occurred.
Having defined all of the event types, you would then move to defining the projections that allow you to get the point-in-time state of the aggregate from its event stream. A projection class needs to know which event types it handles, and for each of these what to do when that event type is encountered. For example, a projection that was to give you the running balance of an account would need to handle MoneyDeposited
and MoneyWithdrawn
events and to update the CurrentBalance
property when it encounters them:
<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
Public Class CurrentBalanceProjection
Inherits ProjectionBase(Of Account, String)
Implements IHandleEvent(Of MoneyDeposited)
Implements IHandleEvent(Of MoneyWithdrawn)
Private m_currentBalance As Decimal
Public ReadOnly Property CurrentBalance As Decimal
Get
Return m_currentBalance
End Get
End Property
Public Overrides Function HandlesEventType(eventType As Type) As Boolean
If eventType Is GetType(MoneyDeposited) Then
Return True
End If
If eventType Is GetType(MoneyDeposited) Then
Return True
End If
Return False
End Function
Public Overrides ReadOnly Property SupportsSnapshots As Boolean
Get
Return True
End Get
End Property
Public Overrides Sub HandleEvent(Of TEvent As IEvent)(eventToHandle As TEvent)
If GetType(TEvent) Is GetType(MoneyDeposited) Then
HandleMoneyDepositedEvent(CTypeDynamic(Of MoneyDeposited)(eventToHandle))
End If
If GetType(TEvent) Is GetType(MoneyWithdrawn) Then
HandleMoneyWithdrawnEvent(CTypeDynamic(Of MoneyWithdrawn)(eventToHandle))
End If
End Sub
Public Sub HandleMoneyWithdrawnEvent(eventHandled As MoneyWithdrawn) _
Implements IHandleEvent(Of MoneyWithdrawn).HandleEvent
m_currentBalance -= eventHandled.Amount
End Sub
Public Shadows Sub HandleMoneyDepositedEvent(eventHandled As MoneyDeposited) _
Implements IHandleEvent(Of MoneyDeposited).HandleEvent
m_currentBalance += eventHandled.Amount
End Sub
End Class
As you can see in this example, there is a clear and complete separation between the business logic code and the framework code that supports it, allowing for mock-free testing of all the business rule classes.
Notes on the Source Code
The source code I have attached to this project contains the framework code (and unit test projects) for all the "CQRS on Azure" articles I have uploaded:
Each project has a read-me file and I recommend reading this and also running through the unit tests to improve the use you can get from this code.
I have deleted the App.Config
from the unit test as it contains a reference to my own Azure storage account. You will need an Azure account in order to run the on-cloud unit tests, but the in memory tests and local file tests can be run without any such access.
<connectionStrings>
<add name="UnitTestStorageConnectionString"
connectionString="TODO - Add your connection string here" providerName="" />
<add name="StorageConnectionString"
connectionString="TODO - Add your connection string here" providerName="" />
</connectionStrings>
Further Reading / Resources
If you are planning on implementing an Event Sourcing based system, especially if it is as part of a microservices architecture, I would recommend the "blue book" -
I would also recommend the blog posts by Greg Young that deal with event sourcing in some detail, and the various presentations he has done on the subject which can be found on YouTube.