Introduction
MongoDB is an open-source, document-oriented NoSQL database that works with JSON-like documents with dynamic schemas, as its Wikipedia page describes it. The purpose of this document is not to provide a high-level picture of MongoDB or a crash course in it; if you don't have a clue what the buzz around this magnificent piece of software is about, there is no better starting point than the product site itself: https://www.mongodb.com/what-is-mongodb.
Nor is this document intended as a C# primer on working with MongoDB, although it does touch, to a certain degree, on connecting to the database and organizing your CRUD operations in a .NET fashion.
It describes how you can build your own resource manager in order to add a reliable transaction mechanism to your existing MongoDB data operations. I was missing this feature in many of my projects, and after a while I decided to implement it myself. I decided that the solution should fulfill three non-negotiable criteria:
- The solution should not be invasive. MongoDB is the way it is by design, for scalability and rollout reasons that are outside the scope of this article. In that context, a solution should not violate MongoDB's architectural principles. Hence I needed a wrapper-like solution that promotes operations to transactions and acts as an exoskeleton rather than as an add-in.
- It has to be simple and follow a paradigm that is accepted, tested, reliable and suitable for enterprise needs. Nobody wants to learn a new custom library, or even worse embed one in a production environment, especially for a topic as critical as transactions and failure recovery. Custom transaction frameworks are ruled out.
- We need to promote our data operations to transactions, not build a transaction framework from scratch.
Given these boundary conditions, the evident answer was to use the powerful and extensible features that Microsoft has shipped since .NET 2.0 in the System.Transactions namespace.
Background
What is a Resource Manager
Any resource that participates in a transaction has to be managed by a resource manager. The resource manager is responsible for keeping track of the value changes of its resources and, depending on the outcome of the transaction, for either committing those changes or rolling them back to the original values. Microsoft SQL Server, message queues, Oracle Database and custom in-memory data structures can all act as resource managers.
A resource manager handles either durable or volatile data. The distinction refers to whether the resource manager supports failure recovery. A durable resource manager persists its data to durable storage, so that if the system experiences a failure, crash or outage, it can reenact the transaction upon recovery and perform all the actions necessary to complete the interrupted transaction. A volatile resource manager, on the other hand, keeps its data in volatile resources such as in-memory data structures, which means that after a failure, resuming the interrupted transaction is no longer an option. The resource manager boilerplate presented in this article focuses on volatile resources. Nevertheless, building a resource manager that supports failure recovery is not far from where this article ends. The sequence of actions roughly described here is coordinated by a transaction manager.
What is a Transaction Manager
The transaction manager is the component that orchestrates all resource managers and makes sure that the transaction reaches the finish line whatever its outcome, committed or rolled back. Thankfully we have nothing to develop here, as Microsoft gives us a nice spectrum of choices out of the box (remember decision criterion #3):
- Lightweight Transaction Manager (LTM): a well-performing transaction manager that uses the lightweight transaction protocol (outside the scope of this article; a web search is your friend here) to manage local transactions (single AppDomain).
- Kernel Transaction Manager (KTM): manages transactional operating system resources, such as the file system and registry resource managers of contemporary Windows versions (Vista and onward). Completely irrelevant to our context.
- Distributed Transaction Coordinator (DTC): supports transactions across any execution boundary.
When we start a new transaction from System.Transactions, it is always handled by the LTM initially. As long as the enlisted resource managers deal only with volatile data, or with at most one durable resource that supports single-phase notification (single domain, no hard disk interactions), the whole procedure remains under the supervision of the LTM. If there is more than one durable resource, if at least one resource does not support single-phase notification, or if a resource spans domain boundaries, the DTC automatically takes over.
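One way to check which manager currently owns a transaction is to look at its distributed identifier, which stays at Guid.Empty until the transaction is promoted to the DTC. The following is a minimal sketch, not part of the article's code base: NoOpVolatileResourceManager and EscalationProbe are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Transactions;

// Hypothetical no-op volatile resource manager, used only to have something to enlist.
class NoOpVolatileResourceManager : IEnlistmentNotification
{
    public void Prepare(PreparingEnlistment preparingEnlistment) { preparingEnlistment.Prepared(); }
    public void Commit(Enlistment enlistment) { enlistment.Done(); }
    public void Rollback(Enlistment enlistment) { enlistment.Done(); }
    public void InDoubt(Enlistment enlistment) { enlistment.Done(); }
}

static class EscalationProbe
{
    // True while the transaction has not been promoted to a distributed transaction.
    public static bool IsLocal(Transaction tx)
    {
        return tx.TransactionInformation.DistributedIdentifier == Guid.Empty;
    }

    // Enlists two volatile resource managers and reports whether the
    // transaction is still local afterwards.
    public static bool RunProbe()
    {
        using (TransactionScope scope = new TransactionScope())
        {
            Transaction.Current.EnlistVolatile(new NoOpVolatileResourceManager(), EnlistmentOptions.None);
            Transaction.Current.EnlistVolatile(new NoOpVolatileResourceManager(), EnlistmentOptions.None);
            bool stillLocal = IsLocal(Transaction.Current);
            scope.Complete();
            return stillLocal;
        }
    }
}
```

With only volatile enlistments, RunProbe is expected to report that the transaction never left the LTM.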
How do they work together?
For a resource manager to be part of a transaction, it has to enlist in it. The System.Transactions namespace contains the Transaction class, which comes with a few methods prefixed with the word "Enlist" (EnlistVolatile and EnlistDurable among them), and this is all you need to get started and take part in a transaction. As we have said, a resource manager can enlist in two ways: durable and volatile. So after choosing which method to use, according to the durability support of our resource manager, we have enlisted our resource in a two-phase commit. An additional requirement is that the class serving as resource manager implements the IEnlistmentNotification interface (implementation details follow later in the article). By implementing this interface, and of course enlisting our resource manager, we make sure that the transaction manager will call back our resource manager so that it can perform the necessary actions when the transaction commits or aborts.
After enlistment, things get pretty straightforward. Durable and volatile resource managers prepare their resource information (by keeping versions or snapshots in memory or in a persistent store) and wait for the transaction manager to initiate the two-phase commit. The transaction manager asks each enlisted resource manager to report whether it is ready to commit. Each resource manager must prepare and vote on whether it intends to commit or abort the transaction. If any of the involved resource managers declares failure, the transaction manager instructs all resource managers to roll back.
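The enlist, prepare, vote and commit-or-rollback sequence described above can be sketched with a tiny in-memory resource. This is an illustrative sketch only: VolatileValue and VolatileValueDemo are hypothetical names, and the class is unrelated to the MongoDB resource manager built later in the article.

```csharp
using System;
using System.Transactions;

// Hypothetical in-memory resource that keeps a committed value and a pending one.
class VolatileValue : IEnlistmentNotification
{
    private int m_Committed;
    private int m_Pending;
    private bool m_Enlisted;

    public int Value { get { return m_Committed; } }

    public void Set(int newValue)
    {
        if (Transaction.Current == null)
        {
            m_Committed = newValue; // no ambient transaction: apply immediately
            return;
        }
        if (!m_Enlisted)
        {
            // Volatile enlistment: no recovery information is persisted.
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
            m_Enlisted = true;
        }
        m_Pending = newValue;
    }

    // Phase 1: vote. This resource can always commit.
    public void Prepare(PreparingEnlistment preparingEnlistment) { preparingEnlistment.Prepared(); }

    // Phase 2: make the pending value visible.
    public void Commit(Enlistment enlistment) { m_Committed = m_Pending; m_Enlisted = false; enlistment.Done(); }

    // Phase 2: discard the pending value.
    public void Rollback(Enlistment enlistment) { m_Pending = m_Committed; m_Enlisted = false; enlistment.Done(); }

    public void InDoubt(Enlistment enlistment) { m_Enlisted = false; enlistment.Done(); }
}

static class VolatileValueDemo
{
    // Sets the value to 42 inside a TransactionScope and returns what is visible afterwards.
    public static int SetInScope(VolatileValue value, bool complete)
    {
        using (TransactionScope scope = new TransactionScope())
        {
            value.Set(42);
            if (complete) scope.Complete();
        }
        return value.Value;
    }
}
```

Calling SetInScope with complete set to false should leave the value untouched, because the scope is disposed without Complete() and the transaction manager calls Rollback; with complete set to true the pending value becomes the committed one.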
Basic CRUD Operations with MongoDB
Although, as stated in the introduction, this is not a C# primer on working with MongoDB, we need to cover some basics as a foundation for later use.
I always like to build a strong object hierarchy when I design a system. So every entity document that we are going to store in our MongoDB database derives from a single parent class containing little more than the following:
public class MongoEntity : IEntity
{
[BsonId]
public string Id { get; set; }
}
For every collection in our database we are going to create a corresponding helper class that allows us to perform all the CRUD operations against our database (based on the Repository pattern):
public interface IEntityRepository<T> where T : IEntity
{
IEnumerable<T> All();
IQueryable<T> All(int page, int pageSize);
T Get(string id);
IQueryable<T> GetFunc(Expression<Func<T, bool>> expression);
T Add(T entity);
int Add(IEnumerable<T> entities);
void Remove(T entity);
bool Remove(string id);
bool RemoveAll();
int Remove(Expression<Func<T, bool>> expression);
T Update(T updatedEntity);
}
We then construct an abstract class that will be the base class for all future repositories:
public abstract class EntityRepositoryBase<T> : IEntityRepository<T> where T : IEntity
{
private MongoServer m_Server;
private MongoDatabase m_Index;
private MongoCollection<T> m_Entities;
public MongoCollection<T> Entities
{
get
{
return m_Entities;
}
}
private string m_Collection;
public EntityRepositoryBase(string collection) : this(collection, null, null)
{
}
public EntityRepositoryBase(string collection, string connectionString, string database)
{
m_Collection = collection;
m_Server = new MongoClient(connectionString).GetServer();
m_Index = m_Server.GetDatabase(database);
m_Entities = m_Index.GetCollection<T>(m_Collection);
}
public IEnumerable<T> All()
{
return this.m_Entities.AsQueryable<T>().ToList();
}
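public IQueryable<T> All(int page, int pageSize)
{
// Pages are 1-based, mirroring the paged GetFunc overload.
return this.m_Entities.AsQueryable<T>().Skip((page - 1) * pageSize).Take(pageSize);
}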
public IEnumerable<D> AllAs<D>()
{
return m_Entities.AsQueryable<T>().OfType<D>().ToList();
}
public T Get(string id)
{
IMongoQuery query = Query.EQ("_id", id);
return this.m_Entities.Find(query).FirstOrDefault();
}
public IQueryable<T> GetFunc(System.Linq.Expressions.Expression<Func<T, bool>> expression)
{
return this.m_Entities.AsQueryable<T>().Where(expression);
}
public IQueryable<T> GetFunc(System.Linq.Expressions.Expression<Func<T, bool>> expression, int page, int pageSize)
{
return this.m_Entities.AsQueryable<T>().Where(expression).Skip((page - 1) * pageSize).Take(pageSize);
}
public IQueryable<T> GetAs<D>(System.Linq.Expressions.Expression<Func<T, bool>> expression)
{
return m_Entities.FindAllAs<D>().Cast<T>().ToList().AsQueryable().Where(expression);
}
public virtual T Add(T entity)
{
try
{
IEntity oEntity = (entity as IEntity);
oEntity.Id = String.IsNullOrEmpty(oEntity.Id) ?
ObjectId.GenerateNewId().ToString() :
oEntity.Id;
m_Entities.Insert(entity);
return entity;
}
catch (Exception mongoException)
{
if (mongoException.HResult == -2146233088)
{
throw new MongoEntityUniqueIndexException("Unique Index violation", mongoException);
}
else
{
// Rethrow without resetting the stack trace.
throw;
}
}
}
public virtual int Add(IEnumerable<T> entities)
{
int addCount = 0;
entities.ToList().ForEach(entity =>
{
if (Add(entity) != null)
{
addCount++;
}
});
return addCount;
}
public virtual void AddBatch(IEnumerable<T> entities)
{
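// Assumption: IEntity also declares Created and LastModified (not shown in this article), and UTC is the desired time base.
DateTime timeStamp = DateTime.UtcNow;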
entities.ToList().ForEach(entity =>
{
IEntity oEntity = (entity as IEntity);
oEntity.Id = String.IsNullOrEmpty(oEntity.Id) ?
ObjectId.GenerateNewId().ToString() :
oEntity.Id;
oEntity.Created = timeStamp;
oEntity.LastModified = timeStamp;
});
try
{
m_Entities.InsertBatch(entities);
}
catch (Exception addBatchException)
{
}
}
public virtual void Remove(T entity)
{
Remove(entity.Id);
}
public virtual bool Remove(string id)
{
try
{
IMongoQuery query = Query.EQ("_id", id);
var result = m_Entities.Remove(query);
return result.DocumentsAffected == 1;
}
catch (Exception mongoException)
{
}
return false;
}
public virtual bool RemoveAll()
{
try
{
var result = m_Entities.RemoveAll();
// True when at least one document was removed.
return result.DocumentsAffected > 0;
}
catch (Exception mongoException)
{
}
return false;
}
public virtual int Remove(System.Linq.Expressions.Expression<Func<T, bool>> expression)
{
int removeCount = 0;
List<T> entitiesToRemove = this.m_Entities.AsQueryable<T>().Where(expression).ToList();
entitiesToRemove.ForEach(entity =>
{
if (Remove((entity as IEntity).Id))
{
removeCount++;
}
});
return removeCount;
}
public virtual T Update(T updatedEntity)
{
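// Save overwrites the stored document that has the same _id (and inserts it if it is missing).
m_Entities.Save(updatedEntity);
return updatedEntity;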
}
}
What we have achieved so far: we have created a common way to open a connection to our MongoDB database and perform the basic data operations (incomplete snippet follows):
public EntityRepositoryBase(string collection) : this(collection, null, null)
public EntityRepositoryBase(string collection, string connectionString, string database)
public virtual T Add(T entity)
public virtual void Remove(T entity)
public virtual T Update(T updatedEntity)
Build Transaction support
As mentioned before, the core component that needs to be enlisted in order to participate in a transaction is the resource manager. It has to fulfill two requirements: it must be able to hold the values of the resource and their changes, and it must implement the IEnlistmentNotification interface. The decision was to build a separate class to hold the values of the resources, which additionally contains the commit and rollback functionality. This class is TransactionalEntity<T>:
public class TransactionalEntity<T> where T : IEntity
{
private T m_Original;
public T Original
{
get { return m_Original; }
}
private T m_Current;
public T Current
{
get { return m_Current; }
}
private TransactionalRepositoryBase<T> m_Repository;
public TransactionalRepositoryBase<T> Repository
{
get { return m_Repository; }
}
private bool m_CommitWithSuccess = false;
public bool CommitWithSuccess
{
get { return m_CommitWithSuccess; }
}
private bool m_RollbackWithSuccess = false;
public bool RollbackWithSuccess
{
get { return m_RollbackWithSuccess; }
}
private bool m_Prepared = false;
public bool Prepared
{
get { return m_Prepared; }
}
private EntityRepositoryCommandsEnum m_Command;
public EntityRepositoryCommandsEnum Command
{
get { return m_Command; }
}
public TransactionalEntity(T original, T current, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
{
m_Original = original;
m_Current = current;
m_Repository = repository;
m_Command = command;
}
public bool Commit()
{
m_CommitWithSuccess = true;
return m_CommitWithSuccess;
}
public bool Rollback()
{
if (m_Command == EntityRepositoryCommandsEnum.Update)
{
m_Repository.NonTxUpdate(this.m_Original);
}
if (m_Command == EntityRepositoryCommandsEnum.Add)
{
m_Repository.NonTxRemove(this.m_Current);
}
if (m_Command == EntityRepositoryCommandsEnum.Remove)
{
m_Repository.NonTxAdd(this.m_Original);
}
m_RollbackWithSuccess = true;
return m_RollbackWithSuccess;
}
public T Add()
{
T result = m_Repository.NonTxAdd(this.m_Current);
m_Prepared = true;
return result;
}
public void Remove()
{
m_Repository.NonTxRemove(this.Original);
m_Prepared = true;
}
public T Update()
{
T result = m_Repository.NonTxUpdate(this.m_Current);
m_Prepared = true;
return result;
}
}
TransactionalEntity<T> is a generic class that takes an IEntity as its type parameter. It holds the current and original values in the Current and Original properties, and it is aware of the repository class it has to use to perform operations against the database. Additionally, through the Command property, it knows which command to perform and can accordingly decide which compensation command to execute in case of failure:
public T Original
public T Current
public TransactionalEntity(T original, T current, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
public EntityRepositoryCommandsEnum Command
The commands are limited and are part of an enumeration :
public enum EntityRepositoryCommandsEnum
{
Add,
Remove,
Update
}
Additionally, it holds five important methods. Commit and Rollback are executed by the resource manager on behalf of the resource, after all votes are cast and the transaction manager has decided the outcome of the transaction. Depending on the command requested against the database, the Rollback method decides which compensating action to take: an Update is undone by writing back the original document, an Add by removing the added document, and a Remove by re-adding the original document.
public bool Commit()
{
m_CommitWithSuccess = true;
return m_CommitWithSuccess;
}
public bool Rollback()
{
if (m_Command == EntityRepositoryCommandsEnum.Update)
{
m_Repository.NonTxUpdate(this.m_Original);
}
if (m_Command == EntityRepositoryCommandsEnum.Add)
{
m_Repository.NonTxRemove(this.m_Current);
}
if (m_Command == EntityRepositoryCommandsEnum.Remove)
{
m_Repository.NonTxAdd(this.m_Original);
}
m_RollbackWithSuccess = true;
return m_RollbackWithSuccess;
}
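One refinement worth considering is to compensate only when the original operation actually ran; otherwise a rollback could try to undo a write that never happened. The sketch below illustrates the idea against a plain dictionary instead of a MongoDB repository. StoreCommand and CompensatingOperation are hypothetical names used for illustration and are not part of the article's classes.

```csharp
using System;
using System.Collections.Generic;

enum StoreCommand { Add, Remove, Update }

// Hypothetical stand-in for TransactionalEntity<T>, working on a dictionary.
class CompensatingOperation
{
    private readonly IDictionary<string, string> m_Store;
    private readonly StoreCommand m_Command;
    private readonly string m_Key;
    private readonly string m_Current;
    private string m_Original;
    private bool m_HadOriginal;
    private bool m_Executed;

    public CompensatingOperation(IDictionary<string, string> store, StoreCommand command, string key, string current)
    {
        m_Store = store;
        m_Command = command;
        m_Key = key;
        m_Current = current;
    }

    public void Execute()
    {
        // Snapshot the value that is about to be replaced or removed, if any.
        m_HadOriginal = m_Store.TryGetValue(m_Key, out m_Original);
        switch (m_Command)
        {
            case StoreCommand.Add: m_Store.Add(m_Key, m_Current); break; // throws on a duplicate key
            case StoreCommand.Update: m_Store[m_Key] = m_Current; break;
            case StoreCommand.Remove: m_Store.Remove(m_Key); break;
        }
        m_Executed = true;
    }

    public void Compensate()
    {
        if (!m_Executed) return;                         // nothing was written, nothing to undo
        if (m_HadOriginal) m_Store[m_Key] = m_Original;  // Update or Remove: restore the snapshot
        else m_Store.Remove(m_Key);                      // Add of a new key: take it out again
        m_Executed = false;
    }
}
```

In this sketch a failed Add (duplicate key) leaves the executed flag unset, so a later Compensate call does not delete the document that was already there.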
The remaining three methods are Add, Update and Remove, which do the actual work by issuing single-phase commits against the database (these are our prepare actions). They call delegates of the associated repository class, which we discuss later in the article.
public T Add()
{
T result = m_Repository.NonTxAdd(this.m_Current);
m_Prepared = true;
return result;
}
public void Remove()
{
m_Repository.NonTxRemove(this.Original);
m_Prepared = true;
}
public T Update()
{
T result = m_Repository.NonTxUpdate(this.m_Current);
m_Prepared = true;
return result;
}
The next class in the picture is, finally, the long-awaited resource manager. It is of course a generic class, like the rest of our family tree, taking an IEntity as its type parameter. The Commit, InDoubt, Rollback and Prepare methods are the actual callbacks for the transaction manager. The class uses TransactionalEntity<T> to prepare, commit and roll back the data, and to cast its vote on the outcome of the transaction.
public class MongoResourceManager<T> : IEnlistmentNotification where T : IEntity
{
private TransactionalEntity<T> m_TxEntity;
public MongoResourceManager(TransactionalEntity<T> txEntity)
{
m_TxEntity = txEntity;
}
public MongoResourceManager(T entity, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
{
T current = entity;
T original = repository.Get(entity.Id);
TransactionalEntity<T> txEntity = new TransactionalEntity<T>(original, current, repository, command);
m_TxEntity = txEntity;
}
public void Commit(Enlistment enlistment)
{
bool success = this.m_TxEntity.Commit();
if (success)
{
enlistment.Done();
}
}
public void InDoubt(Enlistment enlistment)
{
Rollback(enlistment);
}
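public void Prepare(PreparingEnlistment preparingEnlistment)
{
if (this.m_TxEntity.Prepared)
{
// Vote to commit.
preparingEnlistment.Prepared();
}
else
{
// The data operation never completed: vote to abort so the transaction manager is not left waiting for a vote.
preparingEnlistment.ForceRollback();
}
}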
public void Rollback(Enlistment enlistment)
{
bool success = this.m_TxEntity.Rollback();
if (success)
{
enlistment.Done();
}
}
}
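Votes are cast through the PreparingEnlistment argument of Prepare: Prepared() votes to commit, while ForceRollback() vetoes the transaction. The following minimal sketch shows how a veto surfaces to the caller; VetoingResourceManager and VetoDemo are hypothetical names introduced for this illustration only.

```csharp
using System;
using System.Transactions;

// Hypothetical resource manager that always votes against the commit.
class VetoingResourceManager : IEnlistmentNotification
{
    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.ForceRollback(new InvalidOperationException("Resource was not prepared"));
    }
    public void Commit(Enlistment enlistment) { enlistment.Done(); }
    public void Rollback(Enlistment enlistment) { enlistment.Done(); }
    public void InDoubt(Enlistment enlistment) { enlistment.Done(); }
}

static class VetoDemo
{
    // Returns true when the veto aborted the transaction despite scope.Complete().
    public static bool CommitIsVetoed()
    {
        try
        {
            using (TransactionScope scope = new TransactionScope())
            {
                Transaction.Current.EnlistVolatile(new VetoingResourceManager(), EnlistmentOptions.None);
                scope.Complete();
            }
            return false;
        }
        catch (TransactionAbortedException)
        {
            // Disposing the scope attempts the commit, which fails because of the veto.
            return true;
        }
    }
}
```

The caller asked for a commit, yet a single negative vote is enough for the scope to throw TransactionAbortedException on dispose.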
To achieve our goal we have to create a new subclass of the repository class. This subclass, TransactionalRepositoryBase<T>, detects whether the requested command was issued in the context of a transaction (within a TransactionScope, to be accurate) and either creates a new MongoResourceManager<T> and enlists it, or executes a normal operation against the database:
public abstract class TransactionalRepositoryBase<T> : EntityRepositoryBase<T> where T : IEntity
{
internal delegate T AddEntityHandler(T entity);
internal delegate void RemoveEntityHandler(T entity);
internal delegate T UpdateEntityHandler(T entity);
internal AddEntityHandler NonTxAdd;
internal RemoveEntityHandler NonTxRemove;
internal UpdateEntityHandler NonTxUpdate;
public TransactionalRepositoryBase(string collection) : this(collection, null, null)
{
}
public TransactionalRepositoryBase(string collection, string connectionString, string database) : base(collection, connectionString, database)
{
NonTxAdd = new AddEntityHandler(base.Add);
NonTxRemove = new RemoveEntityHandler(base.Remove);
NonTxUpdate = new UpdateEntityHandler(base.Update);
}
public override T Add(T entity)
{
if (Transaction.Current != null)
{
TransactionalEntity<T> txEntity = new TransactionalEntity<T>(default(T), entity, this, EntityRepositoryCommandsEnum.Add);
MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);
Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
return txEntity.Add();
}
else
{
return NonTxAdd(entity);
}
}
public override void Remove(T entity)
{
if (Transaction.Current != null)
{
TransactionalEntity<T> txEntity = new TransactionalEntity<T>(entity, default(T), this, EntityRepositoryCommandsEnum.Remove);
MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);
Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
txEntity.Remove();
}
else
{
NonTxRemove(entity);
}
}
public override T Update(T entity)
{
if (Transaction.Current != null)
{
T original = this.Get(entity.Id);
TransactionalEntity<T> txEntity = new TransactionalEntity<T>(original, entity, this, EntityRepositoryCommandsEnum.Update);
MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);
Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
return txEntity.Update();
}
else
{
return NonTxUpdate(entity);
}
}
}
The key is to override the Add, Update and Remove methods of the original abstract repository class in the following way. By checking the Transaction.Current property we can tell whether we are in the context of a TransactionScope. If so, we create our TransactionalEntity<T> and its MongoResourceManager<T>, enlist the resource manager as volatile, and execute the method (Add, Remove or Update) provided by the TransactionalEntity<T> class. Otherwise we execute one of the NonTx delegates.
public override T Add(T entity)
{
if (Transaction.Current != null)
{
TransactionalEntity<T> txEntity = new TransactionalEntity<T>(default(T), entity, this, EntityRepositoryCommandsEnum.Add);
MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);
Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
return txEntity.Add();
}
else
{
return NonTxAdd(entity);
}
}
Those delegates point to the Add, Remove and Update methods of the base class (EntityRepositoryBase<T>) and execute the command against the database in a single-phase commit. They are reused by both TransactionalRepositoryBase<T> and TransactionalEntity<T>.
internal delegate T AddEntityHandler(T entity);
internal delegate void RemoveEntityHandler(T entity);
internal delegate T UpdateEntityHandler(T entity);
internal AddEntityHandler NonTxAdd;
internal RemoveEntityHandler NonTxRemove;
internal UpdateEntityHandler NonTxUpdate;
public TransactionalRepositoryBase(string collection, string connectionString, string database) : base(collection, connectionString, database)
{
NonTxAdd = new AddEntityHandler(base.Add);
NonTxRemove = new RemoveEntityHandler(base.Remove);
NonTxUpdate = new UpdateEntityHandler(base.Update);
}
Example and Usage
Let's create a new entity and a new repository class:
public class TestDocument : MongoEntity
{
public string DocumentId { get; set; }
}
public class TestDocumentsRepository : TransactionalRepositoryBase<TestDocument>
{
public TestDocumentsRepository()
: base("test_documents", "mongodb://localhost:27017", "tx_tests")
{
}
}
Then let's create a situation in which we can emulate randomly successful and aborted transactions. Every time the repository.Add(document) method is called, a new MongoResourceManager<T> is created and enlisted in the transaction created for our TransactionScope, scope. If the code reaches scope.Complete(), the transaction commits successfully; otherwise it automatically rolls back and removes the documents added so far, leaving the collection empty.
private void ExecuteInTx(object sender, EventArgs e)
{
// Local random source for the simulated failures below.
Random random = new Random();
TestDocumentsRepository repository = new TestDocumentsRepository();
repository.RemoveAll();
using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
try
{
for (int docIdx = 0; docIdx < 5; docIdx++)
{
int poison = random.Next(1000);
if (poison != 0 && poison % 200 == 0)
{
throw new Exception("Poison killed my TX");
}
TestDocument document = new TestDocument();
document.DocumentId = Guid.NewGuid().ToString();
repository.Add(document);
Thread.Sleep(100);
}
scope.Complete();
}
catch (Exception)
{
}
}
}
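To see which way a given run went without inspecting the collection, one option is to subscribe to the TransactionCompleted event of the ambient transaction. This is a minimal sketch under the assumption that nothing else is enlisted; OutcomeProbe is a hypothetical helper name, not part of the article's code.

```csharp
using System;
using System.Transactions;

static class OutcomeProbe
{
    // Runs an empty transaction and returns its final status.
    public static TransactionStatus Run(bool complete)
    {
        TransactionStatus outcome = TransactionStatus.Active;
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            // Raised once the transaction has committed or aborted.
            Transaction.Current.TransactionCompleted +=
                (sender, e) => outcome = e.Transaction.TransactionInformation.Status;
            if (complete)
            {
                scope.Complete();
            }
        }
        return outcome;
    }
}
```

The same subscription could be placed at the top of the using block in ExecuteInTx to log whether the poison struck.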
Points of Interest
To monitor the activity of the LTM or the DTC, all you have to do is open the Component Services MMC snap-in in Windows.
The code presented in this article is by no means a copy-paste specimen for your production needs. It is a guide and a baseline for how you could incorporate transactions into a MongoDB database in a generic .NET way.