
Event Streams and Identity Groups - Using Classifiers

16 Aug 2016 | CPOL | 5 min read
How to extend the event streaming architecture using the concept of classifiers and instance groups to add business-meaningful groups

Background

In a CQRS (Command Query Responsibility Segregation) system built on top of event sourcing, we need to be able to run a query for a given set of unique aggregate identifiers.

For example, if we have a banking system, our aggregate might be "bank account" which is uniquely identified by an account number.

We can create projections that run over a single account number and return a value. A projection reads the events in the event stream for that aggregate and, by performing operations on the data in those events, arrives at a current state for the aggregate identifier.

In the bank account example, events like [Interest Paid] and [Deposit Paid In] would increase the balance held by our running balance projection, and events like [Withdrawal] and [Charge Paid] would reduce it.
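
As a minimal sketch of such a running balance projection (the event classes and their Amount property are hypothetical names chosen to match the events mentioned above, not types from any particular library), the projection can be written as a fold over the events of one account's stream:

```vbnet
Imports System
Imports System.Collections.Generic

Module RunningBalanceSketch

    ' Hypothetical event types, named after the events in the text.
    Public Class InterestPaid
        Public Property Amount As Decimal
    End Class

    Public Class DepositPaidIn
        Public Property Amount As Decimal
    End Class

    Public Class Withdrawal
        Public Property Amount As Decimal
    End Class

    Public Class ChargePaid
        Public Property Amount As Decimal
    End Class

    ' Reads the events of one account's stream in order and folds them
    ' into a balance. Event types it does not recognise are ignored.
    Public Function RunningBalance(ByVal events As IEnumerable(Of Object)) As Decimal
        Dim balance As Decimal = 0D
        For Each evt As Object In events
            If TypeOf evt Is InterestPaid Then
                balance += DirectCast(evt, InterestPaid).Amount
            ElseIf TypeOf evt Is DepositPaidIn Then
                balance += DirectCast(evt, DepositPaidIn).Amount
            ElseIf TypeOf evt Is Withdrawal Then
                balance -= DirectCast(evt, Withdrawal).Amount
            ElseIf TypeOf evt Is ChargePaid Then
                balance -= DirectCast(evt, ChargePaid).Amount
            End If
        Next
        Return balance
    End Function

    Sub Main()
        Dim stream As New List(Of Object)
        stream.Add(New DepositPaidIn With {.Amount = 100D})
        stream.Add(New InterestPaid With {.Amount = 2.5D})
        stream.Add(New Withdrawal With {.Amount = 40D})
        stream.Add(New ChargePaid With {.Amount = 1D})
        ' 100 + 2.5 - 40 - 1
        Console.WriteLine(RunningBalance(stream))
    End Sub

End Module
```

The projection holds no state of its own beyond the running total, so replaying the same stream always gives the same balance.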

However, when running queries against a system, we often want to restrict the set of unique aggregate identifiers over which the projection is run. This is partly for performance and partly a business need: we don't always want to query every single aggregate.

In our bank account example, if we were going to perform some form of statutory tax reporting, we might want a group of account numbers identified by the group name "Held by US citizens".

Identifier Groups

Identifier groups are a way of creating (and maintaining) a list of unique identifiers that share a common attribute or business meaning. These groups are maintained separately from the aggregate identifiers themselves, i.e., it is not up to the aggregate to hold information as to whether it is considered in or out of any given group; rather, the group itself must test the aggregate identifiers and count them in or out.

This maintenance of the identity group is itself a projection over the event stream. This, in turn, means that it is possible to regenerate the group as at a specific point in time as well as to evaluate membership of the group now. For our hypothetical business need above, it might be that tax reporting is required for accounts held by US citizens as at the start of the tax year only, which means we need to run our group projection up to that date to generate the set of accounts over which we will run our taxation reporting projection.
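
To illustrate the "as at" idea, here is a rough sketch that assumes a single hypothetical event type, OwnershipRecorded, carrying a date and the holder's country, and assumes each stream is already in chronological order. Replaying each stream only up to the cut-off date gives the group membership as at that date:

```vbnet
Imports System
Imports System.Collections.Generic

Module GroupAsAtSketch

    ' Hypothetical event: the holder's citizenship as recorded on a date.
    Public Class OwnershipRecorded
        Public Property OccurredOn As Date
        Public Property HolderCountry As String
    End Class

    ' Replays each account's stream up to asAt. The last event on or
    ' before that date decides whether the account is in the group.
    Public Function UsHeldAccountsAsAt( _
            ByVal streams As IDictionary(Of String, List(Of OwnershipRecorded)), _
            ByVal asAt As Date) As HashSet(Of String)

        Dim members As New HashSet(Of String)
        For Each account As KeyValuePair(Of String, List(Of OwnershipRecorded)) In streams
            Dim isMember As Boolean = False
            For Each evt As OwnershipRecorded In account.Value
                ' Events after the cut-off are not part of this view
                If evt.OccurredOn > asAt Then Exit For
                isMember = (evt.HolderCountry = "US")
            Next
            If isMember Then members.Add(account.Key)
        Next
        Return members
    End Function

    Sub Main()
        Dim acc1 As New List(Of OwnershipRecorded)
        acc1.Add(New OwnershipRecorded With {.OccurredOn = #1/1/2015#, .HolderCountry = "US"})

        Dim acc2 As New List(Of OwnershipRecorded)
        acc2.Add(New OwnershipRecorded With {.OccurredOn = #1/1/2015#, .HolderCountry = "GB"})
        acc2.Add(New OwnershipRecorded With {.OccurredOn = #6/1/2016#, .HolderCountry = "US"})

        Dim streams As New Dictionary(Of String, List(Of OwnershipRecorded))
        streams.Add("ACC-1", acc1)
        streams.Add("ACC-2", acc2)

        ' At the start of 2016 only ACC-1 is in the group;
        ' by the start of 2017 both accounts are.
        Console.WriteLine(UsHeldAccountsAsAt(streams, #1/1/2016#).Count)
        Console.WriteLine(UsHeldAccountsAsAt(streams, #1/1/2017#).Count)
    End Sub

End Module
```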

Classifiers

A classifier is a special form of a projection which runs over the event stream of an aggregate and decides if it is "in" or "out" of the identity group as at any given point in time.

In the above example of the "accounts held by US citizens" identity group, the classifier might run over the events in the stream of each "account" aggregate and decide if the account is in or out of the group based on "account opened", "beneficial ownership changed" and any other similar events.

Since a classifier needs access to an event stream, which may be provided by any one of a number of possible backing technologies (database, file, NoSQL tables, Azure Blob, etc.), the actual interaction with the event stream is performed through an interface, IEventStreamReader(Of TAggregate, TAggregateKey), which each of these backing technologies can implement.

A processor class then sits between this implementation detail and the purely business-centric classifier class, which decides whether a given instance is to be considered in or out of the identity group.
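
A business-level classifier for this group could look something like the sketch below. The method names HandlesEventType and Evaluate and the Unchanged result follow the code listing in this article; the Include and Exclude members, the event classes and their properties are assumptions made purely for illustration:

```vbnet
Imports System

Module UsCitizenClassifierSketch

    ' Assumed three-valued result. Only "Unchanged" appears in the
    ' article's own code; Include and Exclude are illustrative.
    Public Enum EvaluationResult
        Unchanged
        Include
        Exclude
    End Enum

    ' Hypothetical event types for the "account" aggregate.
    Public Class AccountOpened
        Public Property HolderCitizenship As String
    End Class

    Public Class BeneficialOwnershipChanged
        Public Property NewHolderCitizenship As String
    End Class

    Public Class HeldByUsCitizensClassifier

        ' Only these two event types can affect membership of the group
        Public Function HandlesEventType(ByVal eventType As Type) As Boolean
            Return eventType Is GetType(AccountOpened) OrElse _
                   eventType Is GetType(BeneficialOwnershipChanged)
        End Function

        ' Decides what a single event means for group membership
        Public Function Evaluate(ByVal evt As Object) As EvaluationResult
            Dim opened As AccountOpened = TryCast(evt, AccountOpened)
            If opened IsNot Nothing Then
                Return FromCitizenship(opened.HolderCitizenship)
            End If

            Dim changed As BeneficialOwnershipChanged = TryCast(evt, BeneficialOwnershipChanged)
            If changed IsNot Nothing Then
                Return FromCitizenship(changed.NewHolderCitizenship)
            End If

            ' Any other event has no bearing on this group
            Return EvaluationResult.Unchanged
        End Function

        Private Shared Function FromCitizenship(ByVal country As String) As EvaluationResult
            If country = "US" Then Return EvaluationResult.Include
            Return EvaluationResult.Exclude
        End Function

    End Class

    Sub Main()
        Dim classifier As New HeldByUsCitizensClassifier()
        Dim evt As New BeneficialOwnershipChanged With {.NewHolderCitizenship = "GB"}
        If classifier.HandlesEventType(evt.GetType()) Then
            Console.WriteLine(classifier.Evaluate(evt).ToString())
        End If
    End Sub

End Module
```

The classifier knows nothing about where the events come from, which keeps the business rule separate from the storage technology.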

The classifier reads each event in sequence from the event stream, deciding first if the classifier needs to evaluate that event type and - if it does - what impact that event has on the group membership.

VB.NET
Public Function Classify(ByVal classifierToProcess As IClassifier(Of TAggregate, TAggregateKey)) _
As IClassifierEventHandler.EvaluationResult

    ' If no classification is performed, the result is left as unchanged
    Dim retVal As IClassifierEventHandler.EvaluationResult = _
        IClassifierEventHandler.EvaluationResult.Unchanged

    If (m_streamReader IsNot Nothing) AndAlso (classifierToProcess IsNot Nothing) Then
        Dim startingSequence As UInteger = 0
        If (classifierToProcess.SupportsSnapshots) Then
            ' Not shown here: load the most recent snapshot for the classifier,
            ' then set retVal and startingSequence from it
        End If

        For Each evt In m_streamReader.GetEvents(startingSequence)
            ' Skip any event types the classifier is not interested in
            If (classifierToProcess.HandlesEventType(evt.GetType())) Then
                Dim result As IClassifierEventHandler.EvaluationResult = _
                    classifierToProcess.Evaluate(evt)
                ' Assumption: Unchanged means "this event had no effect", so it
                ' must not overwrite a decision made by an earlier event
                If result <> IClassifierEventHandler.EvaluationResult.Unchanged Then
                    retVal = result
                End If
            End If
        Next
    End If

    ' Return the evaluation status as at the end of the event stream
    Return retVal

End Function

Since an event stream is immutable, it follows that, for any given classifier, the result of that classification as at a given point in the event stream is also immutable. Therefore, I have added snapshot functionality, which allows the classification process to pick up the event stream from a given point and evaluate only the events from that point onwards.
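
The snapshot idea can be sketched as follows. This is not the article's actual snapshot implementation: the ClassifierSnapshot class, the ClassifyFrom function and the simplified stream (a list of holder country codes) are all hypothetical, chosen only to show that resuming from a snapshot gives the same answer as a full replay:

```vbnet
Imports System
Imports System.Collections.Generic

Module ClassifierSnapshotSketch

    ' Assumed result values, as in the classifier discussion above
    Public Enum EvaluationResult
        Unchanged
        Include
        Exclude
    End Enum

    ' Hypothetical snapshot: the classification result after the
    ' first EventsEvaluated events of the stream have been read.
    Public Class ClassifierSnapshot
        Public Property EventsEvaluated As Integer
        Public Property Result As EvaluationResult
    End Class

    ' Classifies a simplified stream in which each event is just the
    ' holder's country code. If a snapshot is supplied, only the
    ' events after it are evaluated.
    Public Function ClassifyFrom(ByVal stream As IList(Of String), _
                                 ByVal snapshot As ClassifierSnapshot) As ClassifierSnapshot
        Dim startAt As Integer = 0
        Dim result As EvaluationResult = EvaluationResult.Unchanged
        If snapshot IsNot Nothing Then
            startAt = snapshot.EventsEvaluated
            result = snapshot.Result
        End If

        For i As Integer = startAt To stream.Count - 1
            If stream(i) = "US" Then
                result = EvaluationResult.Include
            Else
                result = EvaluationResult.Exclude
            End If
        Next

        ' The returned value can itself be stored as the next snapshot
        Return New ClassifierSnapshot With {.EventsEvaluated = stream.Count, .Result = result}
    End Function

    Sub Main()
        Dim stream As New List(Of String)
        stream.Add("GB")
        stream.Add("US")
        stream.Add("US")
        Dim snapshot As ClassifierSnapshot = ClassifyFrom(stream, Nothing)

        ' A new event arrives: only that event needs evaluating
        stream.Add("GB")
        Dim resumed As ClassifierSnapshot = ClassifyFrom(stream, snapshot)
        Dim replayed As ClassifierSnapshot = ClassifyFrom(stream, Nothing)
        Console.WriteLine(resumed.Result = replayed.Result)
    End Sub

End Module
```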

Nested Identifier Groups

The identifier groups used by the business to describe their system can be nested one within the other. For example, in a banking context, the identity group "accounts in arrears" would be wholly nested within the group "active accounts" which in turn would be wholly nested within the outer group of "all accounts".

Evaluating any given group's membership as at any given point of time means finding the members of the outer group as at that point in time and then running the specific classifier function over that set of identifiers.
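
As an illustrative sketch of this nesting (the MembersWithin helper is a hypothetical name, and the two membership tests are hard-coded stand-ins for real classifier projections), each inner group is evaluated only over the members of the group that encloses it:

```vbnet
Imports System
Imports System.Collections.Generic

Module NestedGroupSketch

    ' Keeps only those members of the parent group that the inner
    ' group's membership test counts as "in" as at the given date.
    Public Function MembersWithin(ByVal parentMembers As IEnumerable(Of String), _
                                  ByVal isInGroup As Func(Of String, Date, Boolean), _
                                  ByVal asAt As Date) As HashSet(Of String)
        Dim members As New HashSet(Of String)
        For Each accountNumber As String In parentMembers
            If isInGroup(accountNumber, asAt) Then members.Add(accountNumber)
        Next
        Return members
    End Function

    ' Stand-ins for classifier projections. A real classifier would read
    ' the account's event stream up to asAt; these ignore the date.
    Public Function IsActive(ByVal accountNumber As String, ByVal asAt As Date) As Boolean
        Return accountNumber <> "ACC-3"
    End Function

    Public Function IsInArrears(ByVal accountNumber As String, ByVal asAt As Date) As Boolean
        Return accountNumber = "ACC-2"
    End Function

    Sub Main()
        Dim asAt As Date = #12/31/2015#
        Dim allAccounts As New List(Of String)
        allAccounts.Add("ACC-1")
        allAccounts.Add("ACC-2")
        allAccounts.Add("ACC-3")

        ' "all accounts" -> "active accounts" -> "accounts in arrears"
        Dim active As HashSet(Of String) = MembersWithin(allAccounts, AddressOf IsActive, asAt)
        Dim inArrears As HashSet(Of String) = MembersWithin(active, AddressOf IsInArrears, asAt)

        Console.WriteLine(active.Count)
        Console.WriteLine(inArrears.Count)
    End Sub

End Module
```

Because the arrears test is only ever applied to accounts already known to be active, the inner classifier never has to read the streams of accounts that cannot be in its group.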

At first glance, this seems like it would be a really slow way of querying a system. However, because event streams are immutable, membership of any given group as at any given point in time is also immutable and can therefore be stored in a snapshot.

This means that the first query that needs group membership as at a given point in time will take the hit of calculating that group membership, but any subsequent query can run straight from the cache. You can even pre-generate the cache for given business-meaningful points in time (such as the close of business for each trading day) so that the cache is available before it is actively sought.
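
One way to picture such a cache is the sketch below, which keys stored membership by group name and "as at" date. The GroupMembershipCache class and its members are hypothetical, and the in-memory dictionary stands in for whatever snapshot store is actually used:

```vbnet
Imports System
Imports System.Collections.Generic

Module GroupCacheSketch

    Public Class GroupMembershipCache

        Private ReadOnly m_cache As New Dictionary(Of String, HashSet(Of String))
        Private ReadOnly m_calculate As Func(Of String, Date, HashSet(Of String))
        Private m_calculations As Integer = 0

        ' calculate: the (expensive) function that runs the classifier
        ' over the event streams to work out the group membership
        Public Sub New(ByVal calculate As Func(Of String, Date, HashSet(Of String)))
            m_calculate = calculate
        End Sub

        ' How many times the expensive calculation has actually run
        Public ReadOnly Property Calculations As Integer
            Get
                Return m_calculations
            End Get
        End Property

        Public Function GetMembers(ByVal groupName As String, ByVal asAt As Date) As HashSet(Of String)
            Dim key As String = groupName & "@" & asAt.ToString("o")
            Dim members As HashSet(Of String) = Nothing
            If Not m_cache.TryGetValue(key, members) Then
                ' First request for this group and date takes the hit
                members = m_calculate(groupName, asAt)
                m_calculations += 1
                m_cache.Add(key, members)
            End If
            Return members
        End Function

    End Class

    ' Stand-in for the real classifier run (hard-coded result)
    Public Function CalculateGroup(ByVal groupName As String, ByVal asAt As Date) As HashSet(Of String)
        Dim members As New HashSet(Of String)
        members.Add("ACC-1")
        Return members
    End Function

    Sub Main()
        Dim cache As New GroupMembershipCache(AddressOf CalculateGroup)
        Dim closeOfBusiness As Date = #12/31/2015 5:00:00 PM#

        ' Pre-generate for a business-meaningful point in time...
        cache.GetMembers("Held by US citizens", closeOfBusiness)
        ' ...so that later queries are served from the cache
        cache.GetMembers("Held by US citizens", closeOfBusiness)

        Console.WriteLine(cache.Calculations)
    End Sub

End Module
```

Note that a cached entry is only safe to reuse when its "as at" point lies in a part of the stream that will not receive further events, which is the case for past dates such as a previous close of business.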

This is all part of the architectural changes that are emerging as computing power (and storage) gets less expensive and human time (developers and users) gets ever more expensive. It means that we get the computer to do as much of the work as possible even if we don't know that this work will ever be needed.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)