This article demonstrates another way of using proxies and interceptors from the Castle Dynamic Proxy framework. We implement an interruptible and restartable flow that is lightweight and debuggable, with the flow logic kept in a C# method. We also use Moq for unit testing and show that you can verify and debug your code without spending time creating and running an application.
You can find the first article about Dynamic Proxies at the link below:
Introduction
Hello there, welcome to another story about proxies and interceptors.
I have to thank Larry Ross for pointing out Aspect-Oriented Programming to the Pro Coders team.
If you were not amazed by my previous blog post, where we discussed model change tracking and rule execution, I will try to impress you this second time. Let's assume you have a magic technology that can interrupt a .NET method, save its state, and restart it from the saved point later, maybe on another machine. You might think that you could do that with the Windows hibernate function: save the Windows image to a large file, copy it, and restore it on another virtual machine. But no! I am talking about a small state of about one hundred bytes that can be saved to a database, read by another machine, and restarted from the saved point.
Are you intrigued?
Let's write a user story then.
User Story #4: Create an Interruptible Flow That Can Be Restarted in Another AppDomain
- Logic and sequence of the flow steps should be in a C# method
- Each step should have the ability to interrupt the flow execution
- The state of the interrupted flow and flow code should be enough to restart the flow in another Application Domain
Implementation - Generic Flow
To start, I created a new .NET Core 3.1 project, DemoCatleProxy.RestartableFlow, and added the Castle.Core NuGet package.
Architecturally, I think about this problem in the way that we may have many different flows and an engine that will be used to execute any one of these flows and restart them from the saved point when needed. If we think about it in terms of the business document circulation - each type of document will have a unique flow, and it will contain different steps like submission, review, approval, second approval, etc. and in different sequences of these steps.
Now I would like to define a template for flows that we are going to implement. Firstly, we need an interface that will define the flow, which we will use in the flow engine to reference the flows:
public interface IFlow
{
object UntypedModel { get; }
void Execute();
internal void SetModel(object model);
}
The Execute method contains the flow logic, and the remaining declarations are for setting up our Model.
You can think of Model as a Document itself (in circulation) plus all related supporting documentation - signatures, dates of signatures, current status, next address for delivery, etc.
Each new flow we execute should have its own Model. So, my next interface is a generic template with the Model argument:
public interface IFlow<M> : IFlow
where M : class
{
M Model { get; }
}
Thus, each time you define a new flow for a new Document - you supply a Model argument, for example:
public class MyFlow : IFlow<MyModel>
Now to simplify future flow definitions, we create a base class that implements basic functions:
public abstract class Flow<M> : IFlow<M>
where M : class, new()
{
protected M _model;
public M Model => _model;
public object UntypedModel => _model;
public Flow()
{
_model = new M();
}
void IFlow.SetModel(object model)
{
_model = model as M;
}
public abstract void Execute();
}
The base class allows working with the typed Model, and it creates an instance of the Model in the constructor.
Finally, we can define our first flow (and model) and see how it works, and I would like to do it in a unit test as usual. I created a new xUnit project, DemoCatleProxy.RestartableFlow.Tests, and added a project reference to the main project. Now let's add the flow code:
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace DemoCatleProxy.RestartableFlow.Tests
{
public class Model1
{
public string ReceivedMessage { get; set; }
public string Signature { get; set; }
public bool IsSubmitted { get; set; }
public bool IsLoaded { get; set; }
}
public interface IDemoDataService
{
string LoadReceivedMessage();
bool IsMessageApproved(string message);
string GetSignature(string message);
bool Submit(string message, string signature);
}
public class DemoFlow1 : Flow<Model1>
{
private readonly IDemoDataService _dataService;
public DemoFlow1()
{
}
public DemoFlow1(IDemoDataService dataService)
{
_dataService = dataService;
}
public override void Execute()
{
LoadData();
CheckIfApproved();
AddDigitalSignature();
SubmitData();
}
public virtual void LoadData()
{
if (Model.IsLoaded)
{
throw new FlowFatalTerminateException();
}
Model.ReceivedMessage = _dataService.LoadReceivedMessage();
Model.IsLoaded = true;
}
public virtual void CheckIfApproved()
{
if (!_dataService.IsMessageApproved(Model.ReceivedMessage))
{
throw new FlowStopException();
}
}
public virtual void AddDigitalSignature()
{
Model.Signature = _dataService.GetSignature(Model.ReceivedMessage);
}
public virtual void SubmitData()
{
if (!_dataService.Submit(Model.ReceivedMessage, Model.Signature))
{
throw new FlowStopException();
}
}
}
}
I defined Model1 and DemoFlow1 and, for better realism, I also added the IDemoDataService contract (interface) that the flow will use to communicate with the outside world.
If you look at the Execute method, you will see that it is a sequence of four method calls. Each of these methods (LoadData, CheckIfApproved, AddDigitalSignature, and SubmitData) must be virtual, and I will explain why.
We will use proxy interceptors to intercept the execution of each of these methods. When we intercept a call to the proxy object, we can decide whether to allow the call or skip it. When we restart the flow, we should be able to skip all calls until we reach the point of the previous stop. Castle Dynamic Proxy can only intercept virtual members of a class, so the methods we call from Execute must be virtual.
If you have a look at LoadData, it first checks that the Model has not been loaded before, to make sure that we start the flow with a clean and fresh Model. If the Model signals that it is already loaded, the method throws FlowFatalTerminateException (we will define the exceptions a bit later) and the flow will be marked as corrupted. If the Model has not been loaded yet, we read the message from the data service and set the Model's IsLoaded flag.
The next step in the sequence is the CheckIfApproved method. It asks the data service whether the message is approved. If it is not, the method throws another exception, FlowStopException; otherwise execution continues. If the flow is stopped (by a FlowStopException), the flow engine will return the status IsStopped, and this flow can be restarted later.
Important: When I talk about a restartable flow, I mean that an instance we created and ran once can either finish successfully or return the status IsStopped. In the latter case, we can save this stopped instance in a database and try to restart it later.
We use exceptions in the flows because this is the most effective way of stopping execution at a particular point: the exception bubbles up the call stack until somebody catches it.
Let's add the exceptions code to the main project:
using System;
using System.Collections.Generic;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public class FlowStopException : Exception
{
}
public class FlowFatalTerminateException : Exception
{
}
}
We will use FlowStopException to inform the flow engine that the flow stopped normally and can be restarted later, and we will use FlowFatalTerminateException to inform the engine that the flow is corrupted.
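To make the role of the two exception types concrete, here is a minimal sketch of how a caller can turn "how the run ended" into a status. The names StopSignal, RunStatus, and StatusSketch are hypothetical and exist only for this illustration; the article's own engine is presented in the next section.

```csharp
using System;

// Hypothetical names for illustration; they are not part of the article's project.
public class StopSignal : Exception { }

public enum RunStatus { Finished, Stopped, Failed }

public static class StatusSketch
{
    // Runs the steps in order and maps the way the run ended to a status.
    public static RunStatus Run(params Action[] steps)
    {
        try
        {
            foreach (var step in steps)
            {
                step();
            }
            return RunStatus.Finished;
        }
        catch (StopSignal)
        {
            // Expected interruption: the run may be resumed later.
            return RunStatus.Stopped;
        }
        catch (Exception)
        {
            // Anything else is treated as a failure.
            return RunStatus.Failed;
        }
    }
}
```

For example, StatusSketch.Run(() => { }, () => throw new StopSignal()) ends in the first catch block, because the more specific exception type is listed before the general one.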
Implementation - Flow Engine
The logic of the first flow run is the following (using DemoFlow1 as an example):
- We create a new instance of the DemoFlow1 class (the Model is created automatically by the Flow base class that we inherit from) and supply it to the flow engine.
- The flow engine creates a flow proxy object for the DemoFlow1 instance and a flow data object, which holds the full history of all changes that happened within the DemoFlow1 instance.
- Then the flow engine executes the flow proxy's Execute method, which calls LoadData, CheckIfApproved, AddDigitalSignature, and SubmitData one by one.
- Each call to these methods is intercepted, and after the method completes we save to the flow data a record of the call and a copy of the Model as it was after the call.
- If any of the called methods throws an exception, the Execute method is interrupted and the flow engine returns the flow data.
- If the Execute method finishes without interruption, the flow engine returns the flow data with status IsFinished = true.
The logic to restart the flow is:
- We create a new instance of DemoFlow1 and, having the flow data from the previous run, we supply the flow instance and the flow data to the flow engine.
- The flow engine creates a flow proxy object for the DemoFlow1 instance and uses the supplied flow data.
- Then the flow engine executes the flow proxy's Execute method, which calls LoadData, CheckIfApproved, AddDigitalSignature, and SubmitData one by one.
- Each call to these methods is intercepted, and we check whether the called method is already recorded in the flow data history. If it is, we skip the call and replace the Model of the current DemoFlow1 instance with the model from the flow data history, so that it is the same as it was after this method was called on the first run.
- If the call is not found in the flow data history, we proceed with the call, store it and the resulting Model in the history, and continue the flow execution the same way as we did on the first run.
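The skip-or-proceed decision described in the steps above can be sketched without any proxy library. The ReplaySketch and ReplayDemo types below are hypothetical and deliberately simplified: they only record call history (no Model snapshots) and take each step as a delegate instead of intercepting a virtual method.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the interceptor logic; not the article's engine.
public class ReplaySketch
{
    public List<string> CallHistory { get; } = new List<string>();
    private int _counter;

    // Call before every run so that step positions are counted from 1 again.
    public void BeginRun() => _counter = 0;

    // Runs the step only if this (position, name) pair is not in the history yet.
    public bool Step(string name, Action body)
    {
        _counter++;
        var record = $"{_counter}:{name}";
        if (CallHistory.Contains(record))
        {
            return false; // already done on a previous run: skip
        }
        body();                  // if this throws, nothing is recorded
        CallHistory.Add(record); // recorded only after the step succeeded
        return true;
    }
}

public static class ReplayDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var sketch = new ReplaySketch();
        var approved = false;

        void Flow()
        {
            sketch.Step("Load", () => log.Add("Load"));
            sketch.Step("Check", () =>
            {
                if (!approved) throw new InvalidOperationException("not approved");
                log.Add("Check");
            });
            sketch.Step("Submit", () => log.Add("Submit"));
        }

        sketch.BeginRun();
        try { Flow(); } catch (InvalidOperationException) { /* first run stops at Check */ }

        approved = true;
        sketch.BeginRun();
        Flow(); // Load is skipped, Check and Submit run now
        return log;
    }
}
```

In this sketch Load runs only once across the two runs, which is the same effect the flow engine achieves by looking up the counter-and-method-name record in the call history.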
For the flow engine, we will need an interface:
using System;
using System.Collections.Generic;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public interface IFlowEngine
{
FlowData RunFlow(IFlow flow);
FlowData RestartFlow(IFlow flow, FlowData flowData);
}
}
and flow data class:
using System;
using System.Collections.Generic;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public class FlowData
{
public bool IsFinished { get; set; }
public List<string> CallHistory { get; set; } = new List<string>();
public List<object> ModelHistory { get; set; } = new List<object>();
public bool IsStopped { get; set; }
public Exception LastException { get; set; }
}
}
Now we can present the flow engine:
using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public class FlowEngine : IFlowEngine, IInterceptor
{
private readonly IProxyGenerator _proxyGenerator;
private FlowData _flowData;
private IFlow _flow;
private int _counter;
public FlowEngine(IProxyGenerator proxyGenerator)
{
_proxyGenerator = proxyGenerator;
}
public FlowData RunFlow(IFlow flow)
{
_flowData = new FlowData();
return ProcessFlow(flow);
}
public FlowData RestartFlow(IFlow flow, FlowData flowData)
{
_flowData = flowData;
return ProcessFlow(flow);
}
private FlowData ProcessFlow(IFlow flow)
{
var options = new ProxyGenerationOptions(new FreezableProxyGenerationHook(flow));
var flowProxy = _proxyGenerator.CreateClassProxyWithTarget(flow.GetType(),
flow, options, new IInterceptor[] { this }) as IFlow;
_flow = flow;
try
{
_counter = 0;
_flowData.IsStopped = false;
_flowData.LastException = null;
flowProxy.Execute();
_flowData.IsFinished = true;
}
catch (FlowStopException)
{
// A normal stop: the flow can be restarted later from the recorded history.
_flowData.IsStopped = true;
}
catch (Exception e)
{
_flowData.LastException = e;
}
return _flowData;
}
public void Intercept(IInvocation invocation)
{
var method = invocation.Method.Name;
_counter++;
var historyRecord = $"{_counter}:{method}";
var index = _flowData.CallHistory.IndexOf(historyRecord);
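if (index == -1)
{
// Not in the history yet: run the step, then record the call and a snapshot of the Model.
invocation.Proceed();
_flowData.CallHistory.Add(historyRecord);
_flowData.ModelHistory.Add(_flow.UntypedModel.CloneObject());
}
else
{
// Already recorded: skip the call and restore the Model saved after this step.
_flow.SetModel(_flowData.ModelHistory[index]);
}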
}
}
}
As you can see, the FlowEngine class keeps its internal state in private fields, so it cannot be declared as a singleton. We need to create a new instance of the engine each time we need it, and an instance cannot be shared between several threads.
If you go through the engine implementation, you will see that it does all that we described above.
The flow engine uses the CloneObject extension to create a new instance of the Model object and to copy all its properties. Please add the Newtonsoft.Json NuGet package and this code:
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public static class ObjectExtension
{
public static T CloneObject<T>(this T source)
{
var jsonSerializerSettings = new JsonSerializerSettings
{
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
TypeNameHandling = TypeNameHandling.Objects
};
var json = JsonConvert.SerializeObject(source, jsonSerializerSettings);
var result = JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
return result;
}
}
}
Maybe it is not the most efficient way, but I didn't want to overcomplicate this article.
The last bit to add is a hook, which must be supplied in the proxy generation options. It excludes Execute and SetModel from interception, so that methods called from other methods (for example, LoadData called from Execute) are executed at the proxy level rather than on the original object, and we intercept all of those calls. I took this approach from the Castle Dynamic Proxy Tutorial. The hook code is:
using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
namespace DemoCatleProxy.RestartableFlow
{
public class FreezableProxyGenerationHook : IProxyGenerationHook
{
private IFlow _flow;
public FreezableProxyGenerationHook(IFlow flow)
{
_flow = flow;
}
public override int GetHashCode()
{
return _flow.GetHashCode();
}
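public override bool Equals(object obj)
{
// Guard against null or a different type instead of dereferencing the cast result blindly.
return obj is FreezableProxyGenerationHook other && ReferenceEquals(_flow, other._flow);
}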
public bool ShouldInterceptMethod(Type type, MethodInfo memberInfo)
{
return memberInfo.Name != "Execute" && memberInfo.Name != "SetModel";
}
public void NonVirtualMemberNotification(Type type, MemberInfo memberInfo)
{
}
public void MethodsInspected()
{
}
public void NonProxyableMemberNotification(Type type, MemberInfo memberInfo)
{
}
}
}
Now we can compile our code and start testing it.
Implementation - Unit Test
Let's implement a unit test that will verify a simple scenario:
- We create a flow and run it for the first time. The demo service's IsMessageApproved method returns False, so the flow should be interrupted in the CheckIfApproved method.
- Keeping the flow data from the first run, we restart the flow. This time IsMessageApproved returns True, so we expect the flow to finish successfully.
I added code to the DemoFlowTests.cs file:
using Castle.DynamicProxy;
using System;
using Xunit;
using Moq;
namespace DemoCatleProxy.RestartableFlow.Tests
{
public class DemoFlowTests
{
[Fact]
public void RunStopRestartFlowTest()
{
var flowEngine = new FlowEngine(new ProxyGenerator());
var demoService = new Mock<IDemoDataService>();
var flow = new DemoFlow1(demoService.Object);
int approveTimes = 0;
demoService.Setup(s => s.LoadReceivedMessage()).Returns("Important message 1");
demoService.Setup(s => s.GetSignature(It.IsAny<string>())).Returns("0xAABBEFA7");
demoService.Setup(s => s.Submit(It.IsAny<string>(),
It.IsAny<string>())).Returns(true);
demoService.Setup(s => s.IsMessageApproved(It.IsAny<string>()))
.Returns(() =>
{
approveTimes++;
return approveTimes == 2;
});
var flowData = flowEngine.RunFlow(flow);
Assert.True(flowData.IsStopped);
Assert.False(flowData.IsFinished);
Assert.Single(flowData.ModelHistory);
Assert.True((flowData.ModelHistory[0] as Model1)?.IsLoaded);
var clonedFlowData = flowData.CloneObject();
var newFlow = new DemoFlow1(demoService.Object);
clonedFlowData = flowEngine.RestartFlow(newFlow, clonedFlowData);
Assert.False(clonedFlowData.IsStopped);
Assert.True(clonedFlowData.IsFinished);
}
}
}
When you use Dependency Injection, you can save a lot of time and effort in unit testing. I used the Moq framework (please install the Moq NuGet package) to generate a demo data service from the IDemoDataService interface. The real demo data service has not been implemented yet, and we don't need it for testing.
I set up all four methods. Three of them return constant values, but IsMessageApproved returns False the first time and True the next time.
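As an alternative to counting calls in a lambda, Moq also offers SetupSequence for "first this value, then that value" scenarios. The sketch below uses a hypothetical IApprovalService interface that mirrors only the IsMessageApproved method, so that the snippet stands on its own; it is a possible variation, not what the test in this article uses.

```csharp
using Moq;

// Hypothetical interface for illustration; it mirrors IDemoDataService.IsMessageApproved.
public interface IApprovalService
{
    bool IsMessageApproved(string message);
}

public static class SequenceSketch
{
    public static (bool first, bool second) Run()
    {
        var service = new Mock<IApprovalService>();

        // Each call consumes the next configured return value.
        service.SetupSequence(s => s.IsMessageApproved(It.IsAny<string>()))
               .Returns(false)
               .Returns(true);

        var first = service.Object.IsMessageApproved("Important message 1");
        var second = service.Object.IsMessageApproved("Important message 1");
        return (first, second);
    }
}
```

The counter-based lambda in the test keeps the rule visible in one place, while the sequence form reads more like a script of expected answers; either should fit the scenario above.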
My test creates and runs a flow for the first time and checks the resulting flow data: it should have IsStopped = True and IsFinished = False. The flow data contains the state of the interrupted flow. This state is everything that we need in order to restart the flow.
We cloned the flow data (as you remember, I serialize it to JSON and deserialize it back) to demonstrate that the flow data is transferable and doesn't depend on the AppDomain.
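Because the clone goes through JSON, the same serializer settings can also produce a string that is stored in a database and read back elsewhere. The following is a minimal sketch under that assumption; StateStore, SavedState, and SnapshotModel are hypothetical names used only here, and SavedState merely imitates the shape of FlowData (a list of calls plus a list of untyped model snapshots).

```csharp
using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical types for illustration only.
public class SnapshotModel
{
    public string Message { get; set; }
}

public class SavedState
{
    public List<string> Calls { get; set; } = new List<string>();
    public List<object> Models { get; set; } = new List<object>();
}

public static class StateStore
{
    // Same settings as in CloneObject: type names are written into the JSON,
    // so items of List<object> can be restored with their runtime type.
    private static readonly JsonSerializerSettings Settings = new JsonSerializerSettings
    {
        TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
        TypeNameHandling = TypeNameHandling.Objects
    };

    // The returned string is what would be written to storage.
    public static string ToJson<T>(T state) =>
        JsonConvert.SerializeObject(state, Settings);

    // Rebuilds the state from a previously stored string.
    public static T FromJson<T>(string json) =>
        JsonConvert.DeserializeObject<T>(json, Settings);
}
```

A caller would store the result of StateStore.ToJson(state) and later pass the loaded string to StateStore.FromJson<SavedState>(json). Note that the Json.NET documentation advises caution with TypeNameHandling when the JSON may come from an untrusted source, so this sketch assumes the stored state is written and read only by your own application.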
And finally, I call RestartFlow. Now the test expects that IsStopped = False and IsFinished = True.
If you put a breakpoint on the line with the RestartFlow call and use Step Into until you reach the Intercept method, you can see how the flow engine's skipping logic works.
I recommend playing with the debugger to see how the flow is executed the first time and how it is restarted to finish.
Summary
This article demonstrates another way of using proxies and interceptors from the Castle Dynamic Proxy framework. We implemented an interruptible and restartable flow that is conceptually similar to Windows Workflow Foundation flows, but it is lightweight and debuggable, and the flow logic lives in a C# method. We also used Moq for unit testing and have, once again, shown that you can verify and debug your code without spending time creating and running an application.
Save time with unit testing, and thank you for reading.
P.S.
I got some feedback on this blog from my friends, and they wanted to know why a simple, straightforward flow containing a sequence of four methods is implemented in this complex way.
The thing is that the flow example in this blog is kept simple for better understanding. In a real-life scenario, your flow will contain if conditions, goto statements, and maybe loops, and the approach will continue working even with such complex flows. This is an example flow from one of the Pro Coders team's projects:
public override async Task Execute()
{
await BeginAsync();
await PopulateData();
await FlowTask(typeof(InsuranceClaimValidationRulesTask));
dataEntry:
if (TaskExecutionValidationIssues.Any())
{
await UserInput(typeof(InsuranceClaimEntryForm));
}
if (Model.Claim.ClaimAmount.Amount > 1000)
{
await UserInputReview(typeof(InsuranceClaimEntryForm));
if (Model.ClaimRejected)
{
await SaveRejectedClaim();
await GenerateRejectLetter();
goto postDocuments;
}
if (TaskExecutionValidationIssues.Any())
{
goto dataEntry;
}
}
if (Model.Claim.PaymentOptionCode == "EFT")
{
await MakeTransfer();
}
else if (Model.Claim.PaymentOptionCode == "CHQ")
{
await PrintBankCheque();
}
else if (Model.Claim.PaymentOptionCode == "FUT_CONTR")
{
await BuyOptions();
}
else
{
Fail($"Invalid Payment Option {Model.Claim.PaymentOptionCode}", _logStreamer);
}
await SaveClaim();
await GenerateSuccessLetter();
postDocuments:
await PostProducedDocuments();
await EndAsync();
}
As you can see, this flow engine also supports asynchronous operations (async-await) and user input in dynamic forms, but the principle is the same: the flow can be stopped, or even fail, at some point and be restarted later, in a day or in a month, and on another machine.
History
- 31st October, 2020: Initial version