Introduction
In this article, we will look at Inventory Manager Application with focus on implementation of Aggregates and Event sourcing. Use cases for Inventory Manager are based on “Super simple cqrs” example by Greg Young. In Inventory Manager, only the first use case of creating an inventory item has been implemented.
Code for Inventory Manager Application is on GitHub Repository - Located Here
Note that this post is part of series of articles, links to all the articles in the series are provided below:
- Introduction
- Need for Enterprise Servicebus Frameworks and Decoupled messaging with their samples
- Inventory Manager - CQRS application (with decoupled messaging) - Commands
- Inventory Manager - CQRS application (with decoupled messaging) - Aggregate and Event Sourcing (this article)
- Inventory Manager - CQRS application (with decoupled messaging) - ReadSide
InventoryManager – Aggregate and Event Sourcing
- As mentioned in videos by Greg Young, Aggregate and Read model DTOs have structural representation and they capture only the final state of the system. However, the source of truth are the events.
- Aggregate exposes behavior for use cases to consume. This behavior fires off events which are the source of truth. Thus, we persist these events (behavioral model) and derive state (structural model) from events by providing handlers for those events.
- Thus, aggregate has handlers for its events.
public class InventoryItem : EventSourced
{
private string _name = string.Empty;
protected InventoryItem(Guid id)
: base(id)
{
Handles<InventoryItemCreated>(OnInventoryItemCreated);
}
private void OnInventoryItemCreated(InventoryItemCreated e)
{
this._name = e.Name;
}
public InventoryItem(Guid id, string name):this(id)
{
Update(new InventoryItemCreated(id, name));
}
}
Azure Event Sourced Repository
Ensuring Publishing of Events Upon Save
EventStore
stores events in Azure Table Storage. - In
AzureEventSourcedRepository
, we need to ensure that upon saving of the events, they are published also.
public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T>
where T : class, IEventSourced
{
public void Save(T eventSourced, string correlationId)
{
var events = eventSourced.Events.ToArray();
var serialized = events.Select
(e => _versionedEventSerializer.Serialize(e, typeof (T), correlationId));
_eventStore.Save(eventSourced.Id.ToString(), serialized);
_publisher.Send(eventSourced.Id.ToString(), events.Length);
}
}
- However, it’s not possible to have a Distributed transaction across Azure Table and Azure Service Bus. Azure table storage supports transaction for a unique partition key only. We store Aggregate’s identity in that partition key column. Thus, in order to ensure that events are published, we save 2 copies of events in the event store.
- One copy is the permanent record of that event, and the other copy becomes part of a virtual queue of events that must be published on the Windows Azure Service Bus. The following code sample shows the
Save
method in the EventStore
class. The prefix “Unpublished” identifies the copy of the event that is part of the virtual queue of unpublished events.
public class EventStore<T> : IEventStore<T>, IPendingEventsQueue<T>
{
public void Save(string sourceId, IEnumerable<EventData> events)
{
var table = _tableClient.GetTableReference(_tableName);
var tableBatchOperation = new TableBatchOperation();
foreach (var eventData in events)
{
if (eventData.SourceId != sourceId)
throw new Exception("Events from different aggregate instances found during EventStore
save. Events from only single Aggregate instance can be saved.");
var creationDate = DateTime.UtcNow;
tableBatchOperation.Insert(eventData.ToAzureTableEntry(creationDate));
tableBatchOperation.Insert(eventData.ToUnpublishedAzureTableEntry(creationDate));
}
try
{
table.ExecuteBatch(tableBatchOperation);
}
catch (DataServiceRequestException ex)
{
var inner = ex.InnerException as DataServiceClientException;
if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict)
{
throw new ConcurrencyException();
}
throw;
}
}
}
- These 2 records are for the same aggregate and thus have the same partitionkey. Hence, they will be saved in a single transaction.
- Thus, the
EventStore
class implements both interfaces IEventStore
and IPendingEventsQueue
. - The
AzureEventSourced
Repository calls save on the EventStore
and then calls upon the publisher
to publish events for the aggregate instance. - The publisher reads the unpublished events records for the specified aggregate and publishes them via
IServicebus
. After publishing, the publisher deletes the records for unpublished events.
public class EventStoreBusPublisher<T> : IEventStoreBusPublisher<T>, IDisposable
{
private void SendAndDeletePending(EventData eventData)
{
var ev = new VersionedEventSerializer().Deserialize(eventData);
_sender.Publish(ev);
_queue.DeletePending(eventData);
}
}
EventPublishProcessor
- There is the possibility of a crash happening after saving of the 2 copies of events but prior to publishing of events in save of the
AzureEventSourcedRepository
. Since those 2 statements are not wrapped in any (Distributed) Transaction, the system would go into an inconsistent state if such a crash were to happen.
public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced
{
public void Save(T eventSourced, string correlationId)
{
var events = eventSourced.Events.ToArray();
var serialized = events.Select(e => _versionedEventSerializer.Serialize
(e, typeof (T), correlationId));
_eventStore.Save(eventSourced.Id.ToString(), serialized);
_publisher.Send(eventSourced.Id.ToString(), events.Length);
}
}
- Thus, in the case of a failure, the system must include a mechanism for scanning all of the partitions in table storage for aggregates with unpublished events and then publishing those events. This process will take some time to run, but will only need to run when the application restarts.
EventPublishProcessor
component is registered such that it runs at startup in worker role for the above purpose. It takes dependency on the IEventStoreBusPublisher
. EventPublishProcessor
is registered as a IProcessor
in BootStrapper
.
namespace InventoryManager.Worker.Main
{
public class Bootstrapper
{
private static void RegisterRepository<T>(CloudStorageAccount storageAccount,
string eventStoreTableName) where T: class, IEventSourced
{
IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>(name: typeof(T).Name +
"_EventPublisherProcessor");
}
}
}
InventoryManagerProcessor
- In the worker role,
InventoryManagerProcessor
retrieves all the registered IProcessor
s registered via IoC
and then starts them up.
class InventoryManagerProcessor : IDisposable
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly List<IProcessor> _processors;
public InventoryManagerProcessor()
{
_cancellationTokenSource = new CancellationTokenSource();
_processors = IoC.ResolveAll<IProcessor>().ToList();
}
public void Start()
{
_processors.ForEach(p => p.Start());
}
public void Stop()
{
_cancellationTokenSource.Cancel();
_processors.ForEach(p => p.Stop());
}
}
- The
InventoryManagerProcessor
is called up during start of the workerRole
.
public class WorkerRole : RoleEntryPoint
{
private async Task RunAsync(CancellationToken cancellationToken)
{
using (var processor = new InventoryManagerProcessor())
{
processor.Start();
while (!cancellationToken.IsCancellationRequested)
{
Thread.Sleep(10000);
}
processor.Stop();
return;
}
}
}
Registering EventSourcedRepository
EventSourced
repository has several dependencies. - The method to register a repository has thus been templated out as shown below:
namespace InventoryManager.Worker.Main
{
public class Bootstrapper
{
private static void RegisterRepository<T>(CloudStorageAccount storageAccount,
string eventStoreTableName) where T: class, IEventSourced
{
var eventStore = new EventStore<T>(storageAccount, eventStoreTableName);
IoC.RegisterInstance<IEventStore<T>>(eventStore);
IoC.RegisterInstance<IPendingEventsQueue<T>>(eventStore);
IoC.RegisterAsSingleton<IEventStoreBusPublisher<T>, EventStoreBusPublisher<T>>();
IoC.RegisterAsSingleton<IEventSourcedRepository<T>, AzureEventSourcedRepository<T>>();
IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>
(name: typeof(T).Name + "_EventPublisherProcessor");
}
}
}
- This simplifies creating more repositories for future aggregates.
Registering Handlers for Commands and Events
- This is similar to the sample application and is done during the bootstrapping of worker role.
namespace InventoryManager.Worker.Main
{
public class Bootstrapper
{
private static void RegisterServiceBus()
{
var bus = new MassTransitServiceBus(
x => new MassTransitWithAzureServiceBusConfigurator
(ConfigurationManager.AppSettings.Get("azure-namespace"),
"InventoryManager.WriteSide",
ConfigurationManager.AppSettings.Get("azure-key"), x)
.WithHandler<CreateInventoryItem, InventoryItemAppService>()
.WithHandler<InventoryItemCreated, InventoryViewModelGenerator>());
;
IoC.RegisterInstance<IServiceBus>(bus);
}
}
}
- Note how the event “
InventoryItemCreated
” is being subscribed in the ViewModelGenerator
. The ViewModelGenerator
is part of the worker role and responsible for updating the ReadModel
based on the events that were generated.
Next Article in the Series
The next article in the series will focus on Read side in Inventory Manager application.
For a complete list of articles in this series, please go to the Introduction section of this article.
Thanks for reading the articles, hope they are proving insightful.
References
- “Super simple cqrs” example by Greg Young
- Greg Young video on CQRS (about 6 hours)