
CQRS on Windows Azure - Identity Groups and Classifiers

23 Aug 2016 · CPOL · 4 min read
One way to apply set-theory (relational) operations to event-stream-based data structures

Introduction

One valid criticism of using event sourcing (also known as event streams) as the data storage mechanism for a business system is that it is harder to run business queries against it: you do not know the properties of any given entity (or aggregate) until you run the relevant projection over its event stream.

Identity groups and classifiers are one way of addressing this.

Background

If you have not used event streams / event sourcing before, I strongly recommend reading the section "Getting your head around event sourcing" in the article CQRS on Windows Azure - Event Sourcing.

Identity Groups

An identity group is a collection of entities of the same type that share some common attribute or property by which they are grouped into a business-meaningful set.

For example, if you have a banking application with an entity type of "Bank Account", it would be meaningful to create a business group "Accounts Overdrawn", being the subset of all accounts that currently have a balance below zero. In turn, you might have a business group "Delinquent Accounts", being those members of the "Accounts Overdrawn" group that have not been in credit for over 30 days.

Therefore, an identity group can be identified in code by its name and by the name of its parent group (if it is not simply a subset of the group "All"):

VB.NET
Public Interface IIdentifierGroup

    ''' <summary>
    ''' The unique name of the identity group
    ''' </summary>
    ''' <remarks>
    ''' This name can be passed as a parameter to a query definition.
    ''' There are two predefined names:
    ''' "Identity", being the group of one specified aggregate identifier, and
    ''' "All", being the group of all instances of an aggregate identifier type.
    ''' </remarks>
    ReadOnly Property Name As String

    ''' <summary>
    ''' The name of the parent group that an aggregate must already belong to
    ''' before it is checked for membership of this group
    ''' </summary>
    ''' <remarks>
    ''' This can be used to speed up evaluation of group membership
    ''' by starting from a smaller initial group than "All".
    ''' If not set, "All" is assumed.
    ''' </remarks>
    ReadOnly Property ParentGroupName As String

End Interface
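To make the Name / ParentGroupName relationship more concrete, here is a minimal C# sketch of how a chain of parent groups might be resolved before evaluation. It assumes an in-memory registry of group definitions; the `GroupDefinition` record, the `GroupRegistry` class and its `EvaluationChain` method are hypothetical illustrations, not part of the framework described in this article. Resolving the chain outermost first means each group only needs to be evaluated against the members of its parent, with "All" assumed when no parent is set.

C#
```csharp
using System;
using System.Collections.Generic;
#nullable enable

// Hypothetical C# mirror of IIdentifierGroup: a name plus an optional parent group name.
public sealed record GroupDefinition(string Name, string? ParentGroupName = null)
{
    // When no parent is set, the group is assumed to be a subset of "All".
    public string EffectiveParent =>
        string.IsNullOrEmpty(ParentGroupName) ? "All" : ParentGroupName!;
}

// Hypothetical registry used only to illustrate parent-group resolution.
public sealed class GroupRegistry
{
    private readonly Dictionary<string, GroupDefinition> _groups = new();

    public void Register(GroupDefinition group) => _groups[group.Name] = group;

    // Returns the groups to evaluate, outermost first,
    // e.g. ["All", "Accounts Overdrawn", "Delinquent Accounts"].
    public IReadOnlyList<string> EvaluationChain(string groupName)
    {
        var chain = new List<string>();
        var visited = new HashSet<string>();
        var current = groupName;
        while (current != "All")
        {
            // Guard against a misconfigured cycle of parent groups.
            if (!visited.Add(current))
                throw new InvalidOperationException($"Cycle in parent groups at '{current}'");
            if (!_groups.TryGetValue(current, out var definition))
                throw new KeyNotFoundException($"Unknown identity group '{current}'");
            chain.Add(current);
            current = definition.EffectiveParent;
        }
        chain.Add("All");
        chain.Reverse();
        return chain;
    }
}
```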

For type safety, an identity group can also be typed so that it only allows one entity type and is specific to the unique identifier used to identify instances of that entity. In our bank accounts example, this would be a "Bank Account" class uniquely identified by an "Account Number" (string).

VB.NET
''' <remarks>
''' The group is uniquely named per aggregate identifier type,
''' and is populated by its own projection which decides
''' if any given aggregate identifier is in or out of the group
''' </remarks>
Public Interface IIdentifierGroup(Of TAggregateIdentifier As IAggregationIdentifier, TAggregateKey)
    Inherits IIdentifierGroup

End Interface

Classifier

A classifier is a class that, when run over the event stream of a single entity, decides whether that entity is in the identity group or not, based on some function applied to some or all of the events in the stream.

A classifier for a bank account entity that decides whether the account is in the "Accounts Overdrawn" group would need to handle every event that affects the balance (deposit, withdrawal, charge, tax, interest, etc.) and keep a running total of the balance. If that running total is below zero when the classifier reaches the end of the stream, the classifier deems the account to be inside the identity group; otherwise it is outside.
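The running-total logic described above can be sketched in C# as follows. This is an illustration under stated assumptions, not the article's implementation: the event records (`Deposited`, `Withdrawn`, `ChargeApplied`, `TaxDeducted`, `InterestPaid`, `AddressChanged`) and the local `EvaluationResult` enum are hypothetical stand-ins for the article's event types and `IClassifierEventHandler.EvaluationResult`, and the class does not implement the article's `IClassifier` interface.

C#
```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types for a bank account stream (amounts are positive values).
public interface IAccountEvent { }
public sealed record Deposited(decimal Amount) : IAccountEvent;
public sealed record Withdrawn(decimal Amount) : IAccountEvent;
public sealed record ChargeApplied(decimal Amount) : IAccountEvent;
public sealed record TaxDeducted(decimal Amount) : IAccountEvent;
public sealed record InterestPaid(decimal Amount) : IAccountEvent;
public sealed record AddressChanged(string NewAddress) : IAccountEvent; // does not affect the balance

// Hypothetical stand-in for IClassifierEventHandler.EvaluationResult.
public enum EvaluationResult { Unchanged, Include, Exclude }

// Classifies one account as in or out of "Accounts Overdrawn" by keeping a running balance.
public sealed class AccountsOverdrawnClassifier
{
    private decimal _runningBalance;

    // Only balance-affecting events need to be processed.
    public bool HandlesEventType(Type eventType) =>
        eventType == typeof(Deposited) || eventType == typeof(Withdrawn) ||
        eventType == typeof(ChargeApplied) || eventType == typeof(TaxDeducted) ||
        eventType == typeof(InterestPaid);

    // Updates the running balance and reports the status as at this point in the stream.
    public EvaluationResult Evaluate(IAccountEvent evt)
    {
        _runningBalance += evt switch
        {
            Deposited d => d.Amount,
            InterestPaid i => i.Amount,
            Withdrawn w => -w.Amount,
            ChargeApplied c => -c.Amount,
            TaxDeducted t => -t.Amount,
            _ => 0m
        };
        return _runningBalance < 0m ? EvaluationResult.Include : EvaluationResult.Exclude;
    }

    // Runs a fresh classifier over a whole stream; the result is the status at the end of it.
    public static EvaluationResult Classify(IEnumerable<IAccountEvent> stream)
    {
        var classifier = new AccountsOverdrawnClassifier();
        var result = EvaluationResult.Unchanged;
        foreach (var evt in stream)
        {
            if (classifier.HandlesEventType(evt.GetType()))
                result = classifier.Evaluate(evt);
        }
        return result;
    }
}
```

A stream with no balance-affecting events leaves the result as `Unchanged`, mirroring the processor code later in this article.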

A classifier is therefore a very specialized form of projection and can use much of the same underlying functionality.

VB.NET
Public Interface IClassifier

    ''' <summary>
    ''' Does the classifier handle the data for the given event type?
    ''' </summary>
    ''' <param name="eventType">
    ''' The type of the event containing the data that may or may not be handled
    ''' </param>
    ''' <returns>
    ''' True if this event type should be processed
    ''' </returns>
    Function HandlesEventType(ByVal eventType As Type) As Boolean

End Interface

Public Interface IClassifier(Of TAggregate As IAggregationIdentifier, TAggregateKey)
    Inherits IClassifier

    ''' <summary>
    ''' Perform whatever evaluation is required to handle the specific event
    ''' </summary>
    ''' <param name="eventToHandle">
    ''' The specific event to handle, performing whatever processing is required to
    ''' evaluate the status of the aggregate instance in relation to the identity group
    ''' </param>
    Function Evaluate(Of TEvent As IEvent(Of TAggregate)) _
        (ByVal eventToHandle As TEvent) As IClassifierEventHandler.EvaluationResult

End Interface

Alternatively, you can perform a classification by first running a projection and then applying some logic to its outcome. For example, a classifier for "accounts in credit" would first run the "current balance" projection and then classify the account based on whether the balance is greater than zero:

C#
/// <summary>
/// Use the running balance of the account to decide if it is in credit
/// </summary>
public IClassifierDataSourceHandler.EvaluationResult EvaluateProjection(IRunning_Balance projection)
{
    if (projection.Balance > 0)
    {
        return IClassifierDataSourceHandler.EvaluationResult.Include;
    }
    return IClassifierDataSourceHandler.EvaluationResult.Exclude;
}
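The projection-then-classify approach can be generalized. The following sketch is a hypothetical `ProjectionClassifier` adapter (not part of the article's code) that takes a seed value, a fold function playing the role of the projection, and a membership predicate. It runs the projection with LINQ's `Enumerable.Aggregate` and then applies the predicate to the projection's final state; the local `EvaluationResult` enum is a stand-in for the article's own result type.

C#
```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the article's EvaluationResult enum.
public enum EvaluationResult { Unchanged, Include, Exclude }

// Hypothetical adapter: run a projection over the stream, then classify its final state.
public sealed class ProjectionClassifier<TEvent, TProjection>
{
    private readonly Func<TProjection> _seed;
    private readonly Func<TProjection, TEvent, TProjection> _apply;
    private readonly Func<TProjection, bool> _isMember;

    public ProjectionClassifier(Func<TProjection> seed,
                                Func<TProjection, TEvent, TProjection> apply,
                                Func<TProjection, bool> isMember)
    {
        _seed = seed;
        _apply = apply;
        _isMember = isMember;
    }

    public EvaluationResult Classify(IEnumerable<TEvent> stream)
    {
        // Step 1: run the projection over the whole event stream.
        TProjection projection = stream.Aggregate(_seed(), _apply);
        // Step 2: apply the business rule to the outcome of the projection.
        return _isMember(projection) ? EvaluationResult.Include : EvaluationResult.Exclude;
    }
}
```

For example, `new ProjectionClassifier<decimal, decimal>(() => 0m, (balance, amount) => balance + amount, balance => balance > 0m)` classifies a stream of signed amounts as "in credit". One trade-off of this design is that it cannot stop early or report a status per event, but the projection it relies on can be shared with other read-side code.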

Processors

The identity group and the classifier are business-level classes that contain no concrete logic for running over actual event streams. For that, we need dedicated processor classes that perform the classification and grouping:

VB.NET
''' <summary>
''' Class to run defined classifiers over an event stream to classify
''' an aggregate instance as being inside or outside of the identity group
''' the classifier pertains to
''' </summary>
''' <typeparam name="TAggregate">
''' The class of the aggregate of which this is an instance
''' </typeparam>
''' <typeparam name="TAggregateKey">
''' The data type of the key that uniquely identifies an instance of this aggregate
''' </typeparam>
''' <typeparam name="TClassifier">
''' The type of the classifier that this processor runs
''' </typeparam>
Public NotInheritable Class ClassifierProcessor(Of TAggregate As IAggregationIdentifier, TAggregateKey, TClassifier As IClassifier)
    Implements IClassifierProcessor(Of TAggregate, TAggregateKey, TClassifier)

    ' The stream reader instance that will be used to read the events to classify
    Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, TAggregateKey)
    ' The default classifier, used when none is passed to Classify
    Private ReadOnly m_classifier As IClassifier(Of TAggregate, TAggregateKey)

    Public Function Classify(
            Optional ByVal classifierToProcess As IClassifier(Of TAggregate, TAggregateKey) = Nothing
        ) As IClassifierEventHandler.EvaluationResult _
        Implements IClassifierProcessor(Of TAggregate, TAggregateKey, TClassifier).Classify

        ' Fall back to the classifier supplied in the constructor
        If classifierToProcess Is Nothing Then
            classifierToProcess = m_classifier
        End If

        If m_streamReader IsNot Nothing AndAlso classifierToProcess IsNot Nothing Then
            Dim startingSequence As UInteger = 0

            Dim retVal As IClassifierEventHandler.EvaluationResult = _
                     IClassifierEventHandler.EvaluationResult.Unchanged
            For Each evt In m_streamReader.GetEvents(startingSequence)
                If classifierToProcess.HandlesEventType(evt.GetType()) Then
                    Dim result As IClassifierEventHandler.EvaluationResult = _
                             classifierToProcess.Evaluate(evt)
                    ' Only overwrite the status when this event actually changed it
                    If result <> IClassifierEventHandler.EvaluationResult.Unchanged Then
                        retVal = result
                    End If
                End If
            Next

            ' Return the evaluation status as at the end of the event stream
            Return retVal
        End If

        ' If no classification was performed, leave the result as unchanged
        Return IClassifierEventHandler.EvaluationResult.Unchanged
    End Function

    ''' <summary>
    ''' Create a new classifier processor that will use the given event stream reader
    ''' to do its processing
    ''' </summary>
    ''' <param name="readerToUse">
    ''' The event stream reader to use
    ''' </param>
    ''' <param name="classifier">
    ''' (Optional) The classifier class that does the actual evaluation
    ''' </param>
    Friend Sub New(ByVal readerToUse As IEventStreamReader(Of TAggregate, TAggregateKey),
                   Optional classifier As IClassifier(Of TAggregate, TAggregateKey) = Nothing)
        m_streamReader = readerToUse
        m_classifier = classifier
    End Sub

End Class
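A classifier processor classifies one aggregate instance at a time. To illustrate how the grouping side might populate a whole identity group, here is a hypothetical C# sketch; the `IdentityGroupProcessor.Populate` method and its delegate parameters are assumptions, not the article's API. Only the members of the parent group are classified, which is where the parent group name pays off, and the resulting set can in turn be passed as the parent members of a nested group such as "Delinquent Accounts".

C#
```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the article's EvaluationResult enum.
public enum EvaluationResult { Unchanged, Include, Exclude }

// Hypothetical group processor: populates an identity group by classifying
// each member of the parent group against its own event stream.
public static class IdentityGroupProcessor
{
    public static HashSet<TKey> Populate<TKey, TEvent>(
        IEnumerable<TKey> parentGroupMembers,                // e.g. all keys, or an already-populated parent group
        Func<TKey, IEnumerable<TEvent>> readStream,           // reads the event stream for one aggregate
        Func<IEnumerable<TEvent>, EvaluationResult> classify) // runs a classifier over one stream
    {
        var members = new HashSet<TKey>();
        foreach (var key in parentGroupMembers)
        {
            // Each aggregate is classified independently of all the others.
            if (classify(readStream(key)) == EvaluationResult.Include)
                members.Add(key);
        }
        return members;
    }
}
```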

Snapshots

Because the events in a stream are immutable, a deterministic classifier run over the stream will always reach the same state at any given point. It is therefore possible to take a snapshot of the classifier state as at a given point in the event stream and use it as the starting point for the classifier from that point onwards.

Equally, any identity group that is based on these deterministic classifiers can also be snapshotted. In practice, I have added a property to the classifier itself that the developer can use to indicate whether a classifier can be written to a snapshot or whether the entire event stream must be read every time.
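A minimal sketch of snapshotting a deterministic classifier, assuming its entire state is a running balance. The `BalanceSnapshot` record and `SnapshottingOverdrawnClassifier` class are hypothetical. For simplicity the sketch receives the whole stream and skips events already covered by the snapshot; a real event store would instead read only the events after the snapshot's sequence number.

C#
```csharp
using System.Collections.Generic;
#nullable enable

// Hypothetical snapshot: the classifier state plus the sequence number
// of the last event folded into it.
public sealed record BalanceSnapshot(decimal RunningBalance, int LastSequence);

public static class SnapshottingOverdrawnClassifier
{
    // events: (sequence, signed amount) pairs in sequence order.
    // Returns the classification and a new snapshot to persist for the next run.
    public static (bool IsOverdrawn, BalanceSnapshot NewSnapshot) Classify(
        IReadOnlyList<(int Sequence, decimal Amount)> events,
        BalanceSnapshot? snapshot = null)
    {
        // Start from the snapshot if there is one, otherwise from an empty state.
        decimal balance = snapshot?.RunningBalance ?? 0m;
        int lastSequence = snapshot?.LastSequence ?? -1;

        foreach (var (sequence, amount) in events)
        {
            if (sequence <= lastSequence) continue; // already accounted for by the snapshot
            balance += amount;
            lastSequence = sequence;
        }
        return (balance < 0m, new BalanceSnapshot(balance, lastSequence));
    }
}
```

Because the classifier is deterministic, resuming from a snapshot gives the same answer as replaying the whole stream, which is exactly the property that makes the snapshot safe to use.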

Parallelism

Because each classifier run operates on a single event stream and does not interact with any other run, the classification process is inherently parallel. This allows a scale-out architecture: if the load on the identity groups exceeds what can currently be handled, simply add more classifier processor machines.
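Because each classification is independent, fanning the work out is straightforward. This hypothetical sketch uses `Parallel.ForEach` from the .NET Task Parallel Library to classify candidates concurrently on a single machine; the `ParallelClassification` class and its `isMember` delegate are assumptions for illustration. Spreading the work across machines would apply the same idea by partitioning the aggregate identifiers between classifier processors.

C#
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: classifies every candidate independently and in parallel.
public static class ParallelClassification
{
    public static HashSet<TKey> Classify<TKey>(
        IEnumerable<TKey> candidates,
        Func<TKey, bool> isMember) // must not share mutable state between calls
    {
        // The only shared state is this thread-safe collection of results.
        var members = new ConcurrentBag<TKey>();
        Parallel.ForEach(candidates, key =>
        {
            if (isMember(key))
                members.Add(key);
        });
        return new HashSet<TKey>(members);
    }
}
```

If you partition identifiers across machines, use a stable hash of the key: `string.GetHashCode()` is randomized per process in .NET Core, so different machines would disagree on the partition.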

Points of Interest

An alternative (and far more common) way to deal with the issue of running business queries over event streams is to persist the projection results to a read-only database against which business queries can be run.
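For comparison, a minimal sketch of that read-model approach. The `AccountBalanceReadModel` class is hypothetical, and an in-memory dictionary stands in for the read-only database: it is updated as balance-affecting events arrive, and the "Accounts Overdrawn" question becomes a simple query over it rather than a classifier run.

C#
```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical read model: current balances keyed by account number,
// kept up to date by a projection as each balance-affecting event is written.
public sealed class AccountBalanceReadModel
{
    private readonly Dictionary<string, decimal> _balances = new();

    // Called by the projection with a positive amount for credits, negative for debits.
    public void Apply(string accountNumber, decimal signedAmount)
    {
        _balances.TryGetValue(accountNumber, out var current); // 0 if the account is new
        _balances[accountNumber] = current + signedAmount;
    }

    // Business queries run against the read model instead of the event streams.
    public IEnumerable<string> OverdrawnAccounts() =>
        _balances.Where(kv => kv.Value < 0m).Select(kv => kv.Key).OrderBy(k => k);
}
```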

History

  • 23rd August 2016: Initial design
  • 28th December 2016: Added projection evaluation as an alternative classification method

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)