Introduction
Serverless is a hot topic in cloud computing these days as it offers the possibility of a purely demand based charging structure for your cloud hosted systems with no fixed costs at all. This would allow your business to scale up or down with demand which, in turn, means a leaner and more business focused IT budget.
Of course, no system is actually serverless. In Azure Event Grid as in most serverless cloud solutions, the difference as compared to a virtual machine based architecture is that the cloud provider is responsible for the hosting, spinning up and scaling aspects of your application. The cost savings of this shared infrastructure model is also passed on making this the cheapest way to build large scale systems.
Background
Event Grid
Azure Event Grid is a set of connected technologies provided by Microsoft as part of their Azure cloud which are designed to facilitate an event-driven architecture.
It consists of event triggers that can be set up to either fire on a state change event (such as a new file being loaded into an Azure blob or a new row being added to an Azure table) or on a time trigger or indeed received from an external system (such as an IoT sensor, etc.).
These triggers are linked by the Event Grid to route the events from their trigger to a destination which can be an Azure function, a logic app, Azure automation or even a webhooks event.
CQRS and Event Sourcing
CQRS is an application design pattern (or architecture) that differs from single model architectures (like MVC or MVVM) in that the parts of the system responsible for changing the state of the system (the Command side) is entirely separate to the parts of the system responsible for getting the state of the system (the Query side).
Event Sourcing is a data architecture that flips the way we store data on its head. Instead of storing the state of any given object as it currently is and updating that state, we instead store the complete history of every change that has occurred to the object and allow us to derive the state of the object by running code known as a projection over that event history.
Example System: A Farm Management Application
The more traditional example systems for CQRS tend to be based on very transactional businesses, such as financial services, retail banking or insurance. However, to show that this architecture can be used outside these narrow confines, I will be using an imaginary mixed usage farm management system.
Analysis Stage
When performing the earliest stage of the design of an event based system, the best approach is to start with an analysis tool known as Event Storming, which is a collaborative domain discovery exercise wherein a small group of domain experts and one or more facilitators use post-it notes and a very long design space to map out all the events of interest to the domain, what the event pertains to and what triggers the event. It is usual to go through a number of event storming sessions to discover the full scope of a domain before doing any detailed design at all.
It is also a good idea to use the event storming sessions (and the experts consulted in them) to define the domain boundary or boundaries for your system.
This allows you to set up a shearing layer (effectively an interface) between the parts of your system which you do have control over and those externalities such as government, regulators, data providers, etc. which you do not have control over. By doing this, you allow your system to evolve in a controlled environment which will save you from a good deal of effort.
Aggregate and Event Design
Using the CQRS designer, the next step is to set out the aggregates in the domains and the events identified that can occur to these aggregates. For each event, we add in as many properties as we can capture pertaining to that event (even if we do not know if these properties will be useful yet or not, it is better to capture data not needed than to need data not captured.)
In our farm management system, the aggregates chosen for which we design this event universe are cow
, field
, barn
and tractor
. For each aggregate type, we need to decide on a unique identifier which we will use to identify an individual instance of that aggregate type.
In some cases, we can reuse an existing unique identifier - for example, cattle would have a unique combination of herd number and ear tag number, and each tractor would have a unique chassis number. In other cases, we can decide our own unique identifier such as an incremental number or a GUID. As long as the identifier is guaranteed unique and the "lookup" step is not too onerous, it doesn't really matter that much.
The next step is to define a command for each state changing cause that enters the system. These may pertain to an individual aggregate but they might also pertain to multiple instances as a known group or indeed a single external command may cause events to occur for more than one aggregate type. For example, an AI visit would apply to one cow, a TB inspection would be for a known group of cattle and a financial loan collateralisation might cover tractors, fields and even cattle together.
You can then decompose each event in your system to a combination of "which aggregates do I impact" and then "what do I do to those aggregates". From this, we select the event streams for the aggregates to be impacted and append the event(s) created to the end of their event stream.
To get data out of the system, we create projections. These are simple bits of code that run over the event stream and for each event in turn decide "do I care about this event" and, if yes, "what internal property update(s) do I perform from this event". It is important to remember that a projection is part of the query side of CQRS and so should not itself cause any state changes.
Having designed the projections, you can combine them into queries that define which aggregates to run the projection for and then what function to apply to the returned data (in a sort of map-reduce manner). Queries can be combined together by composition where report-like output is required.
You can also have internally triggered projections which update data in a read cache and thereby decide if aggregate instances are in or out of business designed groupings. I call these classifiers to differentiate their role in the system but they are functionally identical to other projections except in that their outcome is a single in/out value.
Implementation on Azure Event Grid
1) Commands
The first step is to create a new Azure resource group for your domain. You can either do this using the Azure portal or you can use the Azure command line interface:
or:
az group create --name eventFarmyardDemo --location westus2
For each command, we need to decide how the command gets into our system. The range of data inputs on even a small farm are extensive - forms and receipts submitted for each transaction, manual data entry, IoT sensors or automated data capture systems and so on. For each of these, we need to select the best event publisher to trigger the commands in our domain from the list of blob storage (effectively a cloud based file system), event hubs or via custom topics (listening out for other events).
For sensor data and automated data capture, it is usual to use event hubs but in most other cases, I would use blob storage with new commands being implemented as a file being written to a specific "inbox" command location containing the data required to perform the operations for that command and subscribing to the Microsoft.Storage.BlobCreated
event.
In either case, we want the action that triggers a command to push a command event onto the event grid. In order to use our domain knowledge rather than implicit commands (new file added to [x]
), we would want to define custom subscriptions for the command. This also creates a shearing layer between what triggers the command and the command itself in our domain, allowing the trigger to change at a later date (for example, if a manual data entry process becomes automated).
The idea here is that if you change the way that data gets into the system, you don't need to do anything except redirect the new trigger to the existing business command.
2) Command Handlers
A command handler has to respond when a particular command is triggered and to perform whatever actions that command entails up to writing events to the affected aggregates' event streams.
A good way to implement a command handler is as an Azure logic app:
You could use the Event Grid custom topic as the trigger for your command handler:
The command handler has to validate if the command can be allowed to proceed and, if it is, it must then emit all the domain events that the command entails. These are then passed onwards (using another custom topic) to the event handlers.
Note that the topic name must be globally uinique as it is represented by a DNS entry so for a real world application, I would really recommend adding the domain name and perhaps an application name as part of every custom topic in that application.
Alternatively if you wish to react to the event grid topic from a compiled Azure function app you would need to use the EventGridTrigger atttribute and an EventGrid
parameter to receive the data from the event grid:
[FunctionName("OnCreateLeagueCommand")]
public static async void OnCreateLeagueCommand(
[EventGridTrigger] EventGridEvent eventGridEvent,
TraceWriter log)
{
}
You would then use the Azure portal to connect the function to the event grid topic (as a subscription).
Each command handler is broken into a sequence of steps, each backed by an independently executable Azure function which write their status to an event stream that is created for that command. This allows the command to be handled with some parrallel processing or to be resumed from a stalled state if an issue that was preventing the command from completing is resolved.
There is a "command status" projection that returns the current state of the command which each of these step handling functions can execute in order to test whether the command is in a state for which they can run.
public enum CommandState
{
Created = 0,
Validated = 1,
Invalid = 2,
InProgress = 3,
Completed =4
}
At the top level an Azure function reacts to the event grid topic and kicks off an Azure Durable Functions orchestrator function.
var jsondata = JsonConvert.SerializeObject(eventGridEvent.Data);
CommandRequest<Create_New_League_Definition> cmdRequest = null;
if (!string.IsNullOrWhiteSpace(jsondata))
{
cmdRequest = JsonConvert.DeserializeObject
<CommandRequest<Create_New_League_Definition>>(jsondata);
}
if (null != cmdRequest)
{
if (cmdRequest.CommandUniqueIdentifier == Guid.Empty)
{
cmdRequest.CommandUniqueIdentifier = Guid.NewGuid();
}
string instanceId =
await createLeagueCommandHandlerOrchestrationClient.StartNewAsync
("OnCreateLeagueCommandHandlerOrchestrator", cmdRequest);
log.LogInformation
($"Run OnCreateLeagueCommandHandlerOrchestrator orchestration
with ID = '{instanceId}'.");
}
This calls each step function as a Durable Function Activity:
[ApplicationName("The Long Run")]
[DomainName("Leagues")]
[AggregateRoot("League")]
[CommandName("Create League")]
[FunctionName("OnCreateLeagueCommandHandlerOrchestrator")]
public static async Task OnCreateLeagueCommandHandlerOrchestrator
([OrchestrationTrigger] DurableOrchestrationContext context,
Microsoft.Extensions.Logging.ILogger log)
{
CommandRequest<Create_New_League_Definition&g; cmdRequest =
context.GetInput<CommandRequest<Create_New_League_Definition>>();
if (null != cmdRequest)
{
ActivityResponse resp =
await context.CallActivityAsync<ActivityResponse>
("CreateLeagueCommandLogParametersActivity", cmdRequest);
#region Logging
if (null != log)
{
if (null != resp)
{
log.LogInformation($"{resp.FunctionName}
complete: {resp.Message } ");
}
}
#endregion
if (null != resp)
{
context.SetCustomStatus(resp);
}
Each activity does its step in the chain only, then returns. There is a standard response class ActivityResponse
that can be used to pass back information as to the health of the called activity. I prefer to do this rather than rely on using exceptions for business logic so that exception logs only contain actual infrastructure problems.
[ApplicationName("The Long Run")]
[DomainName("Leagues")]
[AggregateRoot("League")]
[CommandName("Create League")]
[FunctionName("CreateLeagueCommandValidationAction")]
public static async Task<bool> CreateLeagueCommandValidationAction
([ActivityTrigger] CommandRequest<Create_New_League_Definition> cmdRequest,
ILogger log)
{
if (null != log)
{
log.LogInformation($"CreateLeagueCommandValidationAction
called for command : {cmdRequest.CommandUniqueIdentifier}");
}
return await ValidateCreateLeagueCommand
(cmdRequest.CommandUniqueIdentifier.ToString(), log);
}
In addition, each step also writes its status to an event stream that underlies that unique command and which can be independently queried. This allows an application to display the process of the command in real time and also allows for a much better debugging experience should a business issue arise, as you can copy the event stream underlying the command execution to see what steps were performed and what the result was.
public static async Task LogCommandValidationError(Guid commandGuid,
string CommandName,
bool fatal,
string errorMessage)
{
EventStream commandEvents = new EventStream(@"Command",
CommandName,
commandGuid.ToString());
if (null != commandEvents)
{
await commandEvents.AppendEvent
(new TheLongRun.Common.Events.Command.ValidationErrorOccured
(errorMessage,fatal ));
}
}
Although there is an eventsream in a table that the durable functions framework creates itself to log execution, I again prefer to split the application such that the system events go in that built in table and business meaningful events go into their own separate event stream.
3) Persisting the Event Streams
Whenever an event has occurred, we need to persist it to the event stream of the aggregate instance that it has occurred to. This means that any projections run over this aggregate will take this event into consideration when running to derive the aggregate instance state.
Since all we are doing in this instance is finding the appropriate AppendBlob
for the aggregate instance and appending the event - in my case, using the AppendBlob
classes from the CQRS on Azure framework code.
After the event has been persisted, a notification event is raised to notify any interested parties (for example, any projections or classifiers that are dependent on that instance) that they in turn should read the latest event in and update their state(s) accordingly.
This is done (as with the command handler) by having each event trigger a custom event after it is persisted. Note that currently it is not possible (nor advisable) to use Binary Serialisation within any code called from an Azure serverless function so JSon is used.
The process of finding the correct event stream and appending an event to it is wrapped up in a class EventStream
:
EventStream tractorEvents = new EventStream(@"Cloud Farm",
"Tractor",
tractorName );
tractorEvents.AppendEvent(new ServicedEvent(DateTime.UTCNow, "Serviced" ));
4) Projections
A projection can be connected to each of the different event topics relating to the types of event that could cause a state change in that projection. These are each implemented as their own specific projection event handler for each event type. When the projection handles and processes an event, then if this causes its state to change then it will persist its current state to a cache storage and then raise a "projection changed" notification which other business processes and identifier group classification code can be triggered by.
This again will be done using a custom event topic, as with the event notification propagation events.
If the event is handled but the projection state is unchanged as a result, then no notification will need to be posted out.
A projection finds its underlying event stream by a class Projection
:
Projection getCommandState = new Projection(@"Command",
COMMAND_NAME,
commandGuid.ToString(),
nameof(Command_Summary_Projection));
The underlying logic of what to do with those events is held in the projection definition and they are fed into the projection by calling the Process
method.
Command_Summary_Projection cmdProjection =
new Command_Summary_Projection(log );
getCommandState.Process(cmdProjection);
After the projection has been processed its properties can be used in further processing or returned as part of a query.
5) Classifiers
A classifier is a special type of projection which can be used to decide if a particular instance is in or out of a given named set. For example, in the business case above, you might have a classifier to define which animals were on your farm based on events like bought, sold and transferred.
As with projections, when a classifier is notified to run and that run results in a change to the state of the classifier, then it will raise a notification message as a custom event grid topic.
6) Queries
For each defined query that can be run against the system, a specific custom topic is used to request the query to be performed. The payload passed in with the query specifies the parameters passed in to the system and also the notification method to perform to inform the caller when the query is complete and maybe to pass back the results.
In addition, we should consider calculating a hash of the query name and parameters so that we can implement query result caching - as ideally, we never want to fully process the same query twice if cached results are available.
7) Query Handlers
A query handler has to respond when a particular query is triggered and to perform whatever projections and collation functions are needed to get the data back from the given query.
The logic app underpinning the query is written in the same manner as the logic app for a command but with the added step of supplying a notification that the results are available (and the URI for those results) as a callback to the address passed in to the command instance.
This could be done by persisting the query results as a blob to an area of the Azure blob storage which in turn can be monitored and the writing of the results in turn trigger any further downstream processing as might be required.
Each query has an event stream that tracks progress in processing the query. This allows for some degree of parallelism (a project then collate pattern which is analoguos to the Map ->Reduce pattern) whereby multiple instances of the projection runners or classifier runners can run at the same time and return their results to the query event stream when they complete.
There is a projection that runs over this query event stream to decide what work still remains to be done and - if the query is complete - an Azure function which posts the result to whichever return address has been specified in the ReturnPath
property passed in.
The underlying method for this is exactly the same as with the command handler: a durable function orchestration is launched which then calls durable function activities to perform the individual steps of the query. The only additional step is that the results from running the query are themselves written to the event stream underlying that query execution. This means that should we need to get those results again at a later date we can run a projection over that event stream to get the results "as they were when asked".
Security
Azure topics can be secured by role or by individual - for a business system, it would make sense to create a number of roles corresponding to the different use cases for the application and to assign individual users to the particular roles that are applicable to them. This gives you fine grained control over the different ways that data can get into the system on a per command type basis and therefore prevent misuse.
If you need additional access control (for example, to give a given user a specific quota of a particular command), then this would need to be coded in the command handler and any "illegal" commands logged to a poison messages table for analysis.
Extensibility
One of the major advantages in building a CQRS system on top of Azure Event Grid is that it is very easy to extend - to add new sources of input or to create new queries and operations from existing event streams captured.
If we design and add a new command (as a new subscription topic), none of the existing application logic is affected and we can swap around event handlers simply by changing the routing in the event grid itself.
Adding DevOps
When a command is rejected or failed by the CQRS system we write a log record to the blob storage in a specific container named command-errors.
It is then possible to set up a logic app to monitor that folder and, whenever a new blob is written to that location it uses the visual studio connector to create a bug record for it. (There are also connectors for Git and many other defect tracking or application lifecycle management systems).
Lessons Learnt
In putting together a realistic business example of this architecture (unfortunately not public yet), I learnt the following lessons:
- Avoid putting any business logic in the trigger (instead use the command – handler architecture)
- Hook up application insights to see inside your business application
- Be wary of munging together domains – you can make an anti corruption layer in Event Grid
- Be careful with what can be parallel vs what cannot. (If things have to happen in a sequence, then have them trigger each other as a function pipeline using durable functions orchastration.)
- Only write the code that only you can write (business logic) - don't roll your own security, tracing, etc.
History
- 22nd August, 2017: First version - getting the ideas out
- 10th March, 2018: Added lessons learnt from real world example
- 19th August, 2018: Added detail to the command and query handler mechanism
- 9th December, 2018: Additional detail on how the durable functions library fits in to this framework