Event Streams on Azure - Running a Projection

10 Feb 2016 | CPOL | 1 min read

This article is an entry in our Microsoft Azure IoT Contest. Articles in this section are not required to be full articles so care should be taken when voting.

As described in the previous blog post, there are several choices of backing technology for an event stream stored on Azure Storage. In particular, I compared Table, File and Blob storage (specifically AppendBlob: an event stream is an append-only data structure, so this is the most sensible blob type to use).

Now we want to run a projection over the event stream to derive the current state of the aggregate from the events that have occurred to it.

IEventStreamReader Interface

Because a projection is a business object rather than a framework object, I want to separate it from the underlying implementation of the event stream itself. This also makes it easy to unit test a projection against an in-memory event stream created by the unit test method itself.

To do this, we have an interface, IEventStreamReader, which provides the event stream to the projection. Every backing-store implementation of an event stream must implement this interface.

VB.NET
''' <summary>
''' Definition for any implementation that can read events from an event stream
''' </summary>
''' <typeparam name="TAggregate">
''' The data type of the aggregate that identifies the event stream to read
''' </typeparam>
''' <typeparam name="TAggregationKey">
''' The data type of the key that uniquely identifies the specific event 
''' stream instance to read
''' </typeparam>
Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier, 
                                       TAggregationKey)

    ''' <summary>
    ''' Get the event stream for a given aggregate
    ''' </summary>
    Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream for a given aggregate from a given starting version
    ''' </summary>
    ''' <param name="StartingVersion">
    ''' The starting version number for our snapshot
    ''' </param>
    ''' <remarks>
    ''' This is used in scenarios where we are starting from 
    ''' a given snapshot version
    ''' </remarks>
    Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream and the context information recorded for each event
    ''' </summary>
    ''' <remarks>
    ''' This is typically only used for audit trails as all business functionality 
    ''' should depend on the event data alone
    ''' </remarks>
    Function GetEventsWithContext() As IEnumerable(Of IEventContext)

End Interface
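
As a sketch of the unit-testing idea mentioned above, an in-memory reader might look like the following. The class name InMemoryEventStreamReader is hypothetical and not part of the article's framework. The code also assumes that an event's version equals its zero-based position in the list, which the article does not state, and it returns no context records because the shape of IEventContext is not shown here.

```vbnet
Imports System.Collections.Generic
Imports System.Linq

''' <summary>
''' Hypothetical in-memory event stream reader, intended for unit tests only
''' </summary>
Public Class InMemoryEventStreamReader(Of TAggregate As IAggregationIdentifier,
                                          TAggregationKey)
    Implements IEventStreamReader(Of TAggregate, TAggregationKey)

    ' The events held in memory, in the order in which they occurred
    Private ReadOnly m_events As List(Of IEvent(Of TAggregate))

    Public Sub New(ByVal events As IEnumerable(Of IEvent(Of TAggregate)))
        m_events = New List(Of IEvent(Of TAggregate))(events)
    End Sub

    ' Returns every event in the stream
    Public Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate)) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
        Return m_events
    End Function

    ' Assumption: version N is the event at zero-based position N,
    ' so the first StartingVersion events are skipped
    Public Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate)) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
        Return m_events.Skip(CInt(StartingVersion))
    End Function

    ' No context is recorded in memory, so this sketch returns an empty sequence
    Public Function GetEventsWithContext() As IEnumerable(Of IEventContext) _
        Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEventsWithContext
        Return Enumerable.Empty(Of IEventContext)()
    End Function

End Class
```

A unit test could build this reader from a hand-written list of events and pass it wherever an IEventStreamReader is expected.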

The particular event stream reader to use is passed to a "Projection Processor", which in turn takes a projection class and runs it over the event stream:

VB.NET
Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
                                       TAggregateKey)

    ''' <summary>
    ''' The stream reader instance that will be used to run the projections
    ''' </summary>
    Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, 
                                        TAggregateKey)

    ''' <summary>
    ''' Process the given projection using the event stream reader 
    ''' we have set up
    ''' </summary>
    ''' <param name="projectionToProcess">
    ''' The class that defines the projection operation we are going to process
    ''' </param>
    Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate, 
                                                 TAggregateKey))

        If (m_streamReader IsNot Nothing) Then
            If (projectionToProcess IsNot Nothing) Then
                'Does it support snapshots?
                Dim startingSequence As UInteger = 0
                If (projectionToProcess.SupportsSnapshots) Then
                    'load the most recent snapshot for it

                    'and update the starting sequence
                End If
                For Each evt In m_streamReader.GetEvents(startingSequence)
                    If (projectionToProcess.HandlesEventType(evt.GetType())) Then
                        projectionToProcess.HandleEvent(evt)
                    End If
                Next
            End If
        Else
            'Unable to use this projection as it has no stream reader associated
        End If

    End Sub

    ''' <summary>
    ''' Create a new projection processor that will use the given event 
    ''' stream reader to do its processing
    ''' </summary>
    ''' <param name="readerToUse">
    ''' The event stream reader to use
    ''' </param>
    Friend Sub New(ByVal readerToUse As IEventStreamReader(Of TAggregate, TAggregateKey))
        m_streamReader = readerToUse
    End Sub

End Class
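
To illustrate what the processor expects from a projection, here is a minimal sketch of one that keeps a running account balance. Everything named here (Account, MoneyDeposited, MoneyWithdrawn, AccountBalanceProjection) is hypothetical. The sketch assumes that IAggregationIdentifier and IEvent(Of TAggregate) require no members, that IProjection declares only the three members the Process method calls, that SupportsSnapshots is a read-only property, and that HandleEvent takes an IEvent(Of TAggregate). None of these interfaces are listed in this article, so the real ones may require more.

```vbnet
Imports System

' Hypothetical aggregate (assumes IAggregationIdentifier has no required members)
Public Class Account
    Implements IAggregationIdentifier
End Class

' Hypothetical events (assumes IEvent(Of TAggregate) has no required members)
Public Class MoneyDeposited
    Implements IEvent(Of Account)
    Public Property Amount As Decimal
End Class

Public Class MoneyWithdrawn
    Implements IEvent(Of Account)
    Public Property Amount As Decimal
End Class

' Hypothetical projection that folds the events into a running balance
Public Class AccountBalanceProjection
    Implements IProjection(Of Account, String)

    Private m_balance As Decimal = 0D

    ' The state derived from the events handled so far
    Public ReadOnly Property Balance As Decimal
        Get
            Return m_balance
        End Get
    End Property

    ' This sketch always replays the stream from the start
    Public ReadOnly Property SupportsSnapshots As Boolean _
        Implements IProjection(Of Account, String).SupportsSnapshots
        Get
            Return False
        End Get
    End Property

    ' Only deposits and withdrawals affect the balance
    Public Function HandlesEventType(ByVal eventType As Type) As Boolean _
        Implements IProjection(Of Account, String).HandlesEventType
        Return eventType Is GetType(MoneyDeposited) OrElse
               eventType Is GetType(MoneyWithdrawn)
    End Function

    ' Apply a single event to the running balance
    Public Sub HandleEvent(ByVal eventToHandle As IEvent(Of Account)) _
        Implements IProjection(Of Account, String).HandleEvent
        Dim deposit = TryCast(eventToHandle, MoneyDeposited)
        If deposit IsNot Nothing Then
            m_balance += deposit.Amount
            Return
        End If
        Dim withdrawal = TryCast(eventToHandle, MoneyWithdrawn)
        If withdrawal IsNot Nothing Then
            m_balance -= withdrawal.Amount
        End If
    End Sub

End Class
```

Because the projection only sees events through HandlesEventType and HandleEvent, it has no knowledge of where the stream is stored.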

The specific event stream reader implementation class has factory methods to create a reader, and a projection processor that uses it, for any given aggregate instance:

VB.NET
#Region "Factory methods"
        ''' <summary>
        ''' Creates an azure blob storage based event stream reader for the 
        ''' given aggregate
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to read
        ''' the event stream
        ''' </param>
        ''' <returns>
        ''' An event stream reader backed by azure blob storage for that aggregate
        ''' </returns>
        Public Shared Function Create(
              ByVal instance As IAggregationIdentifier(Of TAggregationKey)) _
              As IEventStreamReader(Of TAggregate, TAggregationKey)

            Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)(
                DomainNameAttribute.GetDomainName(instance), instance.GetKey())

        End Function

        ''' <summary>
        ''' Create a projection processor that works off an azure 
        ''' blob backed event stream
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to run projections
        ''' </param>
        ''' <returns>
        ''' A projection processor that can run projections over this event stream
        ''' </returns>
        Public Shared Function CreateProjectionProcessor(
              ByVal instance As IAggregationIdentifier(Of TAggregationKey)) _
              As ProjectionProcessor(Of TAggregate, TAggregationKey)

            Return New ProjectionProcessor(Of TAggregate, TAggregationKey)(
                Create(instance))

        End Function

#End Region
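
Putting the pieces together, calling code might chain the factory method and the processor roughly as follows. The module and method names (ProjectionRunner, RunProjection) are hypothetical, and the sketch assumes that BlobEventStreamReader places no type constraints beyond those shown in this article.

```vbnet
Public Module ProjectionRunner

    ' Hypothetical helper: builds a blob-backed processor for the given
    ' aggregate instance and runs one projection over its event stream
    Public Sub RunProjection(Of TAggregate As IAggregationIdentifier, TAggregationKey)(
            ByVal instance As IAggregationIdentifier(Of TAggregationKey),
            ByVal projection As IProjection(Of TAggregate, TAggregationKey))

        Dim processor As ProjectionProcessor(Of TAggregate, TAggregationKey) =
            BlobEventStreamReader(Of TAggregate, TAggregationKey).CreateProjectionProcessor(instance)

        ' Process replays the stream and calls projection.HandleEvent for
        ' each event whose type the projection says it handles
        processor.Process(projection)

    End Sub

End Module
```

After the call returns, the projection object holds whatever state its HandleEvent method accumulated.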

With this framework code in place, we can create our aggregate, event and projection (business logic) classes entirely independently of the underlying implementation of their event stream, which makes them portable between the different storage technologies.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)