As described in the previous blog post, there are several choices of backing technology for an event stream stored on Azure Storage. In particular, I compared Table, File and Blob storage (specifically AppendBlob, because an event stream is an append-only data structure and this is the most sensible blob type to use).
Now we want to run a projection over the event stream in order to derive the current state of the aggregate from the events that have occurred to it.
IEventStreamReader Interface
Because a projection is a business object rather than a framework object, I want to separate it from the underlying implementation of the event stream itself. (This also makes it easy to unit test a projection by using an in-memory event stream created by the unit test method itself.)
In order to do this, we have an interface, IEventStreamReader, which provides the event stream to the projection. Every backing store implementation of an event stream must support this interface.
    Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier,
                                        TAggregationKey)

        Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))
        Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate))
        Function GetEventsWithContext() As IEnumerable(Of IEventContext)

    End Interface
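To illustrate the unit testing point, here is a minimal sketch of what an in-memory reader might look like. It is not part of the framework: InMemoryEventStreamReader, Account and MoneyDeposited are hypothetical names, the empty interfaces are stand-ins so that the sketch compiles on its own, and I am assuming that a starting version of N simply means "skip the first N events".

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

' Hypothetical stand-ins for the framework interfaces, so the sketch compiles alone
Public Interface IAggregationIdentifier
End Interface

Public Interface IEvent(Of TAggregate As IAggregationIdentifier)
End Interface

Public Interface IEventContext
End Interface

Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier, TAggregationKey)
    Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))
    Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate))
    Function GetEventsWithContext() As IEnumerable(Of IEventContext)
End Interface

' Hypothetical reader that serves events from a list held in memory
Public Class InMemoryEventStreamReader(Of TAggregate As IAggregationIdentifier, TAggregationKey)
    Implements IEventStreamReader(Of TAggregate, TAggregationKey)

    Private ReadOnly m_events As New List(Of IEvent(Of TAggregate))

    Public Sub New(ByVal events As IEnumerable(Of IEvent(Of TAggregate)))
        If (events IsNot Nothing) Then
            m_events.AddRange(events)
        End If
    End Sub

    Public Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate)) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
        Return m_events
    End Function

    Public Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate)) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
        ' Assumption: version N is the event at zero-based position N in the list
        Return m_events.Skip(CInt(StartingVersion))
    End Function

    Public Function GetEventsWithContext() As IEnumerable(Of IEventContext) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEventsWithContext
        ' Context is not modelled in this sketch
        Return Enumerable.Empty(Of IEventContext)()
    End Function
End Class

' Hypothetical aggregate and event used only for the demonstration
Public Class Account
    Implements IAggregationIdentifier
End Class

Public Class MoneyDeposited
    Implements IEvent(Of Account)
    Public Property Amount As Decimal
End Class

Module Demo
    Sub Main()
        Dim events As New List(Of IEvent(Of Account)) From {
            New MoneyDeposited With {.Amount = 10D},
            New MoneyDeposited With {.Amount = 5D}
        }
        Dim reader As IEventStreamReader(Of Account, String) =
            New InMemoryEventStreamReader(Of Account, String)(events)

        Console.WriteLine(reader.GetEvents().Count())    ' prints 2
        Console.WriteLine(reader.GetEvents(1UI).Count()) ' prints 1
    End Sub
End Module
```

A unit test can build the list of events it cares about, wrap it in a reader like this, and hand that reader to whatever consumes IEventStreamReader, without touching Azure Storage at all.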
The particular event stream reader to use is passed to a "Projection Processor" which in turn can take a projection class and run that over the event stream:
    Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
                                     TAggregateKey)

        ' The event stream that projections are run over
        Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate,
                                                              TAggregateKey)

        Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate,
                                                                    TAggregateKey))
            If (m_streamReader IsNot Nothing) Then
                If (projectionToProcess IsNot Nothing) Then
                    Dim startingSequence As UInteger = 0
                    If (projectionToProcess.SupportsSnapshots) Then
                        ' Snapshot handling is not shown here; presumably the starting
                        ' sequence would be moved on to the latest snapshot
                    End If
                    ' Pass each event the projection is interested in to its handler
                    For Each evt In m_streamReader.GetEvents(startingSequence)
                        If (projectionToProcess.HandlesEventType(evt.GetType())) Then
                            projectionToProcess.HandleEvent(evt)
                        End If
                    Next
                End If
            Else
                ' No stream reader was supplied, so there is nothing to process
            End If
        End Sub

        Friend Sub New(ByVal readerToUse As IEventStreamReader(Of TAggregate, TAggregateKey))
            m_streamReader = readerToUse
        End Sub

    End Class
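The projection itself is not shown in this post, so here is a rough sketch of what one could look like. The IProjection members are only inferred from the calls the processor makes (SupportsSnapshots, HandlesEventType and HandleEvent); the real interface may well have more to it. Account, MoneyDeposited, AccountClosed and BalanceProjection are hypothetical names, and the loop in Main mimics the filter-then-handle loop of the processor rather than calling the actual class.

```vbnet
Imports System
Imports System.Collections.Generic

' Hypothetical stand-ins for the framework interfaces, so the sketch compiles alone
Public Interface IAggregationIdentifier
End Interface

Public Interface IEvent(Of TAggregate As IAggregationIdentifier)
End Interface

' Assumed shape of IProjection, inferred from how the processor uses it
Public Interface IProjection(Of TAggregate As IAggregationIdentifier, TAggregateKey)
    ReadOnly Property SupportsSnapshots As Boolean
    Function HandlesEventType(ByVal eventType As Type) As Boolean
    Sub HandleEvent(ByVal eventToHandle As IEvent(Of TAggregate))
End Interface

' Hypothetical aggregate and events used only for the demonstration
Public Class Account
    Implements IAggregationIdentifier
End Class

Public Class MoneyDeposited
    Implements IEvent(Of Account)
    Public Property Amount As Decimal
End Class

Public Class AccountClosed
    Implements IEvent(Of Account)
End Class

' Hypothetical projection that totals the deposits made to an account
Public Class BalanceProjection
    Implements IProjection(Of Account, String)

    Private m_balance As Decimal = 0D

    Public ReadOnly Property Balance As Decimal
        Get
            Return m_balance
        End Get
    End Property

    Public ReadOnly Property SupportsSnapshots As Boolean _
        Implements IProjection(Of Account, String).SupportsSnapshots
        Get
            Return False
        End Get
    End Property

    Public Function HandlesEventType(ByVal eventType As Type) As Boolean _
        Implements IProjection(Of Account, String).HandlesEventType
        ' This projection only cares about deposits
        Return eventType Is GetType(MoneyDeposited)
    End Function

    Public Sub HandleEvent(ByVal eventToHandle As IEvent(Of Account)) _
        Implements IProjection(Of Account, String).HandleEvent
        Dim deposit = TryCast(eventToHandle, MoneyDeposited)
        If (deposit IsNot Nothing) Then
            m_balance += deposit.Amount
        End If
    End Sub
End Class

Module Demo
    Sub Main()
        Dim events As New List(Of IEvent(Of Account)) From {
            New MoneyDeposited With {.Amount = 10D},
            New AccountClosed(),
            New MoneyDeposited With {.Amount = 5D}
        }
        Dim projection As New BalanceProjection()

        ' Same filter-then-handle loop that the projection processor runs
        For Each evt In events
            If (projection.HandlesEventType(evt.GetType())) Then
                projection.HandleEvent(evt)
            End If
        Next

        Console.WriteLine(projection.Balance) ' prints 15
    End Sub
End Module
```

Note that the projection knows nothing about where the events came from: the AccountClosed event is skipped because HandlesEventType rejects it, and the balance is built purely from the events it is handed.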
The specific event stream reader implementation class has a factory method to create a projection processor for any given aggregate instance thus:
    #Region "Factory methods"

        ' Create a blob-backed event stream reader for the given aggregate instance
        Public Shared Function Create(
                ByVal instance As IAggregationIdentifier(Of TAggregationKey)) _
                As IEventStreamReader(Of TAggregate, TAggregationKey)
            Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)(
                DomainNameAttribute.GetDomainName(instance), instance.GetKey())
        End Function

        ' Create a projection processor that runs over that instance's event stream
        Public Shared Function CreateProjectionProcessor(
                ByVal instance As IAggregationIdentifier(Of TAggregationKey)) _
                As ProjectionProcessor(Of TAggregate, TAggregationKey)
            Return New ProjectionProcessor(Of TAggregate, TAggregationKey)(
                Create(instance))
        End Function

    #End Region
Having created this framework code, we can now write our aggregate, event and projection (business logic) classes totally independently of the underlying implementation of their event stream, which makes them portable between these different storage technologies.